Skip to content

Commit

Permalink
Partial encoding WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
LDeakin committed Aug 14, 2024
1 parent 5473c8a commit 53e6cd3
Show file tree
Hide file tree
Showing 26 changed files with 1,272 additions and 207 deletions.
1 change: 1 addition & 0 deletions src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ pub struct NonZeroError;
/// - [`ReadableWritableStorageTraits`](crate::storage::ReadableWritableStorageTraits): store operations requiring reading *and* writing
/// - [`store_chunk_subset`](Array::store_chunk_subset)
/// - [`store_array_subset`](Array::store_array_subset)
/// - [`partial_encoder`](Array::partial_encoder)
///
/// Many `retrieve` and `store` methods have multiple variants:
/// - Standard variants store or retrieve data represented as [`ArrayBytes`] (representing fixed or variable length bytes).
Expand Down
96 changes: 80 additions & 16 deletions src/array/array_sync_readable_writable.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
use std::sync::Arc;

use rayon::iter::{IntoParallelIterator, ParallelIterator};

use crate::{array::ArrayBytes, array_subset::ArraySubset, storage::ReadableWritableStorageTraits};
use crate::{
array::ArrayBytes,
array_subset::ArraySubset,
storage::{data_key, ReadableWritableStorageTraits, StorageHandle},
};

use super::{
array_bytes::update_array_bytes, codec::options::CodecOptions,
concurrency::concurrency_chunks_and_codec, Array, ArrayError, Element,
codec::{
options::CodecOptions, ArrayPartialEncoderTraits, ArrayToBytesCodecTraits,
StoragePartialDecoder, StoragePartialEncoder,
},
concurrency::concurrency_chunks_and_codec,
Array, ArrayError, Element,
};

impl<TStorage: ?Sized + ReadableWritableStorageTraits + 'static> Array<TStorage> {
Expand Down Expand Up @@ -180,21 +190,28 @@ impl<TStorage: ?Sized + ReadableWritableStorageTraits + 'static> Array<TStorage>
// let mutex = self.storage.mutex(&key)?;
// let _lock = mutex.lock();

// Decode the entire chunk
let chunk_bytes_old = self.retrieve_chunk_opt(chunk_indices, options)?;
chunk_bytes_old.validate(chunk_shape.iter().product(), self.data_type().size())?;
let partial_encoder = self.partial_encoder_opt(chunk_indices, options)?;
Ok(partial_encoder.partial_encode_opt(
&[chunk_subset.clone()],
vec![chunk_subset_bytes],
options,
)?)

// Update the chunk
let chunk_bytes_new = update_array_bytes(
chunk_bytes_old,
chunk_shape,
chunk_subset_bytes,
chunk_subset,
self.data_type().size(),
);
// // Decode the entire chunk
// let chunk_bytes_old = self.retrieve_chunk_opt(chunk_indices, options)?;
// chunk_bytes_old.validate(chunk_shape.iter().product(), self.data_type().size())?;

// Store the updated chunk
self.store_chunk_opt(chunk_indices, chunk_bytes_new, options)
// // Update the chunk
// let chunk_bytes_new = update_array_bytes(
// chunk_bytes_old,
// chunk_shape,
// chunk_subset_bytes,
// chunk_subset,
// self.data_type().size(),
// );

// // Store the updated chunk
// self.store_chunk_opt(chunk_indices, chunk_bytes_new, options)
}
}

Expand Down Expand Up @@ -350,4 +367,51 @@ impl<TStorage: ?Sized + ReadableWritableStorageTraits + 'static> Array<TStorage>
let subset_array = super::ndarray_into_vec(subset_array);
self.store_array_subset_elements_opt(&subset, &subset_array, options)
}

/// Initialises a partial encoder for the chunk at `chunk_indices`.
///
/// # Errors
/// Returns an [`ArrayError`] if initialisation of the partial encoder fails.
pub fn partial_encoder<'a>(
&'a self,
chunk_indices: &[u64],
) -> Result<Arc<dyn ArrayPartialEncoderTraits + 'a>, ArrayError> {
self.partial_encoder_opt(chunk_indices, &CodecOptions::default())
}

Check warning on line 380 in src/array/array_sync_readable_writable.rs

View check run for this annotation

Codecov / codecov/patch

src/array/array_sync_readable_writable.rs#L375-L380

Added lines #L375 - L380 were not covered by tests

/// Explicit options version of [`partial_encoder`](Array::partial_encoder).
#[allow(clippy::missing_errors_doc)]
pub fn partial_encoder_opt<'a>(
&'a self,
chunk_indices: &[u64],
options: &CodecOptions,
) -> Result<Arc<dyn ArrayPartialEncoderTraits + 'a>, ArrayError> {
let storage_handle = Arc::new(StorageHandle::new(self.storage.clone()));

// Input
let storage_transformer_read = self
.storage_transformers()
.create_readable_transformer(storage_handle.clone());
let input_handle = Arc::new(StoragePartialDecoder::new(
storage_transformer_read,
data_key(self.path(), chunk_indices, self.chunk_key_encoding()),
));
let chunk_representation = self.chunk_array_representation(chunk_indices)?;

