Skip to content

Commit

Permalink
Add encoded and thread local chunk cache support (#57)
Browse files Browse the repository at this point in the history
  • Loading branch information
LDeakin authored Aug 26, 2024
1 parent 0c08cae commit 63cefef
Show file tree
Hide file tree
Showing 9 changed files with 1,107 additions and 705 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added
- Add `ChunkGridTraits::chunks_in_array_subset()`
- Add `ArrayChunkCacheExt`, `ChunkCache`, `ChunkCacheLru{Size,Chunk}Limit`
- Add chunk cache support
- Add `ArrayChunkCacheExt` extension trait for `Array`
- Add traits: `ChunkCache`, `ChunkCacheType` (implemented by `ChunkCacheType{Encoded,Decoded}`)
- Add chunk cache implementations: `ChunkCache{En,De}codedLru{Size,Chunk}Limit[ThreadLocal]`

### Changed
- **Breaking**: `Arc` instead of `Box` partial decoders
Expand Down
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ gdeflate-sys = { version = "0.4.1", optional = true }
half = { version = "2.0.0", features = ["bytemuck"] }
inventory = "0.3.0"
itertools = "0.13.0"
lru = "0.12.4"
moka = { version = "0.12.8", features = ["sync"] }
monostate = "0.1.0"
ndarray = { version = ">=0.15.0,<17", optional = true }
Expand All @@ -72,6 +73,7 @@ serde = { version = "1.0.184", features = ["derive"] }
serde_json = { version = "1.0.71", features = ["float_roundtrip", "preserve_order"] }
serde_repr = "0.1.19"
thiserror = "1.0.61"
thread_local = "1.1.8"
url = { version = "2.2.0", optional = true }
walkdir = "2.3.2"
zfp-sys = {version = "0.1.15", features = ["static"], optional = true }
Expand Down
34 changes: 22 additions & 12 deletions src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,9 @@ pub use crate::metadata::v2::ArrayMetadataV2;
pub use crate::metadata::v3::{fill_value::FillValueMetadata, ArrayMetadataV3};
pub use crate::metadata::ArrayMetadata;

pub use chunk_cache::array_chunk_cache_sync_readable_ext::ArrayChunkCacheExt;
pub use chunk_cache::array_chunk_cache_ext_sync::ArrayChunkCacheExt;
pub use chunk_cache::{
chunk_cache_lru_chunk_limit::ChunkCacheLruChunkLimit,
chunk_cache_lru_size_limit::ChunkCacheLruSizeLimit, ChunkCache,
chunk_cache_lru::*, ChunkCache, ChunkCacheType, ChunkCacheTypeDecoded, ChunkCacheTypeEncoded,
};

#[cfg(feature = "sharding")]
Expand Down Expand Up @@ -210,25 +209,36 @@ pub struct NonZeroError;
/// Another alternative is to use [Chunk Caching](#chunk-caching).
///
/// ### Chunk Caching
/// The [`ArrayChunkCacheExt`] trait adds [`Array`] retrieve methods that cache decoded chunks:
/// The [`ArrayChunkCacheExt`] trait adds [`Array`] retrieve methods that utilise chunk caching:
/// - [`retrieve_chunk_opt_cached`](ArrayChunkCacheExt::retrieve_chunk_opt_cached)
/// - [`retrieve_chunks_opt_cached`](ArrayChunkCacheExt::retrieve_chunks_opt_cached)
/// - [`retrieve_chunk_subset_opt_cached`](ArrayChunkCacheExt::retrieve_chunk_subset_opt_cached)
/// - [`retrieve_array_subset_opt_cached`](ArrayChunkCacheExt::retrieve_array_subset_opt_cached)
///
/// `_elements` and `_ndarray` variants are also available.
/// Each method has a `cache` parameter that implements the [`ChunkCache`] trait.
/// Cached retrieve methods do not use partial decoders, so any intersected chunk is fully decoded if not present in the cache.
/// This can reduce performance for some access patterns.
/// Prefer not to use a chunk cache if chunks are not accessed repeatedly.
///
/// Two chunk caches are provided by `zarrs`:
/// - [`ChunkCacheLruChunkLimit`]: an LRU (least recently used) cache with a fixed chunk capacity.
/// - [`ChunkCacheLruSizeLimit`]: an LRU cache with a fixed size in bytes.
/// Several Least Recently Used (LRU) chunk caches are provided by `zarrs`:
/// - [`ChunkCacheDecodedLruChunkLimit`]: a decoded chunk cache with a fixed chunk capacity.
/// - [`ChunkCacheEncodedLruChunkLimit`]: an encoded chunk cache with a fixed chunk capacity.
/// - [`ChunkCacheDecodedLruSizeLimit`]: a decoded chunk cache with a fixed size in bytes.
/// - [`ChunkCacheEncodedLruSizeLimit`]: an encoded chunk cache with a fixed size in bytes.
/// - [`ChunkCacheDecodedLruChunkLimitThreadLocal`]: a thread-local decoded chunk cache with a fixed chunk capacity (per thread).
/// - [`ChunkCacheEncodedLruChunkLimitThreadLocal`]: a thread-local encoded chunk cache with a fixed chunk capacity (per thread).
/// - [`ChunkCacheDecodedLruSizeLimitThreadLocal`]: a thread-local decoded chunk cache with a fixed size in bytes (per thread).
/// - [`ChunkCacheEncodedLruSizeLimitThreadLocal`]: a thread-local encoded chunk cache with a fixed size in bytes (per thread).
///
/// These caches use internal locking to support multithreading, which has a performance overhead.
/// `zarrs` consumers can create custom caches by implementing the [`ChunkCache`] trait.
/// For example, consider a custom lock-free per-thread cache, or an alternative to LRU.
///
/// Chunk caching is likely to be effective for remote stores where redundant retrieval are costly.
/// Chunk caching may not outperform disk caching with a filesystem store.
/// The above caches use internal locking to support multithreading, which has a performance overhead.
/// **Prefer not to use a chunk cache if chunks are not accessed repeatedly**, because cached retrieve methods do not use partial decoders and any intersected chunk is fully decoded if not present in the cache.
/// The encoded chunk caches may be optimal if dealing with highly compressed/sparse data with a fast codec.
/// However, the decoded chunk caches are likely to be more performant in most cases.
///
/// For many access patterns, chunk caching may reduce performance.
/// **Benchmark your algorithm/data.**
///
/// ### Reading Sharded Arrays
/// The `sharding_indexed` ([`ShardingCodec`](codec::array_to_bytes::sharding)) codec enables multiple sub-chunks ("inner chunks") to be stored in a single chunk ("shard").
Expand Down
80 changes: 67 additions & 13 deletions src/array/chunk_cache.rs
Original file line number Diff line number Diff line change
@@ -1,40 +1,94 @@
use std::sync::Arc;

use super::{ArrayBytes, ArrayError};
use crate::storage::ReadableStorageTraits;

pub mod array_chunk_cache_sync_readable_ext;
pub mod chunk_cache_lru_chunk_limit;
pub mod chunk_cache_lru_size_limit;
use super::{codec::CodecOptions, Array, ArrayBytes, ArrayError, RawBytes};

// pub mod array_chunk_cache_ext_decoded_sync;
// pub mod array_chunk_cache_ext_encoded_sync;
pub mod array_chunk_cache_ext_sync;
pub mod chunk_cache_lru;
// pub mod chunk_cache_lru_chunk_limit_thread_local;
// pub mod chunk_cache_lru_size_limit_thread_local;

/// The chunk type of an encoded chunk cache.
pub type ChunkCacheTypeEncoded = Option<RawBytes<'static>>;

/// The chunk type of a decoded chunk cache.
pub type ChunkCacheTypeDecoded = ArrayBytes<'static>;

/// A chunk type ([`ChunkCacheTypeEncoded`] or [`ChunkCacheTypeDecoded`]).
pub trait ChunkCacheType: Send + Sync + 'static {
/// The size of the chunk in bytes.
fn size(&self) -> usize;
}

