Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize eio-based filesystem store abstract functions. #61

Merged
merged 7 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ open Zarr_sync.Storage
(* opens infix operators >>= and >>| for monadic bind & map *)
open FilesystemStore.Deferred.Infix

let store = FilesystemStore.create_store "testdata.zarr";;
let store = FilesystemStore.create "testdata.zarr";;
```
### create group
```ocaml
Expand Down
20 changes: 10 additions & 10 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@
arrays, designed for use in parallel computing.")
(depends
dune
(ocaml (>= 4.14.2))
yojson
ezgzip
owl
stdint
checkseum
(ocaml (>= 4.14.0))
(yojson (>= 1.6.0))
(ezgzip (>= 0.2.0))
(stdint (>= 0.7.2))
(checkseum (>= 0.4.0))
(owl (>= 1.1))
(odoc :with-doc)
(ounit2 :with-test)
(ppx_deriving :with-test)
Expand All @@ -45,7 +45,7 @@
(synopsis "Synchronous API for Zarr.")
(depends
dune
(ocaml (>= 4.14.2))
(ocaml (>= 4.14.0))
(zarr (= :version))
(odoc :with-doc)
(ounit2 :with-test)
Expand All @@ -57,9 +57,9 @@
(synopsis "Lwt-aware API for Zarr.")
(depends
dune
(ocaml (>= 4.14.2))
(ocaml (>= 4.14.0))
(zarr (= :version))
lwt
(lwt (>= 2.5.1))
(odoc :with-doc)
(ounit2 :with-test)
(bisect_ppx
Expand All @@ -72,7 +72,7 @@
dune
(ocaml (>= 5.1.0))
(zarr (= :version))
eio_main
(eio_main (>= 1.0))
(odoc :with-doc)
(ounit2 :with-test)
(bisect_ppx
Expand Down
53 changes: 27 additions & 26 deletions examples/inmemory_zipstore.ml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
module MemoryZipStore : sig
include Zarr.Storage.STORE with type 'a Deferred.t = 'a Lwt.t
(*val create : ?level:Zipc_deflate.level -> string -> t *)
val open_store : ?level:Zipc_deflate.level -> string -> t
val write_to_file : t -> unit Deferred.t
val with_open : ?level:Zipc_deflate.level -> string -> (t -> 'a Deferred.t) -> 'a Deferred.t
end = struct
module M = Map.Make(String)

Expand Down Expand Up @@ -55,13 +54,11 @@ end = struct
| Ok s -> s

let get_partial_values t key ranges =
get t key >>= fun data ->
get t key >>| fun data ->
let size = String.length data in
ranges |> Lwt_list.map_p @@ fun (offset, len) ->
Deferred.return
(match len with
| None -> String.sub data offset (size - offset)
| Some l -> String.sub data offset l)
ranges |> List.map @@ fun (ofs, len) ->
let f v = String.sub data ofs v in
Option.fold ~none:(f (size - ofs)) ~some:f len

let list t =
Deferred.return @@ Zipc.fold
Expand Down Expand Up @@ -146,21 +143,26 @@ end = struct

(*let create ?(level=`Default) path = Z.{ic = Zipc.empty; level; path} *)

let open_store ?(level=`Default) path =
match Zipc.of_binary_string In_channel.(with_open_bin path input_all) with
| Ok ic -> Z.{ic; level; path; mutex = Lwt_mutex.create ()}
| Error e -> failwith e

let write_to_file (t : Z.t) =
Lwt_io.with_file
~flags:Unix.[O_WRONLY; O_TRUNC; O_CREAT; O_NONBLOCK]
~mode:Lwt_io.Output
t.path
(fun oc ->
let open Lwt.Infix in
match Zipc.to_binary_string t.ic with
| Error e -> failwith e
| Ok s -> Lwt_io.write oc s >>= fun () -> Lwt_io.flush oc)
let with_open ?(level=`Default) path f =
let s = In_channel.(with_open_bin path input_all) in
let t = match Zipc.of_binary_string s with
| Ok ic -> Z.{ic; level; path; mutex = Lwt_mutex.create ()}
| Error e -> failwith e
in
Lwt.finalize
(fun () -> f t)
(fun () ->
Lwt_io.with_file
~flags:Unix.[O_WRONLY; O_TRUNC; O_CREAT; O_NONBLOCK]
~mode:Lwt_io.Output
t.path
(fun oc ->
let open Lwt.Infix in
match Zipc.to_binary_string t.ic with
| Error e -> failwith e
| Ok s' ->
if String.equal s s' then Lwt.return_unit else
Lwt_io.write oc s' >>= fun () -> Lwt_io.flush oc))
end

let _ =
Expand All @@ -169,7 +171,7 @@ let _ =
let open MemoryZipStore.Deferred.Infix in

let printlist = [%show: string list] in
let store = MemoryZipStore.open_store "examples/data/testdata.zip" in
MemoryZipStore.with_open "examples/data/testdata.zip" @@ fun store ->
MemoryZipStore.find_all_nodes store >>= fun (xs, _) ->
print_endline @@ "All array nodes: " ^ printlist (List.map ArrayNode.to_path xs);
let anode = List.hd @@ List.filter
Expand All @@ -181,8 +183,7 @@ let _ =
Owl.Dense.Ndarray.Generic.map
(fun _ -> Owl_stats_dist.uniform_int_rvs ~a:0 ~b:255 |> Char.chr) x in
MemoryZipStore.write_array store anode slice x' >>= fun () ->
MemoryZipStore.read_array store anode slice Bigarray.Char >>= fun y ->
MemoryZipStore.read_array store anode slice Bigarray.Char >>| fun y ->
print_string @@ "AFTER: " ^ Owl_pretty.dsnda_to_string y;
MemoryZipStore.write_to_file store >>| fun () ->
print_endline "Zip store has been update."
end
7 changes: 3 additions & 4 deletions examples/readonly_zipstore.ml
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,9 @@ end = struct
let get_partial_values t key ranges =
get t key >>= fun data ->
let size = String.length data in
ranges |> Eio.Fiber.List.map @@ fun (offset, len) ->
match len with
| None -> String.sub data offset (size - offset)
| Some l -> String.sub data offset l
ranges |> Eio.Fiber.List.map @@ fun (ofs, len) ->
let f v = String.sub data ofs v in
Option.fold ~none:(f (size - ofs)) ~some:f len

let list t =
Zip.entries t |> Eio.Fiber.List.filter_map @@ function
Expand Down
2 changes: 1 addition & 1 deletion zarr-eio.opam
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ depends: [
"dune" {>= "3.15"}
"ocaml" {>= "5.1.0"}
"zarr" {= version}
"eio_main"
"eio_main" {>= "1.0"}
"odoc" {with-doc}
"ounit2" {with-test}
"bisect_ppx" {dev & >= "2.5.0" & with-test}
Expand Down
81 changes: 38 additions & 43 deletions zarr-eio/src/storage.ml
Original file line number Diff line number Diff line change
Expand Up @@ -12,61 +12,57 @@
let fspath_to_key t (path : Eio.Fs.dir_ty Eio.Path.t) =
let s = snd path in
let pos = String.length (snd t.root) + 1 in
String.sub s pos @@ String.length s - pos
String.(sub s pos @@ length s - pos)

(* [key_to_fspath t key] is the filesystem path obtained by appending [key]
   to the root path held in [t.root]. *)
let key_to_fspath t key =
  let open Eio.Path in
  t.root / key

let size t key =
let fp = key_to_fspath t key in
Eio.Path.with_open_in fp @@ fun flow ->
Optint.Int63.to_int (Eio.File.size flow)
Eio.Path.with_open_in (key_to_fspath t key) @@ fun flow ->
Optint.Int63.to_int @@ Eio.File.size flow

(* [get t key] loads and returns the full contents of the file that [key]
   maps to under the store root.
   @raise Zarr.Storage.Key_not_found when the load fails with the
   filesystem not-found error matched below; any other exception is left
   to propagate unchanged. *)
let get t key =
  match Eio.Path.load (key_to_fspath t key) with
  | contents -> contents
  | exception Eio.Io (Eio.Fs.E Not_found Eio_unix.Unix_error _, _) ->
    raise (Zarr.Storage.Key_not_found key)

let get_partial_values t key ranges =
let fp = key_to_fspath t key in
Eio.Path.with_open_in fp @@ fun flow ->
let filesize = Optint.Int63.to_int (Eio.File.size flow) in
ranges |> Eio.Fiber.List.map @@ fun (start, len) ->
let bufsize =
match len with
| Some l -> l
| None -> filesize - start in
let pos = Optint.Int63.of_int start in
let file_offset = Eio.File.seek flow pos `Set in
let buf = Cstruct.create bufsize in
Eio.Path.with_open_in (key_to_fspath t key) @@ fun flow ->
let size = Optint.Int63.to_int @@ Eio.File.size flow in
let size', ranges' =
List.fold_left_map (fun a (s, l) ->
let a' = Option.fold ~none:(a + size - s) ~some:(Int.add a) l in
a', (Optint.Int63.of_int s, a, a' - a)) 0 ranges in
let buffer = Bigarray.Array1.create Char C_layout size' in
ranges' |> Eio.Fiber.List.map @@ fun (fo, off, len) ->
let file_offset = Eio.File.seek flow fo `Set in
let buf = Cstruct.of_bigarray ~off ~len buffer in
Eio.File.pread_exact flow ~file_offset [buf];
Cstruct.to_string buf

let set t key value =
let fp = key_to_fspath t key in
let parent_dir = fst @@ Option.get @@ Eio.Path.split fp in
Eio.Path.mkdirs ~exists_ok:true ~perm:t.perm parent_dir;
Option.fold
~none:()
~some:(fun (p, _) -> Eio.Path.mkdirs ~exists_ok:true ~perm:t.perm p)
(Eio.Path.split fp);
Eio.Path.save ~create:(`Or_truncate t.perm) fp value

let set_partial_values t key ?(append=false) rvs =
let fp = key_to_fspath t key in
Eio.Path.with_open_out ~append ~create:`Never fp @@ fun flow ->
Eio.Fiber.List.iter
(fun (start, value) ->
let file_offset = Optint.Int63.of_int start in
let _ = Eio.File.seek flow file_offset `Set in
let buf = Cstruct.of_string value in
Eio.File.pwrite_all flow ~file_offset [buf]) rvs
let l = List.fold_left (fun a (_, s) -> Int.max a (String.length s)) 0 rvs in
let buffer = Bigarray.Array1.create Char C_layout l in
let allocator len = Cstruct.of_bigarray ~off:0 ~len buffer in
Eio.Path.with_open_out ~append ~create:`Never (key_to_fspath t key) @@ fun flow ->
rvs |> List.iter @@ fun (ofs, str) ->
let file_offset = Eio.File.seek flow (Optint.Int63.of_int ofs) `Set in
Eio.File.pwrite_all flow ~file_offset [Cstruct.of_string ~allocator str]

let list t =
let rec aux acc dir =
match Eio.Path.read_dir dir with
| [] -> acc
| xs ->
List.concat_map
(fun x ->
match Eio.Path.(dir / x) with
| p when Eio.Path.is_directory p -> aux acc p
| p -> (fspath_to_key t p) :: acc) xs
List.fold_left
(fun a x ->
match Eio.Path.(dir / x) with
| p when Eio.Path.is_directory p -> aux a p
| p -> (fspath_to_key t p) :: a) acc (Eio.Path.read_dir dir)
in aux [] t.root

let is_member t key =
Expand All @@ -78,13 +74,13 @@
let list_prefix t prefix =
let rec aux acc dir =
let xs = Eio.Path.read_dir dir in
List.concat_map
(fun x ->
List.fold_left
(fun a x ->
match Eio.Path.(dir / x) with
| p when Eio.Path.is_directory p -> aux acc p
| p when Eio.Path.is_directory p -> aux a p
| p ->
let key = fspath_to_key t p in
if String.starts_with ~prefix key then key :: acc else acc) xs
if String.starts_with ~prefix key then key :: a else a) acc xs

Check warning on line 83 in zarr-eio/src/storage.ml

View check run for this annotation

Codecov / codecov/patch

zarr-eio/src/storage.ml#L83

Added line #L83 was not covered by tests
in aux [] t.root

let erase_prefix t pre =
Expand All @@ -95,7 +91,7 @@
else Eio.Path.rmtree ~missing_ok:true prefix

let list_dir t prefix =
let module StrSet = Zarr.Util.StrSet in
let module S = Zarr.Util.StrSet in
let n = String.length prefix in
let rec aux acc dir =
List.fold_left
Expand All @@ -107,12 +103,11 @@
let pred = String.starts_with ~prefix key in
match key with
| k when pred && String.contains_from k n '/' ->
StrSet.add String.(sub k 0 @@ 1 + index_from k n '/') l, r
S.add String.(sub k 0 @@ 1 + index_from k n '/') l, r
| k when pred -> l, k :: r
| _ -> a) acc @@ Eio.Path.read_dir dir
in
let prefs, keys = aux (StrSet.empty, []) t.root in
keys, StrSet.elements prefs
| _ -> a) acc (Eio.Path.read_dir dir) in
let prefs, keys = aux (S.empty, []) t.root in
keys, S.elements prefs
end

module U = Zarr.Util
Expand Down
4 changes: 2 additions & 2 deletions zarr-lwt.opam
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ doc: "https://zoj613.github.io/zarr-ml"
bug-reports: "https://github.com/zoj613/zarr-ml/issues"
depends: [
"dune" {>= "3.15"}
"ocaml" {>= "4.14.2"}
"ocaml" {>= "4.14.0"}
"zarr" {= version}
"lwt"
"lwt" {>= "2.5.1"}
"odoc" {with-doc}
"ounit2" {with-test}
"bisect_ppx" {dev & >= "2.5.0" & with-test}
Expand Down
Loading
Loading