Skip to content

Commit

Permalink
Add thread local chunk caches
Browse files Browse the repository at this point in the history
  • Loading branch information
LDeakin committed Aug 26, 2024
1 parent 385d759 commit 14203b2
Show file tree
Hide file tree
Showing 7 changed files with 208 additions and 13 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ gdeflate-sys = { version = "0.4.1", optional = true }
half = { version = "2.0.0", features = ["bytemuck"] }
inventory = "0.3.0"
itertools = "0.13.0"
lru = "0.12.4"
moka = { version = "0.12.8", features = ["sync"] }
monostate = "0.1.0"
ndarray = { version = ">=0.15.0,<17", optional = true }
Expand All @@ -72,6 +73,7 @@ serde = { version = "1.0.184", features = ["derive"] }
serde_json = { version = "1.0.71", features = ["float_roundtrip", "preserve_order"] }
serde_repr = "0.1.19"
thiserror = "1.0.61"
thread_local = "1.1.8"
url = { version = "2.2.0", optional = true }
walkdir = "2.3.2"
zfp-sys = {version = "0.1.15", features = ["static"], optional = true }
Expand Down
16 changes: 11 additions & 5 deletions src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,15 @@ pub use chunk_cache::{
chunk_cache_lru_chunk_limit::{
ChunkCacheDecodedLruChunkLimit, ChunkCacheEncodedLruChunkLimit, ChunkCacheLruChunkLimit,
},
chunk_cache_lru_chunk_limit_thread_local::{
ChunkCacheDecodedLruChunkLimitThreadLocal, ChunkCacheEncodedLruChunkLimitThreadLocal,
},
chunk_cache_lru_size_limit::{
ChunkCacheDecodedLruSizeLimit, ChunkCacheEncodedLruSizeLimit, ChunkCacheLruSizeLimit,
},
chunk_cache_lru_size_limit_thread_local::{
ChunkCacheDecodedLruSizeLimitThreadLocal, ChunkCacheEncodedLruSizeLimitThreadLocal,
},
ChunkCache, ChunkCacheType, ChunkCacheTypeDecoded, ChunkCacheTypeEncoded,
};

Expand Down Expand Up @@ -224,11 +230,15 @@ pub struct NonZeroError;
/// `_elements` and `_ndarray` variants are also available.
/// Each method has a `cache` parameter that implements the [`ChunkCache`] trait.
///
/// Four Least Recently Used (LRU) chunk caches are provided by `zarrs`:
/// Several Least Recently Used (LRU) chunk caches are provided by `zarrs`:
/// - [`ChunkCacheDecodedLruChunkLimit`]: a decoded chunk cache with a fixed chunk capacity.
/// - [`ChunkCacheEncodedLruChunkLimit`]: an encoded chunk cache with a fixed chunk capacity.
/// - [`ChunkCacheDecodedLruSizeLimit`]: a decoded chunk cache with a fixed size in bytes.
/// - [`ChunkCacheEncodedLruSizeLimit`]: an encoded chunk cache with a fixed size in bytes.
/// - [`ChunkCacheDecodedLruChunkLimitThreadLocal`]: a thread-local decoded chunk cache with a fixed chunk capacity (per thread).
/// - [`ChunkCacheEncodedLruChunkLimitThreadLocal`]: a thread-local encoded chunk cache with a fixed chunk capacity (per thread).
/// - [`ChunkCacheDecodedLruSizeLimitThreadLocal`]: a thread-local decoded chunk cache with a fixed size in bytes (per thread).
/// - [`ChunkCacheEncodedLruSizeLimitThreadLocal`]: a thread-local encoded chunk cache with a fixed size in bytes (per thread).
///
/// `zarrs` consumers can create custom caches by implementing the [`ChunkCache`] trait.
///
Expand All @@ -242,10 +252,6 @@ pub struct NonZeroError;
/// For many access patterns, chunk caching may reduce performance.
/// **Benchmark your algorithm/data.**
///
/// *`zarrs` does not provide any lock-free caches.
/// However, it may be feasible to use a custom lock-free per-thread cache if only using `retrieve_chunk_opt_cached`, `retrieve_chunk_subset_opt_cached`, and variants sequentially per thread.
/// The other [`ArrayChunkCacheExt`] methods cannot be used with a lock-free cache since they operate on multiple chunks in parallel.*
///
/// ### Reading Sharded Arrays
/// The `sharding_indexed` ([`ShardingCodec`](codec::array_to_bytes::sharding)) codec enables multiple sub-chunks ("inner chunks") to be stored in a single chunk ("shard").
/// With a sharded array, the [`chunk_grid`](Array::chunk_grid) and chunk indices in store/retrieve methods reference the chunks ("shards") of an array.
Expand Down
21 changes: 17 additions & 4 deletions src/array/chunk_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,33 @@ pub mod array_chunk_cache_ext_decoded_sync;
pub mod array_chunk_cache_ext_encoded_sync;
pub mod array_chunk_cache_ext_sync;
pub mod chunk_cache_lru_chunk_limit;
pub mod chunk_cache_lru_chunk_limit_thread_local;
pub mod chunk_cache_lru_size_limit;
pub mod chunk_cache_lru_size_limit_thread_local;