impl ChunkCacheType for ChunkCacheTypeEncoded {
fn size(&self) -> usize {
self.as_ref().map_or(0, |v| v.len())
}
}

impl ChunkCacheType for ChunkCacheTypeDecoded {
fn size(&self) -> usize {
ArrayBytes::size(self)
}
}

/// Traits for a chunk cache.
pub trait ChunkCache: Send + Sync {
pub trait ChunkCache<CT: ChunkCacheType>: Send + Sync {
/// Retrieve and decode a chunk.
///
/// # Errors
/// Returns an [`ArrayError`] if the underlying array retrieval method fails.
fn retrieve_chunk<TStorage: ?Sized + ReadableStorageTraits + 'static>(
&self,
array: &Array<TStorage>,
chunk_indices: &[u64],
options: &CodecOptions,
) -> Result<Arc<ArrayBytes<'static>>, ArrayError>;

/// Retrieve a chunk from the cache. Returns [`None`] if the chunk is not present.
///
/// The chunk cache implementation may modify the cache (e.g. update LRU cache) on retrieval.
fn get(&self, chunk_indices: &[u64]) -> Option<Arc<ArrayBytes<'static>>>;
fn get(&self, chunk_indices: &[u64]) -> Option<Arc<CT>>;

/// Insert a chunk into the cache.
fn insert(&self, chunk_indices: Vec<u64>, chunk: Arc<ArrayBytes<'static>>);
fn insert(&self, chunk_indices: Vec<u64>, chunk: Arc<CT>);

/// Get or insert a chunk in the cache.
///
/// Override the default implementation if a chunk offers a more performant implementation.
///
/// # Errors
/// Returns an error if `f` returns an error.
fn try_get_or_insert_with<F, E>(
&self,
key: Vec<u64>,
chunk_indices: Vec<u64>,
f: F,
) -> Result<Arc<ArrayBytes<'static>>, Arc<ArrayError>>
) -> Result<Arc<CT>, Arc<ArrayError>>
where
F: FnOnce() -> Result<Arc<ArrayBytes<'static>>, ArrayError>;
F: FnOnce() -> Result<Arc<CT>, ArrayError>,
{
let chunk_indices = chunk_indices.clone();
if let Some(chunk) = self.get(&chunk_indices) {
Ok(chunk)
} else {
let chunk = f()?;
self.insert(chunk_indices, chunk.clone());
Ok(chunk)
}
}

/// Return the number of chunks in the cache.
/// Return the number of chunks in the cache. For a thread-local cache, returns the number of chunks cached on the current thread.
#[must_use]
fn len(&self) -> usize;

/// Returns true if the cache is empty.
/// Returns true if the cache is empty. For a thread-local cache, returns if the cache is empty on the current thread.
#[must_use]
fn is_empty(&self) -> bool;
fn is_empty(&self) -> bool {
self.len() == 0
}
}

// TODO: AsyncChunkCache
Loading

0 comments on commit 63cefef

Please sign in to comment.