// Output
let storage_transformer_write = self
.storage_transformers()
.create_writable_transformer(storage_handle);
let output_handle = Arc::new(StoragePartialEncoder::new(
storage_transformer_write,
data_key(self.path(), chunk_indices, self.chunk_key_encoding()),
));

Ok(self.codecs().partial_encoder(
input_handle,
output_handle,
&chunk_representation,
options,
)?)
}
}
160 changes: 160 additions & 0 deletions src/array/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,16 @@ pub use byte_interval_partial_decoder::ByteIntervalPartialDecoder;
#[cfg(feature = "async")]
pub use byte_interval_partial_decoder::AsyncByteIntervalPartialDecoder;

mod array_partial_encoder_default;
pub use array_partial_encoder_default::ArrayPartialEncoderDefault;

mod array_to_array_partial_encoder_default;
pub use array_to_array_partial_encoder_default::ArrayToArrayPartialEncoderDefault;

mod bytes_partial_encoder_default;
pub use bytes_partial_encoder_default::BytesPartialEncoderDefault;

use crate::storage::{StoreKeyStartValue, WritableStorage};
use crate::{
array_subset::{ArraySubset, IncompatibleArraySubsetAndShapeError},
byte_range::{ByteOffset, ByteRange, InvalidByteRangeError},
Expand Down Expand Up @@ -353,6 +363,60 @@ pub trait ArrayPartialDecoderTraits: Send + Sync {
) -> Result<Vec<ArrayBytes<'_>>, CodecError>;
}

/// Partial array encoder traits.
pub trait ArrayPartialEncoderTraits: Send + Sync {
/// Partially encode a chunk with default codec options.
///
/// # Errors
/// Returns [`CodecError`] if a codec fails or an array subset is invalid.
fn partial_encode(
&self,
array_subsets: &[ArraySubset],
array_subset_bytes: Vec<ArrayBytes<'_>>,
) -> Result<(), CodecError> {
self.partial_encode_opt(array_subsets, array_subset_bytes, &CodecOptions::default())
}

Check warning on line 378 in src/array/codec.rs

View check run for this annotation

Codecov / codecov/patch

src/array/codec.rs#L372-L378

Added lines #L372 - L378 were not covered by tests

/// Explicit options version of [`partial_decode`](ArrayPartialEncoderTraits::partial_encode).
#[allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
fn partial_encode_opt(
&self,
array_subsets: &[ArraySubset],
array_subset_bytes: Vec<ArrayBytes<'_>>,
options: &CodecOptions,
) -> Result<(), CodecError>;
}

/// Partial bytes encoder traits.
pub trait BytesPartialEncoderTraits: Send + Sync {
/// Erase the chunk.
///
/// # Errors
/// Returns an error if there is an underlying store error.
fn erase(&self) -> Result<(), CodecError>;

/// Partially encode a chunk with default codec options.
///
/// # Errors
/// Returns [`CodecError`] if a codec fails or an array subset is invalid.
fn partial_encode(
&self,
byte_ranges: &[ByteRange],
bytes: Vec<RawBytes<'_>>,
) -> Result<(), CodecError> {
self.partial_encode_opt(byte_ranges, bytes, &CodecOptions::default())
}

/// Explicit options version of [`partial_encode`](BytesPartialEncoderTraits::partial_encode).
#[allow(clippy::missing_errors_doc, clippy::missing_panics_doc)]
fn partial_encode_opt(
&self,
byte_ranges: &[ByteRange],
bytes: Vec<RawBytes<'_>>,
options: &CodecOptions,
) -> Result<(), CodecError>;
}

#[cfg(feature = "async")]
/// Asynchronous partial array decoder traits.
#[async_trait::async_trait]
Expand Down Expand Up @@ -447,6 +511,62 @@ impl AsyncBytesPartialDecoderTraits for AsyncStoragePartialDecoder {
}
}

/// A [`WritableStorage`] store value partial encoder.
pub struct StoragePartialEncoder {
storage: WritableStorage,
key: StoreKey,
}

impl StoragePartialEncoder {
/// Create a new storage partial encoder.
pub fn new(storage: WritableStorage, key: StoreKey) -> Self {
Self { storage, key }
}
}

