Skip to content

Commit

Permalink
Partial encoding WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
LDeakin committed Sep 19, 2024
1 parent cfb65d4 commit c546dd8
Show file tree
Hide file tree
Showing 24 changed files with 1,135 additions and 51 deletions.
1 change: 1 addition & 0 deletions zarrs/src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ pub fn chunk_shape_to_array_shape(chunk_shape: &[std::num::NonZeroU64]) -> Array
/// - [`ReadableWritableStorageTraits`](crate::storage::ReadableWritableStorageTraits): store operations requiring reading *and* writing
/// - [`store_chunk_subset`](Array::store_chunk_subset)
/// - [`store_array_subset`](Array::store_array_subset)
/// - [`partial_encoder`](Array::partial_encoder)
///
/// Many `retrieve` and `store` methods have multiple variants:
/// - Standard variants store or retrieve data represented as [`ArrayBytes`] (representing fixed or variable length bytes).
Expand Down
96 changes: 80 additions & 16 deletions zarrs/src/array/array_sync_readable_writable.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
use std::sync::Arc;

use rayon::iter::{IntoParallelIterator, ParallelIterator};

use crate::{array::ArrayBytes, array_subset::ArraySubset, storage::ReadableWritableStorageTraits};
use crate::{
array::ArrayBytes,
array_subset::ArraySubset,
storage::{ReadableWritableStorageTraits, StorageHandle},
};

use super::{
array_bytes::update_array_bytes, codec::options::CodecOptions,
concurrency::concurrency_chunks_and_codec, Array, ArrayError, Element,
codec::{
options::CodecOptions, ArrayPartialEncoderTraits, ArrayToBytesCodecTraits,
StoragePartialDecoder, StoragePartialEncoder,
},
concurrency::concurrency_chunks_and_codec,
Array, ArrayError, Element,
};

impl<TStorage: ?Sized + ReadableWritableStorageTraits + 'static> Array<TStorage> {
Expand Down Expand Up @@ -180,21 +190,28 @@ impl<TStorage: ?Sized + ReadableWritableStorageTraits + 'static> Array<TStorage>
// let mutex = self.storage.mutex(&key)?;
// let _lock = mutex.lock();

// Decode the entire chunk
let chunk_bytes_old = self.retrieve_chunk_opt(chunk_indices, options)?;
chunk_bytes_old.validate(chunk_shape.iter().product(), self.data_type().size())?;
let partial_encoder = self.partial_encoder_opt(chunk_indices, options)?;
Ok(partial_encoder.partial_encode_opt(
&[chunk_subset],
vec![chunk_subset_bytes],
options,
)?)

// Update the chunk
let chunk_bytes_new = update_array_bytes(
chunk_bytes_old,
chunk_shape,
chunk_subset_bytes,
chunk_subset,
self.data_type().size(),
);
// // Decode the entire chunk
// let chunk_bytes_old = self.retrieve_chunk_opt(chunk_indices, options)?;
// chunk_bytes_old.validate(chunk_shape.iter().product(), self.data_type().size())?;

// Store the updated chunk
self.store_chunk_opt(chunk_indices, chunk_bytes_new, options)
// // Update the chunk
// let chunk_bytes_new = update_array_bytes(
// chunk_bytes_old,
// chunk_shape,
// chunk_subset_bytes,
// chunk_subset,
// self.data_type().size(),
// );

// // Store the updated chunk
// self.store_chunk_opt(chunk_indices, chunk_bytes_new, options)
}
}

Expand Down Expand Up @@ -350,4 +367,51 @@ impl<TStorage: ?Sized + ReadableWritableStorageTraits + 'static> Array<TStorage>
let subset_array = super::ndarray_into_vec(subset_array);
self.store_array_subset_elements_opt(&subset, &subset_array, options)
}

/// Creates a partial encoder for the chunk at `chunk_indices` using default codec options.
///
/// This is a convenience wrapper around [`partial_encoder_opt`](Array::partial_encoder_opt).
///
/// # Errors
/// Returns an [`ArrayError`] if initialisation of the partial encoder fails.
pub fn partial_encoder(
    &self,
    chunk_indices: &[u64],
) -> Result<Arc<dyn ArrayPartialEncoderTraits>, ArrayError> {
    let default_options = CodecOptions::default();
    self.partial_encoder_opt(chunk_indices, &default_options)
}