/// The chunk type of an encoded chunk cache.
pub type ChunkCacheTypeEncoded = Option<RawBytes<'static>>;

/// The chunk type of a decoded chunk cache.
pub type ChunkCacheTypeDecoded = ArrayBytes<'static>;

/// A marker trait for a chunk type ([`ChunkCacheTypeEncoded`] or [`ChunkCacheTypeDecoded`]).
pub trait ChunkCacheType: Send + Sync + 'static {}
/// A chunk type ([`ChunkCacheTypeEncoded`] or [`ChunkCacheTypeDecoded`]).
pub trait ChunkCacheType: Send + Sync + 'static {
/// The size of the chunk in bytes.
fn size(&self) -> usize;
}

impl ChunkCacheType for ChunkCacheTypeEncoded {}
impl ChunkCacheType for ChunkCacheTypeEncoded {
fn size(&self) -> usize {
self.as_ref().map_or(0, |v| v.len())
}

Check warning on line 28 in src/array/chunk_cache.rs

View check run for this annotation

Codecov / codecov/patch

src/array/chunk_cache.rs#L26-L28

Added lines #L26 - L28 were not covered by tests
}

impl ChunkCacheType for ChunkCacheTypeDecoded {}
impl ChunkCacheType for ChunkCacheTypeDecoded {
fn size(&self) -> usize {
ArrayBytes::size(self)
}

Check warning on line 34 in src/array/chunk_cache.rs

View check run for this annotation

Codecov / codecov/patch

src/array/chunk_cache.rs#L32-L34

Added lines #L32 - L34 were not covered by tests
}