impl BytesPartialEncoderTraits for StoragePartialEncoder {
fn erase(&self) -> Result<(), CodecError> {
Ok(self.storage.erase(&self.key)?)
}

fn partial_encode_opt(
&self,
byte_ranges: &[ByteRange],
bytes: Vec<RawBytes<'_>>,
_options: &CodecOptions,
) -> Result<(), CodecError> {
if byte_ranges.len() != bytes.len() {
return Err(CodecError::Other(
"byte_ranges and bytes have a length mismatch".to_string(),
));

Check warning on line 541 in src/array/codec.rs

View check run for this annotation

Codecov / codecov/patch

src/array/codec.rs#L539-L541

Added lines #L539 - L541 were not covered by tests
}

let mut key_start_values = Vec::with_capacity(bytes.len());
for (byte_range, bytes) in std::iter::zip(byte_ranges, &bytes) {
let byte_range_start = match byte_range {
ByteRange::FromEnd(_, _) => Err(CodecError::Other("BytesPartialEncoderTraits::partial_encode_opt does not support from end byte ranges".to_string())),
ByteRange::FromStart(start, None) => {
Ok(*start)

Check warning on line 549 in src/array/codec.rs

View check run for this annotation

Codecov / codecov/patch

src/array/codec.rs#L547-L549

Added lines #L547 - L549 were not covered by tests
},
ByteRange::FromStart(start, Some(length)) => {
if bytes.len() as u64 == *length {
Ok(*start)
} else {
Err(CodecError::Other("BytesPartialEncoderTraits::partial_encode_opt incompatible byte range and bytes length".to_string()))

Check warning on line 555 in src/array/codec.rs

View check run for this annotation

Codecov / codecov/patch

src/array/codec.rs#L555

Added line #L555 was not covered by tests
}
},
}?;

Check warning on line 558 in src/array/codec.rs

View check run for this annotation

Codecov / codecov/patch

src/array/codec.rs#L558

Added line #L558 was not covered by tests
key_start_values.push(StoreKeyStartValue::new(
self.key.clone(),
byte_range_start,
bytes,
));
}

Ok(self.storage.set_partial_values(&key_start_values)?)
}
}

/// Traits for array to array codecs.
#[cfg_attr(feature = "async", async_trait::async_trait)]
pub trait ArrayToArrayCodecTraits:
Expand Down Expand Up @@ -495,6 +615,18 @@ pub trait ArrayToArrayCodecTraits:
options: &CodecOptions,
) -> Result<Arc<dyn ArrayPartialDecoderTraits + 'a>, CodecError>;

/// Initialise a partial encoder.
///
/// # Errors
/// Returns a [`CodecError`] if initialisation fails.
fn partial_encoder<'a>(
&'a self,
input_handle: Arc<dyn ArrayPartialDecoderTraits + 'a>,
output_handle: Arc<dyn ArrayPartialEncoderTraits + 'a>,
decoded_representation: &ChunkRepresentation,
options: &CodecOptions,
) -> Result<Arc<dyn ArrayPartialEncoderTraits + 'a>, CodecError>;

#[cfg(feature = "async")]
/// Initialise an asynchronous partial decoder.
///
Expand All @@ -506,6 +638,8 @@ pub trait ArrayToArrayCodecTraits:
decoded_representation: &ChunkRepresentation,
options: &CodecOptions,
) -> Result<Arc<dyn AsyncArrayPartialDecoderTraits + 'a>, CodecError>;

// TODO: async_partial_encoder
}

dyn_clone::clone_trait_object!(ArrayToArrayCodecTraits);
Expand Down Expand Up @@ -557,6 +691,18 @@ pub trait ArrayToBytesCodecTraits:
options: &CodecOptions,
) -> Result<Arc<dyn ArrayPartialDecoderTraits + 'a>, CodecError>;

/// Initialise a partial encoder.
///
/// # Errors
/// Returns a [`CodecError`] if initialisation fails.
fn partial_encoder<'a>(
&'a self,
input_handle: Arc<dyn BytesPartialDecoderTraits + 'a>,
output_handle: Arc<dyn BytesPartialEncoderTraits + 'a>,
decoded_representation: &ChunkRepresentation,
_options: &CodecOptions,
) -> Result<Arc<dyn ArrayPartialEncoderTraits + 'a>, CodecError>;

#[cfg(feature = "async")]
/// Initialise an asynchronous partial decoder.
///
Expand All @@ -568,6 +714,8 @@ pub trait ArrayToBytesCodecTraits:
decoded_representation: &ChunkRepresentation,
options: &CodecOptions,
) -> Result<Arc<dyn AsyncArrayPartialDecoderTraits + 'a>, CodecError>;

// TODO: Async partial encoder
}

dyn_clone::clone_trait_object!(ArrayToBytesCodecTraits);
Expand Down Expand Up @@ -622,6 +770,18 @@ pub trait BytesToBytesCodecTraits: CodecTraits + dyn_clone::DynClone + core::fmt
options: &CodecOptions,
) -> Result<Arc<dyn BytesPartialDecoderTraits + 'a>, CodecError>;

/// Initialises a partial encoder.
///
/// # Errors
/// Returns a [`CodecError`] if initialisation fails.
fn partial_encoder<'a>(
&'a self,
input_handle: Arc<dyn BytesPartialDecoderTraits + 'a>,
output_handle: Arc<dyn BytesPartialEncoderTraits + 'a>,
decoded_representation: &BytesRepresentation,
options: &CodecOptions,
) -> Result<Arc<dyn BytesPartialEncoderTraits + 'a>, CodecError>;

#[cfg(feature = "async")]
/// Initialises an asynchronous partial decoder.
///
Expand Down
Loading

0 comments on commit 53e6cd3

Please sign in to comment.