Skip to content

Commit

Permalink
bufferize IO channels
Browse files Browse the repository at this point in the history
  • Loading branch information
c-cube committed Feb 14, 2024
1 parent 774c705 commit c14a599
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 17 deletions.
88 changes: 79 additions & 9 deletions src/core/io.ml
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
(** IO primitives *)

let default_buf_size = 4 * 1024

module In = struct
(** Input stream *)
class type t =
Expand All @@ -9,6 +11,24 @@ module In = struct
method close : unit -> unit
end

open struct
class virtual base =
object (self)
method virtual input : bytes -> int -> int -> int

(* provide this loop *)
method really_input bs i len0 =
let i = ref i in
let len = ref len0 in
while !len > 0 do
let n = self#input bs !i !len in
if n = 0 then raise End_of_file;
i := !i + n;
len := !len - n
done
end
end

class of_fd ?(shutdown = false) ?n_received ?(close_noerr = false)
(fd : Unix.file_descr) : t =
object
Expand All @@ -17,15 +37,7 @@ module In = struct
Byte_counter.add_opt n_received n;
n

method really_input bs i len0 =
let i = ref i in
let len = ref len0 in
while !len > 0 do
let n = Unix.read fd bs !i !len in
i := !i + n;
len := !len - n
done;
Byte_counter.add_opt n_received len0
inherit base

method close () =
if shutdown then (
Expand All @@ -36,6 +48,31 @@ module In = struct
) else
Unix.close fd
end

class bufferized ?(buf = Bytes.create default_buf_size) (ic : #t) : t =
let buf_off = ref 0 in
let buf_len = ref 0 in
let eof = ref false in

let refill_ () =
if not !eof then (
buf_off := 0;
buf_len := ic#input buf 0 (Bytes.length buf);
if !buf_len = 0 then eof := true
)
in
object
method input bs i len =
if !buf_len = 0 then refill_ ();
let n = min len !buf_len in
Bytes.blit buf !buf_off bs i n;
buf_off := !buf_off + n;
buf_len := !buf_len - n;
n

inherit base
method close () = ic#close ()
end
end

module Out = struct
Expand Down Expand Up @@ -73,6 +110,39 @@ module Out = struct
Unix.close fd
end

class bufferized ?(buf = Bytes.create default_buf_size) (oc : #t) : t =
let off = ref 0 in
let flush_ () =
if !off > 0 then (
oc#output buf 0 !off;
off := 0
)
in
let[@inline] maybe_flush_ () = if !off = Bytes.length buf then flush_ () in

object
method flush () = flush_ ()

method output bs i len : unit =
let i = ref i in
let len = ref len in
while !len > 0 do
maybe_flush_ ();
let n = min !len (Bytes.length buf - !off) in
assert (n > 0);

Bytes.blit bs !i buf !off !len;
i := !i + n;
len := !len - n;
off := !off + n
done;
maybe_flush_ ()

method close () =
flush_ ();
oc#close ()
end

class of_buffer (buf : Buffer.t) : t =
object
method output bs i len = Buffer.add_subbytes buf bs i len
Expand Down
10 changes: 7 additions & 3 deletions src/unix/tcp_client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,14 @@ let connect ?active ?buf_pool ?middlewares
Unix.connect sock addr;

let ic =
new Io.In.of_fd
~shutdown:true ~close_noerr:true ~n_received:Net_stats.n_received sock
new Io.In.bufferized
@@ new Io.In.of_fd
~shutdown:true ~close_noerr:true ~n_received:Net_stats.n_received sock
in
let oc =
new Io.Out.bufferized
@@ new Io.Out.of_fd ~close_noerr:true ~n_sent:Net_stats.n_sent sock
in
let oc = new Io.Out.of_fd ~close_noerr:true ~n_sent:Net_stats.n_sent sock in

let client_state = Client_state.create ?middlewares () in
let conn : t =
Expand Down
12 changes: 7 additions & 5 deletions src/unix/tcp_server.ml
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,15 @@ let handle_client_async_ (self : t) client_sock client_addr : unit =
Sys.set_signal Sys.sigpipe Sys.Signal_ignore;
let ic =
new Io.In.of_fd
~shutdown:true ~close_noerr:true ~n_received:Net_stats.n_received
client_sock
new Io.In.bufferized
@@ new Io.In.of_fd
~shutdown:true ~close_noerr:true ~n_received:Net_stats.n_received
client_sock
in
let oc =
new Io.Out.of_fd
~shutdown:true ~close_noerr:true ~n_sent:Net_stats.n_sent client_sock
new Io.Out.bufferized
@@ new Io.Out.of_fd
~shutdown:true ~close_noerr:true ~n_sent:Net_stats.n_sent client_sock
in
(* spawn a background thread, using the same [active] so as
Expand Down

0 comments on commit c14a599

Please sign in to comment.