Skip to content

Commit

Permalink
Add an example implementing a custom ZipStore.
Browse files Browse the repository at this point in the history
This adds an example showing how one can use the library to create their
own custom Zarr store and the minimum requirements needed to be
satisfied.
  • Loading branch information
zoj613 committed Aug 27, 2024
1 parent cfc85b0 commit 01e6935
Show file tree
Hide file tree
Showing 5 changed files with 317 additions and 1 deletion.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ arrays, designed for use in parallel computing.
- Compresses chunks using a variety of supported compression codecs.
- Supports indexing operations to read/write views of a Zarr array.
- Supports storing arrays in-memory or the local filesystem. It is also
extensible, allowing users to create and use their own custom storage backends.
extensible, allowing users to easily create and use their own custom storage
backends. See the example implementing a [Zip file store][9] for more details.
- Supports both synchronous and concurrent I/O via [Lwt][4] and [Eio][8].
- Leverages the strong type system of OCaml to create a type-safe API, making
it impossible to create, read or write malformed arrays.
Expand Down Expand Up @@ -132,3 +133,4 @@ FilesystemStore.erase_group_node store group_node;;
[6]: https://zarr-specs.readthedocs.io/en/latest/v3/core/v3.0.html
[7]: https://zoj613.github.io/zarr-ml/zarr/Zarr/index.html#examples
[8]: https://github.com/ocaml-multicore/eio
[9]: https://github.com/zoj613/zarr-ml/tree/main/examples/inmemory_zipstore.ml
Binary file added examples/data/testdata.zip
Binary file not shown.
13 changes: 13 additions & 0 deletions examples/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
; Builds the read-only Zip store example from the readonly_zipstore module.
; It links against zarr-eio and camlzip.
; NOTE(review): -O3 is passed to ocamlopt; presumably intended for
; flambda-enabled compilers -- confirm.
(executable
(name readonly_zipstore)
(modules readonly_zipstore)
(ocamlopt_flags (:standard -O3))
(libraries zarr-eio camlzip))

; Builds the in-memory Zip store example from the inmemory_zipstore module.
; It links against zarr-lwt and zipc, and is preprocessed with
; ppx_deriving.show (the example uses the [%show: ...] extension).
(executable
(name inmemory_zipstore)
(modules inmemory_zipstore)
(ocamlopt_flags (:standard -O3))
(libraries zarr-lwt zipc)
(preprocess
(pps ppx_deriving.show)))
188 changes: 188 additions & 0 deletions examples/inmemory_zipstore.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
(* This module implements a Zip file zarr store that is Lwt-aware.
It supports both read and write operations. This is because the
underlying Zip library used reads all Zip file bytes into memory. All
store updates are done in-memory and thus to update the actual zip file
we must persist the data using `MemoryZipStore.write_to_file`.
The main requirement is to implement the signature of Zarr.Types.IO.
We use Zarr_lwt's Deferred module for `Deferred` so that the store can be
Lwt-aware.
To compile & run this example execute the command
dune exec -- examples/inmemory_zipstore.exe
in your shell at the root of this project. *)

