Skip to content

Commit

Permalink
Minimize get_partial & set_partial calls.
Browse files Browse the repository at this point in the history
calling these for every requested inner chunk of a shard is
expensive can can lead to a `Unix.EMFILE` error. This commit
attempts to minimize the number of calls by making sure these are
called once on all requested inner chunks of a shard. This should
lead to a significant decrease in the number of open files open
for read/write at the same time when using a Zarr_lwt `FilesystemStore`.
  • Loading branch information
zoj613 committed Aug 22, 2024
1 parent 97e7226 commit b43c44d
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 36 deletions.
84 changes: 53 additions & 31 deletions lib/codecs/array_to_bytes.ml
Original file line number Diff line number Diff line change
Expand Up @@ -432,26 +432,41 @@ module Make (Io : Types.IO) = struct
let id, co = RegularGrid.index_coord_pair grid c in
ArrayMap.add_to_list id (co, v) acc) ArrayMap.empty pairs
in
let inner = {repr with shape = t.chunk_shape} in
Deferred.fold_left
(fun acc (idx, z) ->
let oc = Array.append idx [|0|] in
let nc = Array.append idx [|1|] in
let ofs = Stdint.Uint64.to_int @@ Any.get idx_arr oc in
let nb = Stdint.Uint64.to_int @@ Any.get idx_arr nc in
get_partial [pad + ofs, Some nb] >>= fun l ->
let arr = decode_chain t.codecs inner @@ List.hd l in
List.iter (fun (c, v) -> Ndarray.set arr c v) z;
let s = encode_chain t.codecs arr in
let nb' = String.length s in
(* if codec chain doesnt contain compression, update chunk in-place *)
if nb' = nb then set_partial [pad + ofs, s] >>| fun () -> acc
else
(Any.set idx_arr oc @@ Stdint.Uint64.of_int acc;
Any.set idx_arr nc @@ Stdint.Uint64.of_int nb';
set_partial ~append:true [acc, s] >>| fun () ->
acc + nb')) (csize - pad) @@ ArrayMap.bindings m
>>= fun bsize ->
let bindings = ArrayMap.bindings m in
let ranges, coords =
List.split @@
List.map
(fun (i, _) ->
let oc = Array.append i [|0|] in
let nc = Array.append i [|1|] in
let ofs = Stdint.Uint64.to_int @@ Any.get idx_arr oc in
let nb = Stdint.Uint64.to_int @@ Any.get idx_arr nc in
(pad + ofs, Some nb), (oc, nc, ofs, nb)) bindings
in
get_partial ranges >>= fun xs ->
let repr' = {repr with shape = t.chunk_shape} in
let bsize, inplace, append =
List.fold_left
(fun (a, l, r) ((x, (oc, nc, ofs, nb)), (_, z)) ->
let arr = decode_chain t.codecs repr' x in
List.iter (fun (c, v) -> Ndarray.set arr c v) z;
let s = encode_chain t.codecs arr in
let nb' = String.length s in
if nb' = nb then a, (pad + ofs, s) :: l, r
else
(Any.set idx_arr oc @@ Stdint.Uint64.of_int a;
Any.set idx_arr nc @@ Stdint.Uint64.of_int nb';
a + nb', l, (a, s) :: r))
(csize - pad, [], []) List.(combine (combine xs coords) bindings)
in
begin match inplace with
| [] -> Deferred.return_unit
| xs -> set_partial xs
end >>= fun () ->
begin match append with
| [] -> Deferred.return_unit
| xs -> set_partial ~append:true xs
end >>= fun () ->
let ib = encode_index_chain t.index_codecs idx_arr in
match t.index_location with
| Start -> set_partial [0, ib]
Expand All @@ -471,15 +486,22 @@ module Make (Io : Types.IO) = struct
List.fold_left
(fun acc (i, y) ->
let id, c = RegularGrid.index_coord_pair grid y in
ArrayMap.add_to_list id (i, c) acc) ArrayMap.empty pairs in
let inner = {repr with shape = t.chunk_shape} in
Deferred.concat_map
(fun (idx, z) ->
let oc = Array.append idx [|0|] in
let nc = Array.append idx [|1|] in
let ofs = Stdint.Uint64.to_int @@ Any.get idx_arr oc in
let nb = Stdint.Uint64.to_int @@ Any.get idx_arr nc in
get_partial [pad + ofs, Some nb] >>| fun l ->
let arr = decode_chain t.codecs inner @@ List.hd l in
List.map (fun (i, c) -> (i, Ndarray.get arr c)) z) (ArrayMap.bindings m)
ArrayMap.add_to_list id (i, c) acc) ArrayMap.empty pairs
in
let ranges =
List.map
(fun (i, _) ->
let oc = Array.append i [|0|] in
let nc = Array.append i [|1|] in
let ofs = Stdint.Uint64.to_int @@ Any.get idx_arr oc in
let nb = Stdint.Uint64.to_int @@ Any.get idx_arr nc in
pad + ofs, Some nb) ArrayMap.(bindings m)
in
get_partial ranges >>| fun xs ->
let repr' = {repr with shape = t.chunk_shape} in
List.concat_map
(fun (x, (_, z)) ->
let arr = decode_chain t.codecs repr' x in
List.map (fun (i, c) -> i, Ndarray.get arr c) z)
List.(combine xs @@ ArrayMap.bindings m)
end
10 changes: 5 additions & 5 deletions zarr-lwt/storage.ml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ module FilesystemStore = struct
(fun ic ->
Lwt_io.length ic >>= fun v ->
let size = Int64.to_int v in
Lwt_list.map_p
Lwt_list.map_s
(fun (rs, len) ->
let count =
match len with
Expand All @@ -74,17 +74,17 @@ module FilesystemStore = struct
(Fun.flip Lwt_io.write value)

let set_partial_values t key ?(append=false) rvs =
let iter_fn = if append then Lwt_list.iter_s else Lwt_list.iter_p in
let flags = Unix.[O_NONBLOCK; O_RDWR] in
Lwt_io.with_file
~flags:(if append then Unix.O_APPEND :: flags else flags)
~perm:t.perm
~mode:Output
(key_to_fspath t key)
(fun oc ->
iter_fn (fun (rs, value) ->
Lwt_io.set_position oc @@ Int64.of_int rs >>= fun () ->
Lwt_io.write oc value) rvs)
Lwt_list.iter_s
(fun (rs, value) ->
Lwt_io.set_position oc @@ Int64.of_int rs >>= fun () ->
Lwt_io.write oc value) rvs)

let list t =
let rec filter_concat acc dir =
Expand Down

0 comments on commit b43c44d

Please sign in to comment.