Add encoded and thread local chunk cache support #57

Merged
merged 2 commits into from
Aug 26, 2024
5 changes: 4 additions & 1 deletion CHANGELOG.md
@@ -9,7 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added
- Add `ChunkGridTraits::chunks_in_array_subset()`
- Add `ArrayChunkCacheExt`, `ChunkCache`, `ChunkCacheLru{Size,Chunk}Limit`
- Add chunk cache support
- Add `ArrayChunkCacheExt` extension trait for `Array`
- Add traits: `ChunkCache`, `ChunkCacheType` (implemented by `ChunkCacheType{Encoded,Decoded}`)
- Add chunk cache implementations: `ChunkCache{En,De}codedLru{Size,Chunk}Limit[ThreadLocal]`

### Changed
- **Breaking**: `Arc` instead of `Box` partial decoders
2 changes: 2 additions & 0 deletions Cargo.toml
@@ -56,6 +56,7 @@ gdeflate-sys = { version = "0.4.1", optional = true }
half = { version = "2.0.0", features = ["bytemuck"] }
inventory = "0.3.0"
itertools = "0.13.0"
lru = "0.12.4"
moka = { version = "0.12.8", features = ["sync"] }
monostate = "0.1.0"
ndarray = { version = ">=0.15.0,<17", optional = true }
@@ -72,6 +73,7 @@ serde = { version = "1.0.184", features = ["derive"] }
serde_json = { version = "1.0.71", features = ["float_roundtrip", "preserve_order"] }
serde_repr = "0.1.19"
thiserror = "1.0.61"
thread_local = "1.1.8"
url = { version = "2.2.0", optional = true }
walkdir = "2.3.2"
zfp-sys = {version = "0.1.15", features = ["static"], optional = true }
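
The changelog above names `...ThreadLocal` cache variants, and this hunk adds the `thread_local` crate. As a rough illustration of what "per thread" means for such a cache, here is a minimal sketch that uses only the standard library's `thread_local!` macro. It is not the PR's implementation: the function names `tl_insert`, `tl_get`, `tl_len`, and `len_on_new_thread_after_insert` are hypothetical, and `Vec<u8>` stands in for the real chunk types.

```rust
use std::cell::RefCell;
use std::collections::HashMap;
use std::sync::Arc;
use std::thread;

thread_local! {
    // Each thread gets its own independent map, so no lock is needed:
    // no other thread can reach this thread's entries.
    static CHUNKS: RefCell<HashMap<Vec<u64>, Arc<Vec<u8>>>> = RefCell::new(HashMap::new());
}

/// Insert a chunk into the current thread's cache (hypothetical helper).
pub fn tl_insert(chunk_indices: Vec<u64>, chunk: Arc<Vec<u8>>) {
    CHUNKS.with(|c| {
        c.borrow_mut().insert(chunk_indices, chunk);
    });
}

/// Look up a chunk in the current thread's cache only (hypothetical helper).
pub fn tl_get(chunk_indices: &[u64]) -> Option<Arc<Vec<u8>>> {
    CHUNKS.with(|c| c.borrow().get(chunk_indices).cloned())
}

/// Number of chunks cached on the current thread (hypothetical helper).
pub fn tl_len() -> usize {
    CHUNKS.with(|c| c.borrow().len())
}

/// Spawn a thread, insert one chunk there, and report that thread's cache length.
/// The spawned thread starts with an empty cache regardless of the caller's state.
pub fn len_on_new_thread_after_insert() -> usize {
    thread::spawn(|| {
        tl_insert(vec![1, 1], Arc::new(vec![0u8; 8]));
        tl_len()
    })
    .join()
    .unwrap()
}
```

The trade-off this sketch makes visible: lookups avoid lock contention, but a chunk cached on one thread is invisible to every other thread, so the same chunk may be retrieved and stored once per thread.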
34 changes: 22 additions & 12 deletions src/array.rs
@@ -73,10 +73,9 @@ pub use crate::metadata::v2::ArrayMetadataV2;
pub use crate::metadata::v3::{fill_value::FillValueMetadata, ArrayMetadataV3};
pub use crate::metadata::ArrayMetadata;

pub use chunk_cache::array_chunk_cache_sync_readable_ext::ArrayChunkCacheExt;
pub use chunk_cache::array_chunk_cache_ext_sync::ArrayChunkCacheExt;
pub use chunk_cache::{
chunk_cache_lru_chunk_limit::ChunkCacheLruChunkLimit,
chunk_cache_lru_size_limit::ChunkCacheLruSizeLimit, ChunkCache,
chunk_cache_lru::*, ChunkCache, ChunkCacheType, ChunkCacheTypeDecoded, ChunkCacheTypeEncoded,
};

#[cfg(feature = "sharding")]
@@ -210,25 +209,36 @@ pub struct NonZeroError;
/// Another alternative is to use [Chunk Caching](#chunk-caching).
///
/// ### Chunk Caching
/// The [`ArrayChunkCacheExt`] trait adds [`Array`] retrieve methods that cache decoded chunks:
/// The [`ArrayChunkCacheExt`] trait adds [`Array`] retrieve methods that utilise chunk caching:
/// - [`retrieve_chunk_opt_cached`](ArrayChunkCacheExt::retrieve_chunk_opt_cached)
/// - [`retrieve_chunks_opt_cached`](ArrayChunkCacheExt::retrieve_chunks_opt_cached)
/// - [`retrieve_chunk_subset_opt_cached`](ArrayChunkCacheExt::retrieve_chunk_subset_opt_cached)
/// - [`retrieve_array_subset_opt_cached`](ArrayChunkCacheExt::retrieve_array_subset_opt_cached)
///
/// `_elements` and `_ndarray` variants are also available.
/// Each method has a `cache` parameter that implements the [`ChunkCache`] trait.
/// Cached retrieve methods do not use partial decoders, so any intersected chunk is fully decoded if not present in the cache.
/// This can reduce performance for some access patterns.
/// Prefer not to use a chunk cache if chunks are not accessed repeatedly.
///
/// Two chunk caches are provided by `zarrs`:
/// - [`ChunkCacheLruChunkLimit`]: an LRU (least recently used) cache with a fixed chunk capacity.
/// - [`ChunkCacheLruSizeLimit`]: an LRU cache with a fixed size in bytes.
/// Several Least Recently Used (LRU) chunk caches are provided by `zarrs`:
/// - [`ChunkCacheDecodedLruChunkLimit`]: a decoded chunk cache with a fixed chunk capacity.
/// - [`ChunkCacheEncodedLruChunkLimit`]: an encoded chunk cache with a fixed chunk capacity.
/// - [`ChunkCacheDecodedLruSizeLimit`]: a decoded chunk cache with a fixed size in bytes.
/// - [`ChunkCacheEncodedLruSizeLimit`]: an encoded chunk cache with a fixed size in bytes.
/// - [`ChunkCacheDecodedLruChunkLimitThreadLocal`]: a thread-local decoded chunk cache with a fixed chunk capacity (per thread).
/// - [`ChunkCacheEncodedLruChunkLimitThreadLocal`]: a thread-local encoded chunk cache with a fixed chunk capacity (per thread).
/// - [`ChunkCacheDecodedLruSizeLimitThreadLocal`]: a thread-local decoded chunk cache with a fixed size in bytes (per thread).
/// - [`ChunkCacheEncodedLruSizeLimitThreadLocal`]: a thread-local encoded chunk cache with a fixed size in bytes (per thread).
///
/// These caches use internal locking to support multithreading, which has a performance overhead.
/// `zarrs` consumers can create custom caches by implementing the [`ChunkCache`] trait.
/// For example, consider a custom lock-free per-thread cache, or an alternative to LRU.
///
/// Chunk caching is likely to be effective for remote stores where redundant retrievals are costly.
/// Chunk caching may not outperform disk caching with a filesystem store.
/// The above caches use internal locking to support multithreading, which has a performance overhead.
/// **Prefer not to use a chunk cache if chunks are not accessed repeatedly**, because cached retrieve methods do not use partial decoders and any intersected chunk is fully decoded if not present in the cache.
/// The encoded chunk caches may be optimal if dealing with highly compressed/sparse data with a fast codec.
/// However, the decoded chunk caches are likely to be more performant in most cases.
///
/// For many access patterns, chunk caching may reduce performance.
/// **Benchmark your algorithm/data.**
///
/// ### Reading Sharded Arrays
/// The `sharding_indexed` ([`ShardingCodec`](codec::array_to_bytes::sharding)) codec enables multiple sub-chunks ("inner chunks") to be stored in a single chunk ("shard").
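
The documentation above describes LRU caches bounded either by chunk count or by total size in bytes. The following is a minimal, standard-library-only sketch of the size-limited flavour, to make the eviction rule concrete. It is an assumption-laden illustration, not the PR's code (which depends on the `lru` and `moka` crates): the type `LruSizeLimitSketch` is hypothetical, chunks are plain `Vec<u8>`, and the choice to silently skip chunks larger than the whole capacity is this sketch's own.

```rust
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex};

struct Inner {
    chunks: HashMap<Vec<u64>, Arc<Vec<u8>>>,
    order: VecDeque<Vec<u64>>, // front = least recently used, back = most recently used
    size: usize,               // total bytes currently cached
}

/// Hypothetical size-limited LRU cache guarded by a single mutex.
pub struct LruSizeLimitSketch {
    capacity: usize, // limit in bytes
    inner: Mutex<Inner>,
}

impl LruSizeLimitSketch {
    pub fn new(capacity: usize) -> Self {
        let inner = Inner { chunks: HashMap::new(), order: VecDeque::new(), size: 0 };
        Self { capacity, inner: Mutex::new(inner) }
    }

    /// Return the chunk if cached, marking it as most recently used.
    pub fn get(&self, key: &[u64]) -> Option<Arc<Vec<u8>>> {
        let mut guard = self.inner.lock().unwrap();
        let inner = &mut *guard;
        let chunk = inner.chunks.get(key).cloned()?;
        if let Some(pos) = inner.order.iter().position(|k| k.as_slice() == key) {
            let k = inner.order.remove(pos).unwrap();
            inner.order.push_back(k);
        }
        Some(chunk)
    }

    /// Insert a chunk, evicting least recently used chunks until it fits.
    pub fn insert(&self, key: Vec<u64>, chunk: Arc<Vec<u8>>) {
        let new_size = chunk.len();
        if new_size > self.capacity {
            return; // sketch's choice: a chunk that can never fit is not cached
        }
        let mut guard = self.inner.lock().unwrap();
        let inner = &mut *guard;
        // Drop any previous entry for this key so its bytes are not double counted.
        if let Some(old) = inner.chunks.remove(&key) {
            inner.size -= old.len();
            inner.order.retain(|k| k != &key);
        }
        while inner.size + new_size > self.capacity {
            match inner.order.pop_front() {
                Some(lru) => {
                    if let Some(evicted) = inner.chunks.remove(&lru) {
                        inner.size -= evicted.len();
                    }
                }
                None => break,
            }
        }
        inner.size += new_size;
        inner.order.push_back(key.clone());
        inner.chunks.insert(key, chunk);
    }

    /// Number of cached chunks.
    pub fn len(&self) -> usize {
        self.inner.lock().unwrap().chunks.len()
    }

    /// Total cached bytes.
    pub fn size(&self) -> usize {
        self.inner.lock().unwrap().size
    }
}
```

Every `get` takes the mutex and reorders the queue, which is the kind of internal locking overhead the documentation warns about. A chunk-count limit would follow the same shape with `chunks.len()` compared against the capacity instead of the byte total.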
80 changes: 67 additions & 13 deletions src/array/chunk_cache.rs
@@ -1,40 +1,94 @@
use std::sync::Arc;

use super::{ArrayBytes, ArrayError};
use crate::storage::ReadableStorageTraits;

pub mod array_chunk_cache_sync_readable_ext;
pub mod chunk_cache_lru_chunk_limit;
pub mod chunk_cache_lru_size_limit;
use super::{codec::CodecOptions, Array, ArrayBytes, ArrayError, RawBytes};

// pub mod array_chunk_cache_ext_decoded_sync;
// pub mod array_chunk_cache_ext_encoded_sync;
pub mod array_chunk_cache_ext_sync;
pub mod chunk_cache_lru;
// pub mod chunk_cache_lru_chunk_limit_thread_local;
// pub mod chunk_cache_lru_size_limit_thread_local;

/// The chunk type of an encoded chunk cache.
pub type ChunkCacheTypeEncoded = Option<RawBytes<'static>>;

/// The chunk type of a decoded chunk cache.
pub type ChunkCacheTypeDecoded = ArrayBytes<'static>;

/// A chunk type ([`ChunkCacheTypeEncoded`] or [`ChunkCacheTypeDecoded`]).
pub trait ChunkCacheType: Send + Sync + 'static {
    /// The size of the chunk in bytes.
    fn size(&self) -> usize;
}

impl ChunkCacheType for ChunkCacheTypeEncoded {
    fn size(&self) -> usize {
        self.as_ref().map_or(0, |v| v.len())
    }
}

impl ChunkCacheType for ChunkCacheTypeDecoded {
    fn size(&self) -> usize {
        ArrayBytes::size(self)
    }
}

/// Traits for a chunk cache.
pub trait ChunkCache: Send + Sync {
pub trait ChunkCache<CT: ChunkCacheType>: Send + Sync {
/// Retrieve and decode a chunk.
///
/// # Errors
/// Returns an [`ArrayError`] if the underlying array retrieval method fails.
fn retrieve_chunk<TStorage: ?Sized + ReadableStorageTraits + 'static>(
&self,
array: &Array<TStorage>,
chunk_indices: &[u64],
options: &CodecOptions,
) -> Result<Arc<ArrayBytes<'static>>, ArrayError>;

/// Retrieve a chunk from the cache. Returns [`None`] if the chunk is not present.
///
/// The chunk cache implementation may modify the cache (e.g. update LRU cache) on retrieval.
fn get(&self, chunk_indices: &[u64]) -> Option<Arc<ArrayBytes<'static>>>;
fn get(&self, chunk_indices: &[u64]) -> Option<Arc<CT>>;

/// Insert a chunk into the cache.
fn insert(&self, chunk_indices: Vec<u64>, chunk: Arc<ArrayBytes<'static>>);
fn insert(&self, chunk_indices: Vec<u64>, chunk: Arc<CT>);

/// Get or insert a chunk in the cache.
///
/// Override the default implementation if a chunk cache offers a more performant implementation.
///
/// # Errors
/// Returns an error if `f` returns an error.
fn try_get_or_insert_with<F, E>(
&self,
key: Vec<u64>,
chunk_indices: Vec<u64>,
f: F,
) -> Result<Arc<ArrayBytes<'static>>, Arc<ArrayError>>
) -> Result<Arc<CT>, Arc<ArrayError>>
where
F: FnOnce() -> Result<Arc<ArrayBytes<'static>>, ArrayError>;
F: FnOnce() -> Result<Arc<CT>, ArrayError>,
{
let chunk_indices = chunk_indices.clone();
if let Some(chunk) = self.get(&chunk_indices) {
Ok(chunk)
} else {
let chunk = f()?;
self.insert(chunk_indices, chunk.clone());
Ok(chunk)
}
}

/// Return the number of chunks in the cache.
/// Return the number of chunks in the cache. For a thread-local cache, returns the number of chunks cached on the current thread.
#[must_use]
fn len(&self) -> usize;

/// Returns true if the cache is empty.
/// Returns true if the cache is empty. For a thread-local cache, returns whether the cache is empty on the current thread.
#[must_use]
fn is_empty(&self) -> bool;
fn is_empty(&self) -> bool {
self.len() == 0
}
}
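
To show how the generic `ChunkCache<CT>` shape with default methods might be used by a custom cache, here is a standalone sketch that mirrors the trait above without depending on `zarrs`. Everything in it is a stand-in: `Encoded` and `Decoded` replace `ChunkCacheTypeEncoded` and `ChunkCacheTypeDecoded`, `String` replaces `ArrayError`, the `retrieve_chunk` method is omitted, and `UnboundedCache` is a hypothetical implementation with no eviction.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Stand-in for the PR's `ChunkCacheType`: a cacheable chunk reports its size in bytes.
pub trait ChunkCacheType: Send + Sync + 'static {
    fn size(&self) -> usize;
}

/// Hypothetical stand-ins for the encoded and decoded chunk types.
pub type Encoded = Option<Vec<u8>>; // `None` models a missing (fill value) chunk
pub type Decoded = Vec<u8>;

impl ChunkCacheType for Encoded {
    fn size(&self) -> usize {
        self.as_ref().map_or(0, |v| v.len())
    }
}

impl ChunkCacheType for Decoded {
    fn size(&self) -> usize {
        self.len()
    }
}

/// Simplified mirror of the trait in this diff, with `String` as the error type.
pub trait ChunkCache<CT: ChunkCacheType>: Send + Sync {
    fn get(&self, chunk_indices: &[u64]) -> Option<Arc<CT>>;
    fn insert(&self, chunk_indices: Vec<u64>, chunk: Arc<CT>);
    fn len(&self) -> usize;

    /// Default: derived from `len`, so implementors only write `len`.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Default: return the cached chunk, or run `f` once and cache its result.
    fn try_get_or_insert_with<F>(&self, chunk_indices: Vec<u64>, f: F) -> Result<Arc<CT>, String>
    where
        F: FnOnce() -> Result<Arc<CT>, String>,
    {
        if let Some(chunk) = self.get(&chunk_indices) {
            Ok(chunk)
        } else {
            let chunk = f()?; // on error, nothing is inserted
            self.insert(chunk_indices, chunk.clone());
            Ok(chunk)
        }
    }
}

/// Hypothetical cache with no eviction, generic over the chunk type.
pub struct UnboundedCache<CT> {
    chunks: Mutex<HashMap<Vec<u64>, Arc<CT>>>,
}

impl<CT> UnboundedCache<CT> {
    pub fn new() -> Self {
        Self { chunks: Mutex::new(HashMap::new()) }
    }
}

impl<CT: ChunkCacheType> ChunkCache<CT> for UnboundedCache<CT> {
    fn get(&self, chunk_indices: &[u64]) -> Option<Arc<CT>> {
        self.chunks.lock().unwrap().get(chunk_indices).cloned()
    }

    fn insert(&self, chunk_indices: Vec<u64>, chunk: Arc<CT>) {
        self.chunks.lock().unwrap().insert(chunk_indices, chunk);
    }

    fn len(&self) -> usize {
        self.chunks.lock().unwrap().len()
    }
}
```

Note that the default `try_get_or_insert_with` performs `get` and `insert` as two separate steps, so two threads that miss at the same time may both run `f`. That is presumably why the doc comment invites overriding it when the underlying cache can do better.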

// TODO: AsyncChunkCache