module MemoryZipStore : sig
  include Zarr.Storage.STORE with type 'a Deferred.t = 'a Lwt.t
  (*val create : ?level:Zipc_deflate.level -> string -> t *)

  (** [open_store ?level path] reads the whole file at [path] and decodes it
      as a Zip archive held in memory. [level] (default [`Default]) is the
      deflate level used for members written to the store afterwards.
      @raise Failure if the bytes cannot be decoded as a Zip archive. *)
  val open_store : ?level:Zipc_deflate.level -> string -> t

  (** [write_to_file t] encodes the in-memory archive and writes it to the
      path given to [open_store], truncating the previous contents. The
      archive is encoded {e before} the file is opened, so an encoding error
      rejects the promise with [Failure] and leaves the file untouched. *)
  val write_to_file : t -> unit Deferred.t
end = struct
  module M = Map.Make(String)

  module Z = struct
    module Deferred = Zarr_lwt.Deferred
    open Deferred.Infix

    (* [ic] is the in-memory archive and is replaced on every update;
       [mutex] is held while it is replaced; [level] is the deflate level
       for newly written members; [path] is the file the archive came from. *)
    type t =
      {mutable ic : Zipc.t
      ;mutex : Lwt_mutex.t
      ;level : Zipc_deflate.level
      ;path : string}

    (* [with_archive t update] replaces the archive with [update t.ic] while
       holding [t.mutex]. [update] sees the archive as it is once the lock
       has been acquired, never an earlier snapshot. *)
    let with_archive t update =
      Lwt_mutex.with_lock
        t.mutex
        (fun () ->
          t.ic <- update t.ic;
          Deferred.return_unit)

    (* [is_member t key] tells whether the archive has a member at [key]. *)
    let is_member t key =
      Deferred.return @@ Zipc.mem key t.ic

    (* [size t key] is the decompressed size of the file at [key], or [0]
       when [key] is absent or names a directory. *)
    let size t key =
      Deferred.return @@
      match Zipc.find key t.ic with
      | None -> 0
      | Some m ->
        (match Zipc.Member.kind m with
         | Zipc.Member.Dir -> 0
         | Zipc.Member.File f -> Zipc.File.decompressed_size f)

    (* [get t key] is the decompressed contents of the file at [key].
       Raises [Zarr.Storage.Key_not_found] when [key] is absent and
       [Failure] when [key] is a directory or cannot be decoded. *)
    let get t key =
      Deferred.return @@
      match Zipc.find key t.ic with
      | None -> raise (Zarr.Storage.Key_not_found key)
      | Some m ->
        (match Zipc.Member.kind m with
         | Zipc.Member.Dir -> failwith "cannot get contents of a directory."
         | Zipc.Member.File f ->
           (match Zipc.File.to_binary_string f with
            | Error e -> failwith e
            | Ok s -> s))

    (* [get_partial_values t key ranges] reads the value at [key] once and
       returns one substring per [(offset, len)] range; a [None] length
       means "from [offset] to the end of the value". *)
    let get_partial_values t key ranges =
      get t key >>= fun data ->
      let size = String.length data in
      ranges |> Lwt_list.map_p @@ fun (offset, len) ->
      Deferred.return
        (match len with
         | None -> String.sub data offset (size - offset)
         | Some l -> String.sub data offset l)

    (* [list t] is the path of every file member; directories are skipped. *)
    let list t =
      Deferred.return @@ Zipc.fold
        (fun m acc ->
          match Zipc.Member.kind m with
          | Zipc.Member.Dir -> acc
          | Zipc.Member.File _ -> Zipc.Member.path m :: acc) t.ic []

    (* [list_dir t prefix] splits the file members starting with [prefix]
       into [(keys, prefixes)]: [keys] are those with no '/' after the
       prefix, [prefixes] are the distinct paths truncated just after the
       first '/' that follows the prefix. *)
    let list_dir t prefix =
      let module StrSet = Zarr.Util.StrSet in
      let n = String.length prefix in
      let prefs, keys =
        M.fold
          (fun key v ((l, r) as acc) ->
            match Zipc.Member.kind v with
            | Zipc.Member.Dir -> acc
            | Zipc.Member.File _ ->
              if not (String.starts_with ~prefix key) then acc
              else
                (* [key] starts with [prefix], so [n] is a valid position. *)
                match String.index_from_opt key n '/' with
                | Some i -> StrSet.add (String.sub key 0 (i + 1)) l, r
                | None -> l, key :: r)
          (Zipc.to_string_map t.ic)
          (StrSet.empty, [])
      in Deferred.return (keys, StrSet.elements prefs)

    (* [set t key value] deflates [value] at the store's level and adds it
       to the archive as a file member at [key].
       Raises [Failure] if the value or the member cannot be built. *)
    let set t key value =
      match Zipc.File.deflate_of_binary_string ~level:t.level value with
      | Error e -> failwith e
      | Ok f ->
        (match Zipc.Member.(make ~path:key @@ File f) with
         | Error e -> failwith e
         | Ok m -> with_archive t (Zipc.add m))

    (* [set_partial_values t key ?append rv] rewrites the value at [key].
       With [append] each string in [rv] is concatenated to the current
       value and the offsets are ignored; otherwise each string overwrites
       the bytes starting at its offset. Nothing is done when [key] is a
       directory.
       Raises [Zarr.Storage.Key_not_found] when [key] is absent, matching
       [get], instead of the opaque [Invalid_argument] from [Option.get]. *)
    let set_partial_values t key ?(append=false) rv =
      let f =
        if append then
          fun acc (_, v) ->
            Deferred.return @@ acc ^ v
        else
          fun acc (rs, v) ->
            let s = Bytes.of_string acc in
            Bytes.blit_string v 0 s rs (String.length v);
            Deferred.return @@ Bytes.to_string s
      in
      match Zipc.find key t.ic with
      | None -> raise (Zarr.Storage.Key_not_found key)
      | Some m ->
        (match Zipc.Member.kind m with
         | Zipc.Member.Dir -> Deferred.return_unit
         | Zipc.Member.File file ->
           (match Zipc.File.to_binary_string file with
            | Error e -> failwith e
            | Ok s -> Deferred.fold_left f s rv >>= set t key))

    (* [erase t key] removes the member at [key] from the archive. *)
    let erase t key =
      with_archive t (Zipc.remove key)

    (* [erase_prefix t prefix] removes every member whose path starts with
       [prefix]. The filtering runs under the lock on the current archive. *)
    let erase_prefix t prefix =
      with_archive t (fun ic ->
        Zipc.to_string_map ic
        |> M.filter (fun k _ -> not (String.starts_with ~prefix k))
        |> Zipc.of_string_map)
  end

  (* this functor generates the public signature of our Zip file store. *)
  include Zarr.Storage.Make(Z)

  (* now we create functions to open and close the store. *)

  (*let create ?(level=`Default) path = Z.{ic = Zipc.empty; level; path} *)

  let open_store ?(level=`Default) path =
    match Zipc.of_binary_string In_channel.(with_open_bin path input_all) with
    | Ok ic -> Z.{ic; level; path; mutex = Lwt_mutex.create ()}
    | Error e -> failwith e

  let write_to_file (t : Z.t) =
    (* Encode first: opening with O_TRUNC empties the file, so it must only
       happen once there is something valid to write. *)
    match Zipc.to_binary_string t.ic with
    | Error e -> Lwt.fail_with e
    | Ok s ->
      Lwt_io.with_file
        ~flags:Unix.[O_WRONLY; O_TRUNC; O_CREAT; O_NONBLOCK]
        ~mode:Lwt_io.Output
        t.path
        (fun oc ->
          let open Lwt.Infix in
          Lwt_io.write oc s >>= fun () -> Lwt_io.flush oc)
end

(* Example driver: opens the bundled Zip archive, prints the array nodes it
   contains, overwrites a slice of the array at "/some/group/name" with
   random bytes, prints the slice before and after, then writes the updated
   archive back to disk. *)
let () =
  Lwt_main.run @@ begin
    let open Zarr.Node in
    let open MemoryZipStore.Deferred.Infix in

    let printlist = [%show: string list] in
    let store = MemoryZipStore.open_store "examples/data/testdata.zip" in
    MemoryZipStore.find_all_nodes store >>= fun (xs, _) ->
    print_endline @@ "All array nodes: " ^ printlist (List.map ArrayNode.to_path xs);
    (* [List.find] stops at the first match and raises [Not_found] if the
       archive does not contain the expected array node. *)
    let anode =
      List.find (fun node -> ArrayNode.to_path node = "/some/group/name") xs in
    let slice = Owl_types.[|R [0; 20]; I 10; R []|] in
    MemoryZipStore.read_array store anode slice Bigarray.Char >>= fun x ->
    print_string @@ "BEFORE: " ^ Owl_pretty.dsnda_to_string x;
    let x' =
      Owl.Dense.Ndarray.Generic.map
        (fun _ -> Owl_stats_dist.uniform_int_rvs ~a:0 ~b:255 |> Char.chr) x in
    MemoryZipStore.write_array store anode slice x' >>= fun () ->
    MemoryZipStore.read_array store anode slice Bigarray.Char >>= fun y ->
    print_string @@ "AFTER: " ^ Owl_pretty.dsnda_to_string y;
    (* The changes above only touched the in-memory archive; persist them. *)
    MemoryZipStore.write_to_file store >>| fun () ->
    print_endline "Zip store has been updated."
  end
113 changes: 113 additions & 0 deletions examples/readonly_zipstore.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
(* This module implements a Read-only Zip file zarr store that is Eio-aware.
The main requirement is to implement the signature of Zarr.Types.IO.
We use Zarr_eio's Deferred module for `Deferred` so that the store can be
Eio-aware. Since Zip stores cannot have files updated or removed, we only
implement the get_* and list_* families of functions and raise a
Not_implemented exception for the set_* and erase_* families of functions.
This effectively allows us to create a read-only store since calling any
of the following functions would result in a `Not_implemented` exception:
- ReadOnlyZipStore.create_group
- ReadOnlyZipStore.create_array
- ReadOnlyZipStore.erase_group_node
- ReadOnlyZipStore.erase_array_node
- ReadOnlyZipStore.erase_all_nodes
- ReadOnlyZipStore.write_array
- ReadOnlyZipStore.reshape
Below we show how to implement this custom Zarr Store.
To compile & run this example execute the command
dune exec -- examples/readonly_zipstore.exe
in your shell at the root of this project. *)

module ReadOnlyZipStore : sig
  (** Raised by every operation that would modify the store. *)
  exception Not_implemented

  include Zarr.Storage.STORE with type 'a Deferred.t = 'a

  (** [open_store path] opens the Zip file at [path] for reading. *)
  val open_store : string -> t

  (** [close t] closes the underlying Zip file. *)
  val close : t -> unit

end = struct
  exception Not_implemented

  module Z = struct
    module Deferred = Zarr_eio.Deferred
    open Deferred.Infix

    type t = Zip.in_file

    (* [is_member t key] tells whether the archive has an entry named [key]. *)
    let is_member t key =
      match Zip.find_entry t key with
      | exception Not_found -> false
      | _ -> true

    (* [size t key] is the uncompressed size of the entry named [key], or
       [0] when there is no such entry. *)
    let size t key =
      match Zip.find_entry t key with
      | e -> e.uncompressed_size
      | exception Not_found -> 0

    (* [get t key] is the contents of the entry named [key].
       Raises [Zarr.Storage.Key_not_found] when there is no such entry. *)
    let get t key =
      match Zip.find_entry t key with
      | e -> Zip.read_entry t e
      | exception Not_found -> raise (Zarr.Storage.Key_not_found key)

    (* [get_partial_values t key ranges] reads the entry once and returns
       one substring per [(offset, len)] range; a [None] length means "from
       [offset] to the end". Slicing is pure, so a plain [List.map] is used
       rather than one fiber per range. *)
    let get_partial_values t key ranges =
      get t key >>= fun data ->
      let size = String.length data in
      List.map
        (fun (offset, len) ->
          match len with
          | None -> String.sub data offset (size - offset)
          | Some l -> String.sub data offset l)
        ranges

    (* [list t] is the name of every non-directory entry, in archive order.
       Filtering is pure, so no fibers are spawned for it. *)
    let list t =
      List.filter_map
        (fun (e : Zip.entry) ->
          if e.is_directory then None else Some e.filename)
        (Zip.entries t)

    (* [list_dir t prefix] splits the non-directory entries whose name
       starts with [prefix] into [(keys, prefixes)]: [keys] are the names
       with no '/' after the prefix, [prefixes] are the distinct names
       truncated just after the first '/' that follows the prefix. *)
    let list_dir t prefix =
      let module StrSet = Zarr.Util.StrSet in
      let n = String.length prefix in
      let prefs, keys =
        List.fold_left
          (fun ((l, r) as acc) (e : Zip.entry) ->
            if e.is_directory || not (String.starts_with ~prefix e.filename)
            then acc
            else
              (* The name starts with [prefix], so [n] is a valid position. *)
              match String.index_from_opt e.filename n '/' with
              | Some i -> StrSet.add (String.sub e.filename 0 (i + 1)) l, r
              | None -> l, e.filename :: r)
          (StrSet.empty, [])
          (Zip.entries t)
      in keys, StrSet.elements prefs

    (* The store is read-only: every mutating operation raises
       [Not_implemented] as soon as it receives its first argument. *)
    let set _ = raise Not_implemented

    let set_partial_values _ = raise Not_implemented

    let erase _ = raise Not_implemented

    let erase_prefix _ = raise Not_implemented
  end

  (* this functor generates the public signature of our Zip file store. *)
  include Zarr.Storage.Make(Z)

  (* now we create functions to open and close the store. *)
  let open_store path = Zip.open_in path
  let close = Zip.close_in
end

(* Example driver: opens the bundled Zip archive read-only, prints the whole
   array stored at "/some/group/name", then shows that writing to the store
   raises [ReadOnlyZipStore.Not_implemented]. *)
let () =
  Eio_main.run @@ fun _ ->
  let open Zarr.Metadata in
  let open Zarr.Node in

  let store = ReadOnlyZipStore.open_store "examples/data/testdata.zip" in
  (* Close the store on every exit path. Previously the close call was
     parsed as part of the exception handler, so it only ran when
     [Not_implemented] was raised. *)
  Fun.protect ~finally:(fun () -> ReadOnlyZipStore.close store) @@ fun () ->
  let xs, _ = ReadOnlyZipStore.find_all_nodes store in
  (* [List.find] stops at the first match and raises [Not_found] if the
     archive does not contain the expected array node. *)
  let anode =
    List.find (fun node -> ArrayNode.to_path node = "/some/group/name") xs in
  let meta = ReadOnlyZipStore.array_metadata store anode in
  (* One [R []] per dimension of the array's shape. *)
  let slice = Array.map (Fun.const @@ Owl_types.R []) (ArrayMetadata.shape meta) in
  let arr = ReadOnlyZipStore.read_array store anode slice Bigarray.Char in
  print_string @@ Owl_pretty.dsnda_to_string arr;
  try ReadOnlyZipStore.write_array store anode slice arr with
  | ReadOnlyZipStore.Not_implemented -> print_endline "Store is read-only"

0 comments on commit 01e6935

Please sign in to comment.