Skip to content

Commit

Permalink
Enforce shard index codec chain invariant.
Browse files Browse the repository at this point in the history
The V3 spec insists that the codec chain for a shard index array
must not include compression codecs like Gzip since it would make
it impractical to calculate index bytes size. This commit
ensures this invariant is not violated by making a shard's index
codec chain correct by construction. That is, specifying a non-fixed
size bytes-to-bytes codec in the `index_codec` field is a compile time
error.
  • Loading branch information
zoj613 committed Jul 11, 2024
1 parent e2bf47c commit 8518c7b
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 57 deletions.
18 changes: 9 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ let array_node =
Result.get_ok @@ ArrayNode.(group_node / "name");;
let codec_chain =
{a2a = [Transpose [|2; 0; 1|]]
;a2b = Bytes Big
;b2b = [Gzip L2]};;
{a2a = [`Transpose [|2; 0; 1|]]
;a2b = `Bytes Big
;b2b = [`Gzip L2]};;
FilesystemStore.create_array
~codecs:codec_chain
Expand Down Expand Up @@ -81,16 +81,16 @@ R[73,1] -INF -INF -INF -INF -INF -INF *)
let config =
{chunk_shape = [|5; 3; 5|]
;codecs =
{a2a = [Transpose [|2; 0; 1|]]
;a2b = Bytes Little
;b2b = [Gzip L5]}
{a2a = [`Transpose [|2; 0; 1|]]
;a2b = `Bytes Little
;b2b = [`Gzip L5]}
;index_codecs =
{a2a = []
;a2b = Bytes Big
;b2b = [Crc32c]}
;a2b = `Bytes Big
;b2b = [`Crc32c]}
;index_location = Start};;
let codec_chain =
{a2a = []; a2b = ShardingIndexed config; b2b = []};;
{a2a = []; a2b = `ShardingIndexed config; b2b = []};;
let shard_node = Result.get_ok @@ ArrayNode.(group_node / "another");;
Expand Down
102 changes: 92 additions & 10 deletions lib/codecs/codecs.ml
Original file line number Diff line number Diff line change
Expand Up @@ -5,41 +5,123 @@ open Util.Result_syntax

module Ndarray = Owl.Dense.Ndarray.Generic

type arraytoarray =
[ `Transpose of int array ]

type fixed_bytestobytes =
[ `Crc32c ]

type variable_bytestobytes =
[ `Gzip of compression_level ]

type bytestobytes =
[ fixed_bytestobytes | variable_bytestobytes ]

type arraytobytes =
[ `Bytes of endianness
| `ShardingIndexed of sharding_config ]

and sharding_config =
{chunk_shape : int array
;codecs : bytestobytes shard_chain
;index_codecs : fixed_bytestobytes shard_chain
;index_location : loc}

and 'a shard_chain = {
a2a: arraytoarray list;
a2b: arraytobytes;
b2b: 'a list;
}

type codec_chain = {
a2a: arraytoarray list;
a2b: arraytobytes;
b2b: bytestobytes list;
}

let rec to_internal_a2b v =
match v with
| `Bytes e -> Bytes e
| `ShardingIndexed cfg ->
ShardingIndexed
{chunk_shape = cfg.chunk_shape
;index_location = cfg.index_location
;index_codecs : chain =
{a2a = to_internal_a2a cfg.index_codecs.a2a
;a2b = to_internal_a2b cfg.index_codecs.a2b
;b2b = fixed_to_internal_b2b cfg.index_codecs.b2b}
;codecs : chain =
{a2a = to_internal_a2a cfg.codecs.a2a
;a2b = to_internal_a2b cfg.codecs.a2b
;b2b = variable_to_internal_b2b cfg.codecs.b2b}}

and to_internal_a2a a2a =
List.fold_right
(fun x acc ->
match x with
| `Transpose o -> Transpose o :: acc) a2a []

and fixed_to_internal_b2b b2b =
List.fold_right
(fun x acc ->
match x with
| `Crc32c -> Crc32c :: acc) b2b []

and variable_to_internal_b2b b2b =
List.fold_right
(fun x acc ->
match x with
| `Gzip lvl -> Gzip lvl :: acc
| `Crc32c -> Crc32c :: acc) b2b []

module Chain = struct
type t = chain

let pp = pp_chain

let show = show_chain

let create repr {a2a; a2b; b2b} =
let create :
type a b. (a, b) Util.array_repr -> codec_chain -> (t, [> error ]) result
= fun repr cc ->
let a2a = to_internal_a2a cc.a2a in
let a2b = to_internal_a2b cc.a2b in
let b2b = variable_to_internal_b2b cc.b2b in
List.fold_left
(fun acc c ->
acc >>= fun r ->
ArrayToArray.parse r c >>= fun () ->
ArrayToArray.compute_encoded_representation c r) (Ok repr) a2a
>>= fun repr' ->
ArrayToBytes.parse repr' a2b >>| fun () ->
{a2a; a2b; b2b}
({a2a; a2b; b2b} : t)

let default =
let default : t =
{a2a = []; a2b = ArrayToBytes.default; b2b = []}