Check warning on line 380 in zarrs/src/array/array_sync_readable_writable.rs

View check run for this annotation

Codecov / codecov/patch

zarrs/src/array/array_sync_readable_writable.rs#L375-L380

Added lines #L375 - L380 were not covered by tests

/// Explicit options version of [`partial_encoder`](Array::partial_encoder).
#[allow(clippy::missing_errors_doc)]
pub fn partial_encoder_opt(
&self,
chunk_indices: &[u64],
options: &CodecOptions,
) -> Result<Arc<dyn ArrayPartialEncoderTraits>, ArrayError> {
let storage_handle = Arc::new(StorageHandle::new(self.storage.clone()));

// Input
let storage_transformer_read = self
.storage_transformers()
.create_readable_transformer(storage_handle.clone());
let input_handle = Arc::new(StoragePartialDecoder::new(
storage_transformer_read,
self.chunk_key(chunk_indices),
));
let chunk_representation = self.chunk_array_representation(chunk_indices)?;

// Output
let storage_transformer_write = self
.storage_transformers()
.create_writable_transformer(storage_handle);
let output_handle = Arc::new(StoragePartialEncoder::new(
storage_transformer_write,
self.chunk_key(chunk_indices),
));

Ok(self.codecs.clone().partial_encoder(
input_handle,
output_handle,
&chunk_representation,
options,
)?)
}
}
160 changes: 160 additions & 0 deletions zarrs/src/array/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@ pub use byte_interval_partial_decoder::ByteIntervalPartialDecoder;
#[cfg(feature = "async")]
pub use byte_interval_partial_decoder::AsyncByteIntervalPartialDecoder;

mod array_partial_encoder_default;
pub use array_partial_encoder_default::ArrayPartialEncoderDefault;

mod array_to_array_partial_encoder_default;
pub use array_to_array_partial_encoder_default::ArrayToArrayPartialEncoderDefault;

mod bytes_partial_encoder_default;
pub use bytes_partial_encoder_default::BytesPartialEncoderDefault;

use crate::storage::{StoreKeyStartValue, WritableStorage};
use crate::{
array_subset::{ArraySubset, IncompatibleArraySubsetAndShapeError},
byte_range::{extract_byte_ranges_read_seek, ByteRange, InvalidByteRangeError},
Expand Down Expand Up @@ -389,6 +399,60 @@ pub trait ArrayPartialDecoderTraits: Send + Sync {
}
}

/// Partial array encoder traits.
pub trait ArrayPartialEncoderTraits: Send + Sync {
/// Partially encode a chunk using the default codec options.
///
/// Forwards to [`partial_encode_opt`](ArrayPartialEncoderTraits::partial_encode_opt).
///
/// # Errors
/// Returns [`CodecError`] if a codec fails or an array subset is invalid.
fn partial_encode(
    &self,
    array_subsets: &[&ArraySubset],
    array_subset_bytes: Vec<ArrayBytes<'_>>,
) -> Result<(), CodecError> {
    let default_options = CodecOptions::default();
    self.partial_encode_opt(array_subsets, array_subset_bytes, &default_options)
}

Check warning on line 414 in zarrs/src/array/codec.rs

View check run for this annotation

Codecov / codecov/patch

zarrs/src/array/codec.rs#L408-L414

Added lines #L408 - L414 were not covered by tests

/// Explicit options version of [`partial_encode`](ArrayPartialEncoderTraits::partial_encode).
#[allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
fn partial_encode_opt(
&self,
array_subsets: &[&ArraySubset],
array_subset_bytes: Vec<ArrayBytes<'_>>,
options: &CodecOptions,
) -> Result<(), CodecError>;
}

