Skip to content

Commit

Permalink
Use an appropriately sized Lwt IO buffer for reads/writes.
Browse files Browse the repository at this point in the history
Also ensure output channels are flushed after write promise resolves.
  • Loading branch information
zoj613 committed Aug 22, 2024
1 parent 6c0904c commit 39c64ea
Showing 1 changed file with 17 additions and 13 deletions.
30 changes: 17 additions & 13 deletions zarr-lwt/storage.ml
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,18 @@ module FilesystemStore = struct
(fun _ -> Lwt.return_unit) (* If parent_dir exists at the top of the recursion stack *)
| true -> Lwt.return_unit

let size t key =
Lwt_io.file_length (key_to_fspath t key) >>| Int64.to_int

let get t key =
size t key >>= fun bufsize ->
Lwt.catch
(fun () ->
Lwt_io.with_file
~buffer:(Lwt_bytes.create bufsize)
~flags:Unix.[O_RDONLY; O_NONBLOCK]
~perm:t.perm
~mode:Input
~mode:Lwt_io.Input
(key_to_fspath t key)
Lwt_io.read)
(function
Expand All @@ -44,10 +49,12 @@ module FilesystemStore = struct
| exn -> Lwt.reraise exn)

let get_partial_values t key ranges =
size t key >>= fun bufsize ->
Lwt_io.with_file
~buffer:(Lwt_bytes.create bufsize)
~flags:Unix.[O_RDONLY; O_NONBLOCK]
~perm:t.perm
~mode:Input
~mode:Lwt_io.Input
(key_to_fspath t key)
(fun ic ->
Lwt_io.length ic >>= fun v ->
Expand All @@ -59,32 +66,32 @@ module FilesystemStore = struct
| Some l -> l
| None -> size - rs in
Lwt_io.set_position ic @@ Int64.of_int rs >>= fun () ->
Lwt_io.read ~count ic >>| fun b ->
if String.length b < count
then raise End_of_file else b) ranges)
Lwt_io.read ~count ic) ranges)

let set t key value =
let filename = key_to_fspath t key in
create_parent_dir filename t.perm >>= fun () ->
Lwt_io.with_file
~flags:Unix.[O_WRONLY; O_TRUNC; O_CREAT; O_NONBLOCK]
~perm:t.perm
~mode:Output
~mode:Lwt_io.Output
filename
(Fun.flip Lwt_io.write value)
(fun oc -> Lwt_io.write oc value >>= fun () -> Lwt_io.flush oc)

let set_partial_values t key ?(append=false) rvs =
let flags = Unix.[O_NONBLOCK; O_RDWR] in
size t key >>= fun bufsize ->
let flags = Unix.[O_NONBLOCK; O_WRONLY] in
Lwt_io.with_file
~buffer:(Lwt_bytes.create bufsize)
~flags:(if append then Unix.O_APPEND :: flags else flags)
~perm:t.perm
~mode:Output
~mode:Lwt_io.Output
(key_to_fspath t key)
(fun oc ->
Lwt_list.iter_s
(fun (rs, value) ->
Lwt_io.set_position oc @@ Int64.of_int rs >>= fun () ->
Lwt_io.write oc value) rvs)
Lwt_io.write oc value) rvs >>= fun () -> Lwt_io.flush oc)

let list t =
let rec filter_concat acc dir =
Expand All @@ -103,9 +110,6 @@ module FilesystemStore = struct
let erase t key =
Lwt_unix.unlink @@ key_to_fspath t key

let size t key =
Lwt_io.file_length (key_to_fspath t key) >>| Int64.to_int

let list_prefix t pre =
list t >>= Lwt_list.filter_p
(fun x -> Lwt.return @@ String.starts_with ~prefix:pre x)
Expand Down

0 comments on commit 39c64ea

Please sign in to comment.