let compute_encoded_size input_size t =
let compute_encoded_size : int -> t -> int = fun input_size t ->
List.fold_left BytesToBytes.compute_encoded_size
(ArrayToBytes.compute_encoded_size
(List.fold_left ArrayToArray.compute_encoded_size
input_size t.a2a) t.a2b) t.b2b

let encode t x =
let encode :
type a b. t -> (a, b) Ndarray.t -> (string, [> error ]) result
= fun t x ->
List.fold_left
(fun acc c -> acc >>= ArrayToArray.encode c) (Ok x) t.a2a
>>= fun y ->
List.fold_left
(fun acc c -> acc >>= BytesToBytes.encode c)
(ArrayToBytes.encode y t.a2b) t.b2b

let decode t repr x =
let decode :
type a b.
t ->
(a, b) Util.array_repr ->
string ->
((a, b) Ndarray.t, [> error ]) result
= fun t repr x ->
(* compute the last encoded representation of array->array codec chain.
This becomes the decoded representation of the array->bytes decode
procedure. *)
Expand All @@ -55,16 +137,16 @@ module Chain = struct
(fun c acc -> acc >>= ArrayToArray.decode c)
t.a2a (ArrayToBytes.decode y repr' t.a2b)

let ( = ) x y =
let ( = ) : t -> t -> bool = fun x y ->
x.a2a = y.a2a && x.a2b = y.a2b && x.b2b = y.b2b

let to_yojson t =
let to_yojson : t -> Yojson.Safe.t = fun t ->
`List
(List.map ArrayToArray.to_yojson t.a2a @
(ArrayToBytes.to_yojson t.a2b) ::
List.map BytesToBytes.to_yojson t.b2b)

let of_yojson x =
let of_yojson : Yojson.Safe.t -> (t, string) result = fun x ->
let filter_partition f encoded =
List.fold_right (fun c (l, r) ->
match f c with
Expand All @@ -82,7 +164,7 @@ module Chain = struct
let a2a, rest = filter_partition ArrayToArray.of_yojson rest in
let b2b, rest = filter_partition BytesToBytes.of_yojson rest in
match rest with
| [] -> Ok {a2a; a2b; b2b}
| [] -> Ok ({a2a; a2b; b2b} : t)
| x :: _ ->
let msg =
(Util.get_name x) ^
Expand Down
43 changes: 25 additions & 18 deletions lib/codecs/codecs.mli
Original file line number Diff line number Diff line change
@@ -1,35 +1,42 @@
module Ndarray = Owl.Dense.Ndarray.Generic

type dimension_order = int array

type array_to_array =
| Transpose of dimension_order
type arraytoarray =
[ `Transpose of int array ]

type compression_level =
| L0 | L1 | L2 | L3 | L4 | L5 | L6 | L7 | L8 | L9

type bytes_to_bytes =
| Crc32c
| Gzip of compression_level
type fixed_bytestobytes =
[ `Crc32c ]
type variable_bytestobytes =
[ `Gzip of compression_level ]
type bytestobytes =
[ fixed_bytestobytes | variable_bytestobytes ]

type endianness = Little | Big

type loc = Start | End

type array_to_bytes =
| Bytes of endianness
| ShardingIndexed of shard_config
type arraytobytes =
[ `Bytes of endianness
| `ShardingIndexed of sharding_config ]

and shard_config =
and sharding_config =
{chunk_shape : int array
;codecs : chain
;index_codecs : chain
;codecs : bytestobytes shard_chain
;index_codecs : fixed_bytestobytes shard_chain
;index_location : loc}

and chain = {
a2a: array_to_array list;
a2b: array_to_bytes;
b2b: bytes_to_bytes list;
and 'a shard_chain = {
a2a: arraytoarray list;
a2b: arraytobytes;
b2b: 'a list;
}

type codec_chain = {
a2a: arraytoarray list;
a2b: arraytobytes;
b2b: bytestobytes list;
}

type error = Array_to_bytes.error
Expand All @@ -38,7 +45,7 @@ module Chain : sig
type t

val create
: ('a, 'b) Util.array_repr -> chain -> (t, [> error]) result
: ('a, 'b) Util.array_repr -> codec_chain -> (t, [> error]) result

val default : t