/// Partial bytes encoder traits.
///
/// Implementors can erase a chunk and write encoded bytes at given byte ranges of it.
pub trait BytesPartialEncoderTraits: Send + Sync {
/// Erase the chunk.
///
/// # Errors
/// Returns an error if there is an underlying store error.
fn erase(&self) -> Result<(), CodecError>;

/// Partially encode a chunk with default codec options.
///
/// Forwards to [`partial_encode_opt`](BytesPartialEncoderTraits::partial_encode_opt) with [`CodecOptions::default`].
///
/// # Errors
/// Returns [`CodecError`] if a codec fails or a byte range is invalid.
fn partial_encode(
&self,
byte_ranges: &[ByteRange],
bytes: Vec<RawBytes<'_>>,
) -> Result<(), CodecError> {
self.partial_encode_opt(byte_ranges, bytes, &CodecOptions::default())
}

/// Explicit options version of [`partial_encode`](BytesPartialEncoderTraits::partial_encode).
///
/// `byte_ranges` and `bytes` are parallel sequences: one byte buffer per byte range.
#[allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
fn partial_encode_opt(
&self,
byte_ranges: &[ByteRange],
bytes: Vec<RawBytes<'_>>,
options: &CodecOptions,
) -> Result<(), CodecError>;
}

#[cfg(feature = "async")]
/// Asynchronous partial array decoder traits.
#[async_trait::async_trait]
Expand Down Expand Up @@ -513,6 +577,62 @@ impl AsyncBytesPartialDecoderTraits for AsyncStoragePartialDecoder {
}
}

/// A [`WritableStorage`] store value partial encoder.
///
/// Pairs a writable store with the key of the store value that is erased or partially written.
pub struct StoragePartialEncoder {
/// Store that receives the erase and partial write requests.
storage: WritableStorage,
/// Key of the store value being encoded.
key: StoreKey,
}

impl StoragePartialEncoder {
    /// Create a new storage partial encoder that targets the value at `key` in `storage`.
    #[must_use]
    pub fn new(storage: WritableStorage, key: StoreKey) -> Self {
        Self { storage, key }
    }
}

impl BytesPartialEncoderTraits for StoragePartialEncoder {
fn erase(&self) -> Result<(), CodecError> {
Ok(self.storage.erase(&self.key)?)
}

fn partial_encode_opt(
&self,
byte_ranges: &[ByteRange],
bytes: Vec<RawBytes<'_>>,
_options: &CodecOptions,
) -> Result<(), CodecError> {
if byte_ranges.len() != bytes.len() {
return Err(CodecError::Other(
"byte_ranges and bytes have a length mismatch".to_string(),
));

Check warning on line 607 in zarrs/src/array/codec.rs

View check run for this annotation

Codecov / codecov/patch

zarrs/src/array/codec.rs#L605-L607

Added lines #L605 - L607 were not covered by tests
}

let mut key_start_values = Vec::with_capacity(bytes.len());
for (byte_range, bytes) in std::iter::zip(byte_ranges, &bytes) {
let byte_range_start = match byte_range {
ByteRange::FromEnd(_, _) => Err(CodecError::Other("BytesPartialEncoderTraits::partial_encode_opt does not support from end byte ranges".to_string())),
ByteRange::FromStart(start, None) => {
Ok(*start)

Check warning on line 615 in zarrs/src/array/codec.rs

View check run for this annotation

Codecov / codecov/patch

zarrs/src/array/codec.rs#L613-L615

Added lines #L613 - L615 were not covered by tests
},
ByteRange::FromStart(start, Some(length)) => {
if bytes.len() as u64 == *length {
Ok(*start)
} else {
Err(CodecError::Other("BytesPartialEncoderTraits::partial_encode_opt incompatible byte range and bytes length".to_string()))

Check warning on line 621 in zarrs/src/array/codec.rs

View check run for this annotation

Codecov / codecov/patch

zarrs/src/array/codec.rs#L621

Added line #L621 was not covered by tests
}
},
}?;

Check warning on line 624 in zarrs/src/array/codec.rs

View check run for this annotation

Codecov / codecov/patch

zarrs/src/array/codec.rs#L624

Added line #L624 was not covered by tests
key_start_values.push(StoreKeyStartValue::new(
self.key.clone(),
byte_range_start,
bytes,
));
}

Ok(self.storage.set_partial_values(&key_start_values)?)
}
}

