Skip to content

Commit

Permalink
fully migrate to iostream
Browse files Browse the repository at this point in the history
  • Loading branch information
c-cube committed Feb 26, 2024
1 parent e6e9f50 commit 640a78b
Show file tree
Hide file tree
Showing 21 changed files with 106 additions and 265 deletions.
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[submodule "vendor/iostream"]
path = vendor/iostream
url = https://github.com/c-cube/ocaml-iostream
1 change: 1 addition & 0 deletions batrpc.opam
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ depends: [
"atomic"
"logs"
"camlzip"
"iostream" {>= "0.2"}
"hmap"
"pbrt" {>= "3.0"}
"pbrt_yojson" {>= "3.0"}
Expand Down
1 change: 1 addition & 0 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
logs
camlzip
;(ocaml-protoc (and (>= 3.0) (< 4.0)))
(iostream (>= 0.2))
hmap
(pbrt (>= 3.0))
(pbrt_yojson (>= 3.0))
Expand Down
2 changes: 1 addition & 1 deletion src/core/basic_client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ let create :
?active:Switch.t ->
?encoding:Encoding.t ->
timer:Timer.t ->
ic:#Io.In.bufferized_t ->
ic:#Io.In.t ->
oc:#Io.Out.t ->
unit ->
t =
Expand Down
14 changes: 6 additions & 8 deletions src/core/client_state.ml
Original file line number Diff line number Diff line change
Expand Up @@ -231,9 +231,8 @@ let mk_unary_handler (self : t) ?buf_pool ~timer ~oc ~encoding ~timeout_s rpc :

let default_timeout_s_ : float = 30.

let call (self : t) ?buf_pool ~timer ~(oc : #Io.Out.bufferized_t Lock.t)
~encoding ?(headers = []) ?(timeout_s = default_timeout_s_) rpc req :
_ Fut.t =
let call (self : t) ?buf_pool ~timer ~(oc : #Io.Out.t Lock.t) ~encoding
?(headers = []) ?(timeout_s = default_timeout_s_) rpc req : _ Fut.t =
let initial_handler =
mk_unary_handler self ?buf_pool ~timer ~oc ~encoding ~timeout_s rpc
in
Expand All @@ -245,8 +244,8 @@ let call (self : t) ?buf_pool ~timer ~(oc : #Io.Out.bufferized_t Lock.t)
let ctx = { Handler.headers; hmap = Hmap.empty } in
handler (ctx, req) |> Fut.map ~f:snd

let call_client_stream (self : t) ?buf_pool ~timer
~(oc : #Io.Out.bufferized_t Lock.t) ~encoding ?(headers = []) ?timeout_s
let call_client_stream (self : t) ?buf_pool ~timer ~(oc : #Io.Out.t Lock.t)
~encoding ?(headers = []) ?timeout_s
(rpc :
( 'req,
Service.Value_mode.stream,
Expand Down Expand Up @@ -317,9 +316,8 @@ let call_client_stream (self : t) ?buf_pool ~timer
let fut = fut |> Fut.map ~f:snd in
stream, fut

let call_server_stream (self : t) ?buf_pool ~timer
~(oc : #Io.Out.bufferized_t Lock.t) ~encoding ?(headers = [])
?(timeout_s = default_timeout_s_)
let call_server_stream (self : t) ?buf_pool ~timer ~(oc : #Io.Out.t Lock.t)
~encoding ?(headers = []) ?(timeout_s = default_timeout_s_)
(rpc :
( 'req,
Service.Value_mode.unary,
Expand Down
14 changes: 7 additions & 7 deletions src/core/client_state.mli
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ val handle_response :
t ->
buf_pool:Buf_pool.t ->
meta:Meta.meta ->
ic:#Io.In.bufferized_t ->
ic:#Io.In.t ->
encoding:Encoding.t ->
unit ->
unit
Expand All @@ -25,7 +25,7 @@ val handle_error :
t ->
buf_pool:Buf_pool.t ->
meta:Meta.meta ->
ic:#Io.In.bufferized_t ->
ic:#Io.In.t ->
encoding:Encoding.t ->
unit ->
unit
Expand All @@ -34,7 +34,7 @@ val handle_stream_item :
t ->
buf_pool:Buf_pool.t ->
meta:Meta.meta ->
ic:#Io.In.bufferized_t ->
ic:#Io.In.t ->
encoding:Encoding.t ->
unit ->
unit
Expand All @@ -43,7 +43,7 @@ val handle_stream_close :
t ->
buf_pool:Buf_pool.t ->
meta:Meta.meta ->
ic:#Io.In.bufferized_t ->
ic:#Io.In.t ->
encoding:Encoding.t ->
unit ->
unit
Expand All @@ -52,7 +52,7 @@ val call :
t ->
?buf_pool:Buf_pool.t ->
timer:Timer.t ->
oc:#Io.Out.bufferized_t Lock.t ->
oc:#Io.Out.t Lock.t ->
encoding:Encoding.t ->
?headers:Meta.header list ->
?timeout_s:float ->
Expand All @@ -69,7 +69,7 @@ val call_client_stream :
t ->
?buf_pool:Buf_pool.t ->
timer:Timer.t ->
oc:#Io.Out.bufferized_t Lock.t ->
oc:#Io.Out.t Lock.t ->
encoding:Encoding.t ->
?headers:Meta.header list ->
?timeout_s:float ->
Expand All @@ -89,7 +89,7 @@ val call_server_stream :
t ->
?buf_pool:Buf_pool.t ->
timer:Timer.t ->
oc:#Io.Out.bufferized_t Lock.t ->
oc:#Io.Out.t Lock.t ->
encoding:Encoding.t ->
?headers:Meta.header list ->
?timeout_s:float ->
Expand Down
1 change: 1 addition & 0 deletions src/core/dune
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
atomic
moonpool
batrpc.util
iostream
pbrt
pbrt_yojson
hmap
Expand Down
2 changes: 1 addition & 1 deletion src/core/encoding.ml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ let write_to_oc (oc : #Io.Out.t) (self : t) : unit =
let read_from_ic (ic : #Io.In.t) : t =
let magic_number =
let bs4 = Bytes.create 4 in
ic#really_input bs4 0 4;
Io.In.really_input ic bs4 0 4;
Bytes.get_int32_le bs4 0
in

Expand Down
29 changes: 14 additions & 15 deletions src/core/framing.ml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ open struct
Error.raise_err (Error.Deser_error msg))
| exception _ -> Error.raise_err (Error.Deser_error "invalid json")

let read_line_exn_ (ic : #Io.In.bufferized_t) : string =
match ic#read_line () with
let read_line_exn_ (ic : #Io.In.t) : string =
match Io.In.input_line ic with
| Some s -> s
| None -> Error.raise_err (Error.Network_error "Could not read next line")
end
Expand All @@ -32,21 +32,21 @@ let compression_threshold = 2 * 1024

let read_meta_b ~buf_pool (ic : #Io.In.t) : Meta.meta option =
let size_buf = Bytes.create 2 in
match ic#really_input size_buf 0 2 with
match Io.In.really_input ic size_buf 0 2 with
| exception End_of_file -> None
| () ->
let size = Bytes.get_int16_le size_buf 0 in
let meta =
let@ buf = Buf_pool.with_buf buf_pool size in
ic#really_input buf 0 size;
Io.In.really_input ic buf 0 size;
let dec = Pbrt.Decoder.of_subbytes buf 0 size in
Meta.decode_pb_meta dec
in

Some meta

let read_meta_j (ic : #Io.In.bufferized_t) : Meta.meta option =
match ic#read_line () with
let read_meta_j (ic : #Io.In.t) : Meta.meta option =
match Io.In.input_line ic with
| None -> None
| Some j -> Some (decode_json_ Meta.decode_json_meta j)

Expand All @@ -58,7 +58,7 @@ let read_meta ~buf_pool ic ~encoding : _ option =
let read_with_dec_ ~buf_pool ic ~(meta : Meta.meta) ~what ~f_dec =
let body_size = meta.body_size |> unwrap_body_size |> Int32.to_int in
let@ buf = Buf_pool.with_buf buf_pool body_size in
ic#really_input buf 0 body_size;
Io.In.really_input ic buf 0 body_size;

(* decompress if needed *)
let buf, body_size =
Expand All @@ -76,14 +76,14 @@ let read_with_dec_ ~buf_pool ic ~(meta : Meta.meta) ~what ~f_dec =
let ctx = Error.(mk @@ Deser_error err) in
Error.(failf ~ctx "Reading body of %s failed" what)

let read_with_dec_j_ (ic : #Io.In.bufferized_t) ~what ~f_dec =
let read_with_dec_j_ (ic : #Io.In.t) ~what ~f_dec =
try
let line = read_line_exn_ ic in
decode_json_ f_dec line
with Error.E err -> Error.(failf ~ctx:err "Reading body of %s failed" what)

let read_body_req ~buf_pool (ic : #Io.In.bufferized_t) ~encoding
~(meta : Meta.meta) (rpc : _ Service.Server.rpc) =
let read_body_req ~buf_pool (ic : #Io.In.t) ~encoding ~(meta : Meta.meta)
(rpc : _ Service.Server.rpc) =
let@ () =
Error.guardf (fun k ->
k "Batrpc: reading the request for method %S" rpc.name)
Expand Down Expand Up @@ -125,7 +125,7 @@ let read_and_discard ~buf_pool ic ~encoding ~(meta : Meta.meta) : unit =
| Encoding.Binary ->
let body_size = meta.body_size |> unwrap_body_size |> Int32.to_int in
let@ buf = Buf_pool.with_buf buf_pool body_size in
ic#really_input buf 0 body_size
Io.In.really_input ic buf 0 body_size
| Encoding.Json -> ignore (read_line_exn_ ic : string)

let read_empty ~buf_pool (ic : #Io.In.t) ~encoding ~(meta : Meta.meta) =
Expand Down Expand Up @@ -175,7 +175,7 @@ let write_meta_b ~enc oc meta : unit =

let write_meta_j_ oc meta : unit =
let j = Meta.encode_json_meta meta |> Yojson.Basic.to_string in
oc#output_line j
Io.Out.output_line oc j

let write_with_b_ ?buf_pool ?enc (oc : #Io.Out.t) ~(meta : Meta.meta) ~f_enc x :
unit =
Expand Down Expand Up @@ -204,15 +204,14 @@ let write_with_b_ ?buf_pool ?enc (oc : #Io.Out.t) ~(meta : Meta.meta) ~f_enc x :
write_meta_b ~enc oc meta;
oc#output body_str 0 (Bytes.length body_str)

let write_with_j_ (oc : #Io.Out.bufferized_t) ~(meta : Meta.meta) ~f_enc x :
unit =
let write_with_j_ (oc : #Io.Out.t) ~(meta : Meta.meta) ~f_enc x : unit =
(* send meta *)
let meta : Meta.meta =
{ meta with Meta.body_compression = None; body_size = None }
in
write_meta_j_ oc meta;
let j = f_enc x |> Yojson.Basic.to_string in
oc#output_line j
Io.Out.output_line oc j

let write_req ?buf_pool ?enc (oc : #Io.Out.t) ~encoding
(rpc : _ Service.Client.rpc) meta req : unit =
Expand Down
23 changes: 10 additions & 13 deletions src/core/framing.mli
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,27 @@
open Common_

val read_meta :
buf_pool:Buf_pool.t ->
#Io.In.bufferized_t ->
encoding:Encoding.t ->
Meta.meta option
buf_pool:Buf_pool.t -> #Io.In.t -> encoding:Encoding.t -> Meta.meta option

val read_body_req :
buf_pool:Buf_pool.t ->
#Io.In.bufferized_t ->
#Io.In.t ->
encoding:Encoding.t ->
meta:Meta.meta ->
('req, _, _, _) Service.Server.rpc ->
'req

val read_body_res :
buf_pool:Buf_pool.t ->
#Io.In.bufferized_t ->
#Io.In.t ->
encoding:Encoding.t ->
meta:Meta.meta ->
(_, _, 'res, _) Service.Client.rpc ->
'res

val read_and_discard :
buf_pool:Buf_pool.t ->
#Io.In.bufferized_t ->
#Io.In.t ->
encoding:Encoding.t ->
meta:Meta.meta ->
unit
Expand All @@ -50,22 +47,22 @@ val read_and_discard :

val read_error :
buf_pool:Buf_pool.t ->
#Io.In.bufferized_t ->
#Io.In.t ->
encoding:Encoding.t ->
meta:Meta.meta ->
Meta.error

val read_empty :
buf_pool:Buf_pool.t ->
#Io.In.bufferized_t ->
#Io.In.t ->
encoding:Encoding.t ->
meta:Meta.meta ->
unit

val write_req :
?buf_pool:Buf_pool.t ->
?enc:Pbrt.Encoder.t ->
#Io.Out.bufferized_t ->
#Io.Out.t ->
encoding:Encoding.t ->
('req, _, _, _) Service.Client.rpc ->
Meta.meta ->
Expand All @@ -75,7 +72,7 @@ val write_req :
val write_error :
?buf_pool:Buf_pool.t ->
?enc:Pbrt.Encoder.t ->
#Io.Out.bufferized_t ->
#Io.Out.t ->
encoding:Encoding.t ->
Meta.meta ->
Meta.error ->
Expand All @@ -84,7 +81,7 @@ val write_error :
val write_empty :
?buf_pool:Buf_pool.t ->
?enc:Pbrt.Encoder.t ->
#Io.Out.bufferized_t ->
#Io.Out.t ->
encoding:Encoding.t ->
Meta.meta ->
unit ->
Expand All @@ -93,7 +90,7 @@ val write_empty :
val write_res :
?buf_pool:Buf_pool.t ->
?enc:Pbrt.Encoder.t ->
#Io.Out.bufferized_t ->
#Io.Out.t ->
encoding:Encoding.t ->
(_, _, 'res, _) Service.Server.rpc ->
Meta.meta ->
Expand Down
Loading

0 comments on commit 640a78b

Please sign in to comment.