Expand Down
2 changes: 1 addition & 1 deletion lib/storage/storage_intf.ml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ module type S = sig
: ?sep:[< `Dot | `Slash > `Slash ] ->
?dimension_names:string option list ->
?attributes:Yojson.Safe.t ->
?codecs:Codecs.chain ->
?codecs:Codecs.codec_chain ->
shape:int array ->
chunks:int array ->
('a, 'b) Bigarray.kind ->
Expand Down
36 changes: 18 additions & 18 deletions test/test_codecs.ml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ let bytes_encode_decode
assert_equal
~printer:Owl_pretty.dsnda_to_string
arr
(Result.get_ok decoded)) [Bytes Little; Bytes Big]
(Result.get_ok decoded)) [`Bytes Little; `Bytes Big]

let tests = [
"test codec chain" >:: (fun _ ->
Expand All @@ -48,20 +48,20 @@ let tests = [
let shard_cfg =
{chunk_shape = [|2; 5; 5|]
;index_location = End
;index_codecs = {a2a = []; a2b = Bytes Little; b2b = [Crc32c]}
;codecs = {a2a = [Transpose [|0; 1; 2|]]; a2b = Bytes Big; b2b = [Gzip L1]}}
;index_codecs = {a2a = []; a2b = `Bytes Little; b2b = [`Crc32c]}
;codecs = {a2a = [`Transpose [|0; 1; 2|]]; a2b = `Bytes Big; b2b = [`Gzip L1]}}
in
let chain =
{a2a = [Transpose [|2; 1; 0; 3|]]
;a2b = ShardingIndexed shard_cfg
;b2b = [Crc32c; Gzip L9]}
{a2a = [`Transpose [|2; 1; 0; 3|]]
;a2b = `ShardingIndexed shard_cfg
;b2b = [`Crc32c; `Gzip L9]}
in
assert_bool
"" @@
Result.is_error @@
Chain.create decoded_repr chain;

let chain = {chain with a2a = [Transpose [|2; 1; 0|]]} in
let chain = {chain with a2a = [`Transpose [|2; 1; 0|]]} in
let c = Chain.create decoded_repr chain in
assert_bool "" @@ Result.is_ok c;
let c = Result.get_ok c in
Expand All @@ -72,7 +72,7 @@ let tests = [

let c' =
Result.get_ok @@
Chain.create decoded_repr {chain with b2b = [Crc32c]}
Chain.create decoded_repr {chain with b2b = [`Crc32c]}
in
let init_size =
(Array.fold_left Int.mul 1 decoded_repr.shape) *
Expand Down Expand Up @@ -197,7 +197,7 @@ let tests = [
;fill_value = Complex.zero}
in
let chain =
{a2a = [Transpose [||]]; a2b = Bytes Little; b2b = []}
{a2a = [`Transpose [||]]; a2b = `Bytes Little; b2b = []}
in
assert_bool
"" @@
Expand All @@ -207,7 +207,7 @@ let tests = [
"" @@
Result.is_error @@
Chain.create decoded_repr
{chain with a2a = [Transpose [|4; 0; 1|]]})
{chain with a2a = [`Transpose [|4; 0; 1|]]})
;

"test sharding indexed codec" >:: (fun _ ->
Expand Down Expand Up @@ -324,11 +324,11 @@ let tests = [
let cfg =
{chunk_shape = [|3; 5; 5|]
;index_location = Start
;index_codecs = {a2a = []; a2b = Bytes Little; b2b = []}
;codecs = {a2a = []; a2b = Bytes Big; b2b = []}}
;index_codecs = {a2a = []; a2b = `Bytes Little; b2b = []}
;codecs = {a2a = []; a2b = `Bytes Big; b2b = []}}
in
let chain =
{a2a = []; a2b = ShardingIndexed cfg; b2b = []} in
{a2a = []; a2b = `ShardingIndexed cfg; b2b = []} in
(*test failure for chunk shape not evenly dividing shard. *)
assert_bool
"chunk shape must always evenly divide a shard" @@
Expand All @@ -337,11 +337,11 @@ let tests = [
assert_bool
"chunk shape must have same size as shard dimensionality" @@
Result.is_error @@ Chain.create decoded_repr @@
{a2a = []; a2b = ShardingIndexed {cfg with chunk_shape = [|5|]}; b2b = []};
{a2a = []; a2b = `ShardingIndexed {cfg with chunk_shape = [|5|]}; b2b = []};

let chain =
{a2a = []
;a2b = ShardingIndexed {cfg with chunk_shape = [|5; 5; 5|]}
;a2b = `ShardingIndexed {cfg with chunk_shape = [|5; 5; 5|]}
;b2b = []}
in
let c = Chain.create decoded_repr chain in
Expand Down Expand Up @@ -432,14 +432,14 @@ let tests = [
decoded_repr.shape
decoded_repr.fill_value
in
let chain = {a2a = []; a2b = Bytes Little; b2b = []}
let chain = {a2a = []; a2b = `Bytes Little; b2b = []}
in
List.iter
(fun level ->
let c =
Chain.create decoded_repr {chain with b2b = [Gzip level]} in
Chain.create decoded_repr {chain with b2b = [`Gzip level]} in
assert_bool
"Creating Gzip chain should not fail." @@
"Creating `Gzip chain should not fail." @@
Result.is_ok c;
let c = Result.get_ok c in
let enc = Chain.encode c arr in
Expand Down
2 changes: 1 addition & 1 deletion test/test_storage.ml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ let test_store
let r =
M.create_array
~sep:`Dot
~codecs:{a2a = []; a2b = Bytes Big; b2b = []}
~codecs:{a2a = []; a2b = `Bytes Big; b2b = []}
~shape:[|100; 100; 50|]
~chunks:[|10; 15; 20|]
Bigarray.Complex64
Expand Down

0 comments on commit 8518c7b

Please sign in to comment.