/// Traits for array to array codecs.
#[cfg_attr(feature = "async", async_trait::async_trait)]
pub trait ArrayToArrayCodecTraits: ArrayCodecTraits + core::fmt::Debug {
Expand Down Expand Up @@ -569,6 +689,18 @@ pub trait ArrayToArrayCodecTraits: ArrayCodecTraits + core::fmt::Debug {
options: &CodecOptions,
) -> Result<Arc<dyn ArrayPartialDecoderTraits>, CodecError>;

/// Initialise a partial encoder.
///
/// Consumes a reference-counted handle to the codec and returns an array partial encoder.
// NOTE(review): presumably `input_handle` reads the existing encoded data and `output_handle`
// receives the updated data — confirm against the codec implementations.
///
/// # Errors
/// Returns a [`CodecError`] if initialisation fails.
fn partial_encoder(
self: Arc<Self>,
input_handle: Arc<dyn ArrayPartialDecoderTraits>,
output_handle: Arc<dyn ArrayPartialEncoderTraits>,
decoded_representation: &ChunkRepresentation,
options: &CodecOptions,
) -> Result<Arc<dyn ArrayPartialEncoderTraits>, CodecError>;

#[cfg(feature = "async")]
/// Initialise an asynchronous partial decoder.
///
Expand All @@ -580,6 +712,8 @@ pub trait ArrayToArrayCodecTraits: ArrayCodecTraits + core::fmt::Debug {
decoded_representation: &ChunkRepresentation,
options: &CodecOptions,
) -> Result<Arc<dyn AsyncArrayPartialDecoderTraits>, CodecError>;

// TODO: async_partial_encoder
}

/// Traits for array to bytes codecs.
Expand Down Expand Up @@ -666,6 +800,18 @@ pub trait ArrayToBytesCodecTraits: ArrayCodecTraits + core::fmt::Debug {
options: &CodecOptions,
) -> Result<Arc<dyn ArrayPartialDecoderTraits>, CodecError>;

/// Initialise a partial encoder.
///
/// Consumes a reference-counted handle to the codec and returns an array partial encoder
/// built over a bytes decoder/encoder pair.
///
/// # Errors
/// Returns a [`CodecError`] if initialisation fails.
fn partial_encoder(
    self: Arc<Self>,
    input_handle: Arc<dyn BytesPartialDecoderTraits>,
    output_handle: Arc<dyn BytesPartialEncoderTraits>,
    decoded_representation: &ChunkRepresentation,
    options: &CodecOptions,
) -> Result<Arc<dyn ArrayPartialEncoderTraits>, CodecError>;

#[cfg(feature = "async")]
/// Initialise an asynchronous partial decoder.
///
Expand All @@ -677,6 +823,8 @@ pub trait ArrayToBytesCodecTraits: ArrayCodecTraits + core::fmt::Debug {
decoded_representation: &ChunkRepresentation,
options: &CodecOptions,
) -> Result<Arc<dyn AsyncArrayPartialDecoderTraits>, CodecError>;

// TODO: Async partial encoder
}

/// Traits for bytes to bytes codecs.
Expand Down Expand Up @@ -729,6 +877,18 @@ pub trait BytesToBytesCodecTraits: CodecTraits + core::fmt::Debug {
options: &CodecOptions,
) -> Result<Arc<dyn BytesPartialDecoderTraits>, CodecError>;

/// Initialises a partial encoder.
///
/// Consumes a reference-counted handle to the codec and returns a bytes partial encoder.
// NOTE(review): presumably `input_handle` reads the existing encoded bytes and `output_handle`
// receives the updated bytes — confirm against the codec implementations.
///
/// # Errors
/// Returns a [`CodecError`] if initialisation fails.
fn partial_encoder(
self: Arc<Self>,
input_handle: Arc<dyn BytesPartialDecoderTraits>,
output_handle: Arc<dyn BytesPartialEncoderTraits>,
decoded_representation: &BytesRepresentation,
options: &CodecOptions,
) -> Result<Arc<dyn BytesPartialEncoderTraits>, CodecError>;

#[cfg(feature = "async")]
/// Initialises an asynchronous partial decoder.
///
Expand Down
Loading

0 comments on commit c546dd8

Please sign in to comment.