From 39c64ea6eb0a0eefea8a9964629e1ff73e78c682 Mon Sep 17 00:00:00 2001 From: Zolisa Bleki Date: Thu, 22 Aug 2024 18:52:01 +0200 Subject: [PATCH] Use an appropriately sized Lwt IO buffer for reads/writes. Also ensure output channels are flushed after write promise resolves. --- zarr-lwt/storage.ml | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/zarr-lwt/storage.ml b/zarr-lwt/storage.ml index 288baec..30e3449 100644 --- a/zarr-lwt/storage.ml +++ b/zarr-lwt/storage.ml @@ -29,13 +29,18 @@ module FilesystemStore = struct (fun _ -> Lwt.return_unit) (* If parent_dir exists at the top of the recursion stack *) | true -> Lwt.return_unit + let size t key = + Lwt_io.file_length (key_to_fspath t key) >>| Int64.to_int + let get t key = + size t key >>= fun bufsize -> Lwt.catch (fun () -> Lwt_io.with_file + ~buffer:(Lwt_bytes.create bufsize) ~flags:Unix.[O_RDONLY; O_NONBLOCK] ~perm:t.perm - ~mode:Input + ~mode:Lwt_io.Input (key_to_fspath t key) Lwt_io.read) (function @@ -44,10 +49,12 @@ module FilesystemStore = struct | exn -> Lwt.reraise exn) let get_partial_values t key ranges = + size t key >>= fun bufsize -> Lwt_io.with_file + ~buffer:(Lwt_bytes.create bufsize) ~flags:Unix.[O_RDONLY; O_NONBLOCK] ~perm:t.perm - ~mode:Input + ~mode:Lwt_io.Input (key_to_fspath t key) (fun ic -> Lwt_io.length ic >>= fun v -> @@ -59,9 +66,7 @@ module FilesystemStore = struct | Some l -> l | None -> size - rs in Lwt_io.set_position ic @@ Int64.of_int rs >>= fun () -> - Lwt_io.read ~count ic >>| fun b -> - if String.length b < count - then raise End_of_file else b) ranges) + Lwt_io.read ~count ic) ranges) let set t key value = let filename = key_to_fspath t key in @@ -69,22 +74,24 @@ module FilesystemStore = struct Lwt_io.with_file ~flags:Unix.[O_WRONLY; O_TRUNC; O_CREAT; O_NONBLOCK] ~perm:t.perm - ~mode:Output + ~mode:Lwt_io.Output filename - (Fun.flip Lwt_io.write value) + (fun oc -> Lwt_io.write oc value >>= fun () -> Lwt_io.flush oc) let set_partial_values t key ?(append=false) rvs = - let flags = Unix.[O_NONBLOCK; O_RDWR] in + size t key >>= fun bufsize -> + let flags = Unix.[O_NONBLOCK; O_WRONLY] in Lwt_io.with_file + ~buffer:(Lwt_bytes.create bufsize) ~flags:(if append then Unix.O_APPEND :: flags else flags) ~perm:t.perm - ~mode:Output + ~mode:Lwt_io.Output (key_to_fspath t key) (fun oc -> Lwt_list.iter_s (fun (rs, value) -> Lwt_io.set_position oc @@ Int64.of_int rs >>= fun () -> - Lwt_io.write oc value) rvs) + Lwt_io.write oc value) rvs >>= fun () -> Lwt_io.flush oc) let list t = let rec filter_concat acc dir = @@ -103,9 +110,6 @@ module FilesystemStore = struct let erase t key = Lwt_unix.unlink @@ key_to_fspath t key - let size t key = - Lwt_io.file_length (key_to_fspath t key) >>| Int64.to_int - let list_prefix t pre = list t >>= Lwt_list.filter_p (fun x -> Lwt.return @@ String.starts_with ~prefix:pre x)