/// Traits for an encoded chunk cache.
pub trait ChunkCache<CT: ChunkCacheType>: Send + Sync {
Expand Down
4 changes: 2 additions & 2 deletions src/array/chunk_cache/chunk_cache_lru_chunk_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ pub struct ChunkCacheLruChunkLimit<T: ChunkCacheType> {
cache: Cache<ChunkIndices, Arc<T>>,
}

/// An LRU (least recently used) encoded chunk cache with a fixed size in bytes.
/// An LRU (least recently used) encoded chunk cache with a fixed chunk capacity.
pub type ChunkCacheEncodedLruChunkLimit = ChunkCacheLruChunkLimit<ChunkCacheTypeEncoded>;

/// An LRU (least recently used) decoded chunk cache with a fixed size in bytes.
/// An LRU (least recently used) decoded chunk cache with a fixed chunk capacity.
pub type ChunkCacheDecodedLruChunkLimit = ChunkCacheLruChunkLimit<ChunkCacheTypeDecoded>;

impl<T: ChunkCacheType> ChunkCacheLruChunkLimit<T> {
Expand Down
85 changes: 85 additions & 0 deletions src/array/chunk_cache/chunk_cache_lru_chunk_limit_thread_local.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
use std::{
num::NonZeroUsize,
sync::{Arc, Mutex},
};

use lru::LruCache;
use thread_local::ThreadLocal;

use crate::array::{ArrayError, ArrayIndices};

use super::{ChunkCache, ChunkCacheType, ChunkCacheTypeDecoded, ChunkCacheTypeEncoded};
type ChunkIndices = ArrayIndices;

/// A thread local chunk cache with a fixed chunk capacity per thread.
pub struct ChunkCacheLruChunkLimitThreadLocal<T: ChunkCacheType> {
cache: ThreadLocal<Mutex<LruCache<ChunkIndices, Arc<T>>>>,
capacity: u64,
}

/// An LRU (least recently used) encoded chunk cache with a fixed chunk capacity.
pub type ChunkCacheEncodedLruChunkLimitThreadLocal =
ChunkCacheLruChunkLimitThreadLocal<ChunkCacheTypeEncoded>;

/// An LRU (least recently used) decoded chunk cache with a fixed chunk capacity.
pub type ChunkCacheDecodedLruChunkLimitThreadLocal =
ChunkCacheLruChunkLimitThreadLocal<ChunkCacheTypeDecoded>;

impl<CT: ChunkCacheType> ChunkCacheLruChunkLimitThreadLocal<CT> {
/// Create a new [`ChunkCacheLruChunkLimitThreadLocal`] with a capacity in bytes of `capacity`.
#[must_use]
pub fn new(capacity: u64) -> Self {
let cache = ThreadLocal::new();
Self { cache, capacity }
}

Check warning on line 34 in src/array/chunk_cache/chunk_cache_lru_chunk_limit_thread_local.rs

View check run for this annotation

Codecov / codecov/patch

src/array/chunk_cache/chunk_cache_lru_chunk_limit_thread_local.rs#L31-L34

Added lines #L31 - L34 were not covered by tests

fn cache(&self) -> &Mutex<LruCache<ChunkIndices, Arc<CT>>> {
self.cache.get_or(|| {
Mutex::new(LruCache::new(
NonZeroUsize::new(usize::try_from(self.capacity).unwrap_or(usize::MAX).max(1))
.unwrap(),
))
})
}

Check warning on line 43 in src/array/chunk_cache/chunk_cache_lru_chunk_limit_thread_local.rs

View check run for this annotation

Codecov / codecov/patch

src/array/chunk_cache/chunk_cache_lru_chunk_limit_thread_local.rs#L36-L43

Added lines #L36 - L43 were not covered by tests
}

macro_rules! impl_ChunkCacheLruChunkLimitThreadLocal {
($t:ty) => {
impl<CT: ChunkCacheType> ChunkCache<CT> for $t {
fn get(&self, chunk_indices: &[u64]) -> Option<Arc<CT>> {
self.cache()
.lock()
.unwrap()
.get(&chunk_indices.to_vec())
.cloned()
}

Check warning on line 55 in src/array/chunk_cache/chunk_cache_lru_chunk_limit_thread_local.rs

View check run for this annotation

Codecov / codecov/patch

src/array/chunk_cache/chunk_cache_lru_chunk_limit_thread_local.rs#L49-L55

Added lines #L49 - L55 were not covered by tests

fn insert(&self, chunk_indices: ChunkIndices, chunk: Arc<CT>) {
self.cache().lock().unwrap().push(chunk_indices, chunk);
}

Check warning on line 59 in src/array/chunk_cache/chunk_cache_lru_chunk_limit_thread_local.rs

View check run for this annotation

Codecov / codecov/patch

src/array/chunk_cache/chunk_cache_lru_chunk_limit_thread_local.rs#L57-L59

Added lines #L57 - L59 were not covered by tests

fn try_get_or_insert_with<F, E>(
&self,
chunk_indices: Vec<u64>,
f: F,
) -> Result<Arc<CT>, Arc<ArrayError>>
where
F: FnOnce() -> Result<Arc<CT>, ArrayError>,
{
self.cache()
.lock()
.unwrap()
.try_get_or_insert(chunk_indices, f)
.cloned()
.map_err(|e| Arc::new(e))
}

Check warning on line 75 in src/array/chunk_cache/chunk_cache_lru_chunk_limit_thread_local.rs

View check run for this annotation

Codecov / codecov/patch

src/array/chunk_cache/chunk_cache_lru_chunk_limit_thread_local.rs#L61-L75

Added lines #L61 - L75 were not covered by tests

fn len(&self) -> usize {
self.cache().lock().unwrap().len()
}

Check warning on line 79 in src/array/chunk_cache/chunk_cache_lru_chunk_limit_thread_local.rs

View check run for this annotation

Codecov / codecov/patch

src/array/chunk_cache/chunk_cache_lru_chunk_limit_thread_local.rs#L77-L79

Added lines #L77 - L79 were not covered by tests
}
};
}

impl_ChunkCacheLruChunkLimitThreadLocal!(ChunkCacheLruChunkLimitThreadLocal<CT>);
impl_ChunkCacheLruChunkLimitThreadLocal!(&ChunkCacheLruChunkLimitThreadLocal<CT>);
4 changes: 2 additions & 2 deletions src/array/chunk_cache/chunk_cache_lru_size_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ pub struct ChunkCacheLruSizeLimit<T: ChunkCacheType> {
cache: Cache<ChunkIndices, Arc<T>>,
}

/// An LRU (least recently used) encoded chunk cache with a fixed chunk capacity.
/// An LRU (least recently used) encoded chunk cache with a fixed size capacity in bytes.
pub type ChunkCacheEncodedLruSizeLimit = ChunkCacheLruSizeLimit<ChunkCacheTypeEncoded>;

/// An LRU (least recently used) decoded chunk cache with a fixed chunk capacity.
/// An LRU (least recently used) decoded chunk cache with a fixed size capacity in bytes.
pub type ChunkCacheDecodedLruSizeLimit = ChunkCacheLruSizeLimit<ChunkCacheTypeDecoded>;

impl ChunkCacheLruSizeLimit<ChunkCacheTypeDecoded> {
Expand Down
89 changes: 89 additions & 0 deletions src/array/chunk_cache/chunk_cache_lru_size_limit_thread_local.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
use std::sync::{
atomic::{self, AtomicUsize},
Arc, Mutex,
};

use lru::LruCache;
use thread_local::ThreadLocal;

use crate::array::ArrayIndices;

use super::{ChunkCache, ChunkCacheType, ChunkCacheTypeDecoded, ChunkCacheTypeEncoded};
type ChunkIndices = ArrayIndices;

/// A thread local chunk cache with a fixed chunk capacity per thread.
pub struct ChunkCacheLruSizeLimitThreadLocal<T: ChunkCacheType> {
cache: ThreadLocal<Mutex<LruCache<ChunkIndices, Arc<T>>>>,
capacity: usize,
size: ThreadLocal<AtomicUsize>,
}

/// An LRU (least recently used) encoded chunk cache with a fixed chunk capacity.
pub type ChunkCacheEncodedLruSizeLimitThreadLocal =
ChunkCacheLruSizeLimitThreadLocal<ChunkCacheTypeEncoded>;

/// An LRU (least recently used) decoded chunk cache with a fixed chunk capacity.
pub type ChunkCacheDecodedLruSizeLimitThreadLocal =
ChunkCacheLruSizeLimitThreadLocal<ChunkCacheTypeDecoded>;

impl<CT: ChunkCacheType> ChunkCacheLruSizeLimitThreadLocal<CT> {
/// Create a new [`ChunkCacheLruSizeLimitThreadLocal`] with a capacity in bytes of `capacity`.
#[must_use]
pub fn new(capacity: u64) -> Self {
let cache = ThreadLocal::new();
Self {
cache,
capacity: usize::try_from(capacity).unwrap_or(usize::MAX),
size: ThreadLocal::new(),
}
}

Check warning on line 39 in src/array/chunk_cache/chunk_cache_lru_size_limit_thread_local.rs

View check run for this annotation

Codecov / codecov/patch

src/array/chunk_cache/chunk_cache_lru_size_limit_thread_local.rs#L32-L39

Added lines #L32 - L39 were not covered by tests

fn cache(&self) -> &Mutex<LruCache<ChunkIndices, Arc<CT>>> {
self.cache.get_or(|| Mutex::new(LruCache::unbounded()))
}

Check warning on line 43 in src/array/chunk_cache/chunk_cache_lru_size_limit_thread_local.rs

View check run for this annotation

Codecov / codecov/patch

src/array/chunk_cache/chunk_cache_lru_size_limit_thread_local.rs#L41-L43

Added lines #L41 - L43 were not covered by tests
}

impl<T: ChunkCacheType> ChunkCacheLruSizeLimitThreadLocal<T> {
/// Return the size of the cache in bytes.
#[must_use]
pub fn size(&self) -> usize {
self.size.get_or_default().load(atomic::Ordering::SeqCst)
}

Check warning on line 51 in src/array/chunk_cache/chunk_cache_lru_size_limit_thread_local.rs

View check run for this annotation

Codecov / codecov/patch

src/array/chunk_cache/chunk_cache_lru_size_limit_thread_local.rs#L49-L51

Added lines #L49 - L51 were not covered by tests
}

macro_rules! impl_ChunkCacheLruSizeLimitThreadLocal {
($t:ty) => {
impl<CT: ChunkCacheType> ChunkCache<CT> for $t {
fn get(&self, chunk_indices: &[u64]) -> Option<Arc<CT>> {
self.cache()
.lock()
.unwrap()
.get(&chunk_indices.to_vec())
.cloned()
}

Check warning on line 63 in src/array/chunk_cache/chunk_cache_lru_size_limit_thread_local.rs

View check run for this annotation

Codecov / codecov/patch

src/array/chunk_cache/chunk_cache_lru_size_limit_thread_local.rs#L57-L63

Added lines #L57 - L63 were not covered by tests

fn insert(&self, chunk_indices: ChunkIndices, chunk: Arc<CT>) {
let size = self.size.get_or_default();
let size_old = size.fetch_add(chunk.size(), atomic::Ordering::SeqCst);
if size_old + chunk.size() > self.capacity {
let old = self.cache().lock().unwrap().pop_lru();
if let Some(old) = old {
size.fetch_sub(old.1.size(), atomic::Ordering::SeqCst);
}
}

Check warning on line 73 in src/array/chunk_cache/chunk_cache_lru_size_limit_thread_local.rs

View check run for this annotation

Codecov / codecov/patch

src/array/chunk_cache/chunk_cache_lru_size_limit_thread_local.rs#L65-L73

Added lines #L65 - L73 were not covered by tests

let old = self.cache().lock().unwrap().push(chunk_indices, chunk);
if let Some(old) = old {
size.fetch_sub(old.1.size(), atomic::Ordering::SeqCst);
}
}

Check warning on line 79 in src/array/chunk_cache/chunk_cache_lru_size_limit_thread_local.rs

View check run for this annotation

Codecov / codecov/patch

src/array/chunk_cache/chunk_cache_lru_size_limit_thread_local.rs#L75-L79

Added lines #L75 - L79 were not covered by tests

fn len(&self) -> usize {
self.cache().lock().unwrap().len()
}

Check warning on line 83 in src/array/chunk_cache/chunk_cache_lru_size_limit_thread_local.rs

View check run for this annotation

Codecov / codecov/patch

src/array/chunk_cache/chunk_cache_lru_size_limit_thread_local.rs#L81-L83

Added lines #L81 - L83 were not covered by tests
}
};
}

impl_ChunkCacheLruSizeLimitThreadLocal!(ChunkCacheLruSizeLimitThreadLocal<CT>);
impl_ChunkCacheLruSizeLimitThreadLocal!(&ChunkCacheLruSizeLimitThreadLocal<CT>);

0 comments on commit 14203b2

Please sign in to comment.