From 1b46a215c4fd3cc06357550cb5a4761e82a7726b Mon Sep 17 00:00:00 2001 From: Zolisa Bleki Date: Thu, 22 Aug 2024 14:21:03 +0200 Subject: [PATCH 1/2] Minimize `get_partial` & `set_partial` calls. calling these for every requested inner chunk of a shard is expensive can can lead to a `Unix.EMFILE` error. This commit attempts to minimize the number of calls by making sure these are called once on all requested inner chunks of a shard. This should lead to a significant decrease in the number of open files open for read/write at the same time when using a Zarr_lwt `FilesystemStore`. --- lib/codecs/array_to_bytes.ml | 84 +++++++++++++++++++++++------------- zarr-lwt/storage.ml | 10 ++--- 2 files changed, 58 insertions(+), 36 deletions(-) diff --git a/lib/codecs/array_to_bytes.ml b/lib/codecs/array_to_bytes.ml index 5b453cd..2432705 100644 --- a/lib/codecs/array_to_bytes.ml +++ b/lib/codecs/array_to_bytes.ml @@ -432,26 +432,41 @@ module Make (Io : Types.IO) = struct let id, co = RegularGrid.index_coord_pair grid c in ArrayMap.add_to_list id (co, v) acc) ArrayMap.empty pairs in - let inner = {repr with shape = t.chunk_shape} in - Deferred.fold_left - (fun acc (idx, z) -> - let oc = Array.append idx [|0|] in - let nc = Array.append idx [|1|] in - let ofs = Stdint.Uint64.to_int @@ Any.get idx_arr oc in - let nb = Stdint.Uint64.to_int @@ Any.get idx_arr nc in - get_partial [pad + ofs, Some nb] >>= fun l -> - let arr = decode_chain t.codecs inner @@ List.hd l in - List.iter (fun (c, v) -> Ndarray.set arr c v) z; - let s = encode_chain t.codecs arr in - let nb' = String.length s in - (* if codec chain doesnt contain compression, update chunk in-place *) - if nb' = nb then set_partial [pad + ofs, s] >>| fun () -> acc - else - (Any.set idx_arr oc @@ Stdint.Uint64.of_int acc; - Any.set idx_arr nc @@ Stdint.Uint64.of_int nb'; - set_partial ~append:true [acc, s] >>| fun () -> - acc + nb')) (csize - pad) @@ ArrayMap.bindings m - >>= fun bsize -> + let bindings = ArrayMap.bindings m in + let ranges, coords = + List.split @@ + List.map + (fun (i, _) -> + let oc = Array.append i [|0|] in + let nc = Array.append i [|1|] in + let ofs = Stdint.Uint64.to_int @@ Any.get idx_arr oc in + let nb = Stdint.Uint64.to_int @@ Any.get idx_arr nc in + (pad + ofs, Some nb), (oc, nc, ofs, nb)) bindings + in + get_partial ranges >>= fun xs -> + let repr' = {repr with shape = t.chunk_shape} in + let bsize, inplace, append = + List.fold_left + (fun (a, l, r) ((x, (oc, nc, ofs, nb)), (_, z)) -> + let arr = decode_chain t.codecs repr' x in + List.iter (fun (c, v) -> Ndarray.set arr c v) z; + let s = encode_chain t.codecs arr in + let nb' = String.length s in + if nb' = nb then a, (pad + ofs, s) :: l, r + else + (Any.set idx_arr oc @@ Stdint.Uint64.of_int a; + Any.set idx_arr nc @@ Stdint.Uint64.of_int nb'; + a + nb', l, (a, s) :: r)) + (csize - pad, [], []) List.(combine (combine xs coords) bindings) + in + begin match inplace with + | [] -> Deferred.return_unit + | xs -> set_partial xs + end >>= fun () -> + begin match append with + | [] -> Deferred.return_unit + | xs -> set_partial ~append:true xs + end >>= fun () -> let ib = encode_index_chain t.index_codecs idx_arr in match t.index_location with | Start -> set_partial [0, ib] @@ -471,15 +486,22 @@ module Make (Io : Types.IO) = struct List.fold_left (fun acc (i, y) -> let id, c = RegularGrid.index_coord_pair grid y in - ArrayMap.add_to_list id (i, c) acc) ArrayMap.empty pairs in - let inner = {repr with shape = t.chunk_shape} in - Deferred.concat_map - (fun (idx, z) -> - let oc = Array.append idx [|0|] in - let nc = Array.append idx [|1|] in - let ofs = Stdint.Uint64.to_int @@ Any.get idx_arr oc in - let nb = Stdint.Uint64.to_int @@ Any.get idx_arr nc in - get_partial [pad + ofs, Some nb] >>| fun l -> - let arr = decode_chain t.codecs inner @@ List.hd l in - List.map (fun (i, c) -> (i, Ndarray.get arr c)) z) (ArrayMap.bindings m) + ArrayMap.add_to_list id (i, c) acc) ArrayMap.empty pairs + in + let ranges = + List.map + (fun (i, _) -> + let oc = Array.append i [|0|] in + let nc = Array.append i [|1|] in + let ofs = Stdint.Uint64.to_int @@ Any.get idx_arr oc in + let nb = Stdint.Uint64.to_int @@ Any.get idx_arr nc in + pad + ofs, Some nb) ArrayMap.(bindings m) + in + get_partial ranges >>| fun xs -> + let repr' = {repr with shape = t.chunk_shape} in + List.concat_map + (fun (x, (_, z)) -> + let arr = decode_chain t.codecs repr' x in + List.map (fun (i, c) -> i, Ndarray.get arr c) z) + List.(combine xs @@ ArrayMap.bindings m) end diff --git a/zarr-lwt/storage.ml b/zarr-lwt/storage.ml index dcafb46..288baec 100644 --- a/zarr-lwt/storage.ml +++ b/zarr-lwt/storage.ml @@ -52,7 +52,7 @@ module FilesystemStore = struct (fun ic -> Lwt_io.length ic >>= fun v -> let size = Int64.to_int v in - Lwt_list.map_p + Lwt_list.map_s (fun (rs, len) -> let count = match len with @@ -74,7 +74,6 @@ module FilesystemStore = struct (Fun.flip Lwt_io.write value) let set_partial_values t key ?(append=false) rvs = - let iter_fn = if append then Lwt_list.iter_s else Lwt_list.iter_p in let flags = Unix.[O_NONBLOCK; O_RDWR] in Lwt_io.with_file ~flags:(if append then Unix.O_APPEND :: flags else flags) @@ -82,9 +81,10 @@ module FilesystemStore = struct ~mode:Output (key_to_fspath t key) (fun oc -> - iter_fn (fun (rs, value) -> - Lwt_io.set_position oc @@ Int64.of_int rs >>= fun () -> - Lwt_io.write oc value) rvs) + Lwt_list.iter_s + (fun (rs, value) -> + Lwt_io.set_position oc @@ Int64.of_int rs >>= fun () -> + Lwt_io.write oc value) rvs) let list t = let rec filter_concat acc dir = From 10b81e53a18abbbaaec9a5cea03a8382c7d12cf6 Mon Sep 17 00:00:00 2001 From: Zolisa Bleki Date: Thu, 22 Aug 2024 15:32:08 +0200 Subject: [PATCH 2/2] Pin version of setup-ocaml Action so it uses opam<2.2.1 --- .github/workflows/build-and-test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build-and-test.yml b/.github/workflows/build-and-test.yml index 99115cd..cc357ae 100644 --- a/.github/workflows/build-and-test.yml +++ b/.github/workflows/build-and-test.yml @@ -35,7 +35,7 @@ jobs: fetch-depth: 2 - name: setup-ocaml - uses: ocaml/setup-ocaml@master + uses: ocaml/setup-ocaml@v2 with: ocaml-compiler: ${{ matrix.ocaml-compiler }}