From 14203b290525739b22ee33ef4617a2e5bfb075e9 Mon Sep 17 00:00:00 2001 From: Lachlan Deakin Date: Mon, 26 Aug 2024 22:19:34 +1000 Subject: [PATCH] Add thread local chunk caches --- Cargo.toml | 2 + src/array.rs | 16 ++-- src/array/chunk_cache.rs | 21 ++++- .../chunk_cache_lru_chunk_limit.rs | 4 +- ...hunk_cache_lru_chunk_limit_thread_local.rs | 85 ++++++++++++++++++ .../chunk_cache/chunk_cache_lru_size_limit.rs | 4 +- ...chunk_cache_lru_size_limit_thread_local.rs | 89 +++++++++++++++++++ 7 files changed, 208 insertions(+), 13 deletions(-) create mode 100644 src/array/chunk_cache/chunk_cache_lru_chunk_limit_thread_local.rs create mode 100644 src/array/chunk_cache/chunk_cache_lru_size_limit_thread_local.rs diff --git a/Cargo.toml b/Cargo.toml index 5f6780fd..35007aab 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,6 +56,7 @@ gdeflate-sys = { version = "0.4.1", optional = true } half = { version = "2.0.0", features = ["bytemuck"] } inventory = "0.3.0" itertools = "0.13.0" +lru = "0.12.4" moka = { version = "0.12.8", features = ["sync"] } monostate = "0.1.0" ndarray = { version = ">=0.15.0,<17", optional = true } @@ -72,6 +73,7 @@ serde = { version = "1.0.184", features = ["derive"] } serde_json = { version = "1.0.71", features = ["float_roundtrip", "preserve_order"] } serde_repr = "0.1.19" thiserror = "1.0.61" +thread_local = "1.1.8" url = { version = "2.2.0", optional = true } walkdir = "2.3.2" zfp-sys = {version = "0.1.15", features = ["static"], optional = true } diff --git a/src/array.rs b/src/array.rs index 8e8e4227..06c7ff46 100644 --- a/src/array.rs +++ b/src/array.rs @@ -78,9 +78,15 @@ pub use chunk_cache::{ chunk_cache_lru_chunk_limit::{ ChunkCacheDecodedLruChunkLimit, ChunkCacheEncodedLruChunkLimit, ChunkCacheLruChunkLimit, }, + chunk_cache_lru_chunk_limit_thread_local::{ + ChunkCacheDecodedLruChunkLimitThreadLocal, ChunkCacheEncodedLruChunkLimitThreadLocal, + }, chunk_cache_lru_size_limit::{ ChunkCacheDecodedLruSizeLimit, ChunkCacheEncodedLruSizeLimit, 
ChunkCacheLruSizeLimit, }, + chunk_cache_lru_size_limit_thread_local::{ + ChunkCacheDecodedLruSizeLimitThreadLocal, ChunkCacheEncodedLruSizeLimitThreadLocal, + }, ChunkCache, ChunkCacheType, ChunkCacheTypeDecoded, ChunkCacheTypeEncoded, }; @@ -224,11 +230,15 @@ pub struct NonZeroError; /// `_elements` and `_ndarray` variants are also available. /// Each method has a `cache` parameter that implements the [`ChunkCache`] trait. /// -/// Four Least Recently Used (LRU) chunk caches are provided by `zarrs`: +/// Several Least Recently Used (LRU) chunk caches are provided by `zarrs`: /// - [`ChunkCacheDecodedLruChunkLimit`]: a decoded chunk cache with a fixed chunk capacity. /// - [`ChunkCacheEncodedLruChunkLimit`]: an encoded chunk cache with a fixed chunk capacity. /// - [`ChunkCacheDecodedLruSizeLimit`]: a decoded chunk cache with a fixed size in bytes. /// - [`ChunkCacheEncodedLruSizeLimit`]: an encoded chunk cache with a fixed size in bytes. +/// - [`ChunkCacheDecodedLruChunkLimitThreadLocal`]: a thread-local decoded chunk cache with a fixed chunk capacity (per thread). +/// - [`ChunkCacheEncodedLruChunkLimitThreadLocal`]: a thread-local encoded chunk cache with a fixed chunk capacity (per thread). +/// - [`ChunkCacheDecodedLruSizeLimitThreadLocal`]: a thread-local decoded chunk cache with a fixed size in bytes (per thread). +/// - [`ChunkCacheEncodedLruSizeLimitThreadLocal`]: a thread-local encoded chunk cache with a fixed size in bytes (per thread). /// /// `zarrs` consumers can create custom caches by implementing the [`ChunkCache`] trait. /// @@ -242,10 +252,6 @@ pub struct NonZeroError; /// For many access patterns, chunk caching may reduce performance. /// **Benchmark your algorithm/data.** /// -/// *`zarrs` does not provide any lock-free caches. -/// However, it may be feasible to use a custom lock-free per-thread cache if only using `retrieve_chunk_opt_cached`, `retrieve_chunk_subset_opt_cached`, and variants sequentially per thread. 
-/// The other [`ArrayChunkCacheExt`] methods cannot be used with a lock-free cache since they operate on multiple chunks in parallel.* -/// /// ### Reading Sharded Arrays /// The `sharding_indexed` ([`ShardingCodec`](codec::array_to_bytes::sharding)) codec enables multiple sub-chunks ("inner chunks") to be stored in a single chunk ("shard"). /// With a sharded array, the [`chunk_grid`](Array::chunk_grid) and chunk indices in store/retrieve methods reference the chunks ("shards") of an array. diff --git a/src/array/chunk_cache.rs b/src/array/chunk_cache.rs index 4ea10c42..44f35a4a 100644 --- a/src/array/chunk_cache.rs +++ b/src/array/chunk_cache.rs @@ -6,7 +6,9 @@ pub mod array_chunk_cache_ext_decoded_sync; pub mod array_chunk_cache_ext_encoded_sync; pub mod array_chunk_cache_ext_sync; pub mod chunk_cache_lru_chunk_limit; +pub mod chunk_cache_lru_chunk_limit_thread_local; pub mod chunk_cache_lru_size_limit; +pub mod chunk_cache_lru_size_limit_thread_local; /// The chunk type of an encoded chunk cache. pub type ChunkCacheTypeEncoded = Option>; @@ -14,12 +16,23 @@ pub type ChunkCacheTypeEncoded = Option>; /// The chunk type of a decoded chunk cache. pub type ChunkCacheTypeDecoded = ArrayBytes<'static>; -/// A marker trait for a chunk type ([`ChunkCacheTypeEncoded`] or [`ChunkCacheTypeDecoded`]). -pub trait ChunkCacheType: Send + Sync + 'static {} +/// A chunk type ([`ChunkCacheTypeEncoded`] or [`ChunkCacheTypeDecoded`]). +pub trait ChunkCacheType: Send + Sync + 'static { + /// The size of the chunk in bytes. + fn size(&self) -> usize; +} -impl ChunkCacheType for ChunkCacheTypeEncoded {} +impl ChunkCacheType for ChunkCacheTypeEncoded { + fn size(&self) -> usize { + self.as_ref().map_or(0, |v| v.len()) + } +} -impl ChunkCacheType for ChunkCacheTypeDecoded {} +impl ChunkCacheType for ChunkCacheTypeDecoded { + fn size(&self) -> usize { + ArrayBytes::size(self) + } +} /// Traits for an encoded chunk cache. 
pub trait ChunkCache: Send + Sync { diff --git a/src/array/chunk_cache/chunk_cache_lru_chunk_limit.rs b/src/array/chunk_cache/chunk_cache_lru_chunk_limit.rs index b91f1e8d..164a93b7 100644 --- a/src/array/chunk_cache/chunk_cache_lru_chunk_limit.rs +++ b/src/array/chunk_cache/chunk_cache_lru_chunk_limit.rs @@ -16,10 +16,10 @@ pub struct ChunkCacheLruChunkLimit { cache: Cache>, } -/// An LRU (least recently used) encoded chunk cache with a fixed size in bytes. +/// An LRU (least recently used) encoded chunk cache with a fixed chunk capacity. pub type ChunkCacheEncodedLruChunkLimit = ChunkCacheLruChunkLimit; -/// An LRU (least recently used) decoded chunk cache with a fixed size in bytes. +/// An LRU (least recently used) decoded chunk cache with a fixed chunk capacity. pub type ChunkCacheDecodedLruChunkLimit = ChunkCacheLruChunkLimit; impl ChunkCacheLruChunkLimit { diff --git a/src/array/chunk_cache/chunk_cache_lru_chunk_limit_thread_local.rs b/src/array/chunk_cache/chunk_cache_lru_chunk_limit_thread_local.rs new file mode 100644 index 00000000..5bf151aa --- /dev/null +++ b/src/array/chunk_cache/chunk_cache_lru_chunk_limit_thread_local.rs @@ -0,0 +1,85 @@ +use std::{ + num::NonZeroUsize, + sync::{Arc, Mutex}, +}; + +use lru::LruCache; +use thread_local::ThreadLocal; + +use crate::array::{ArrayError, ArrayIndices}; + +use super::{ChunkCache, ChunkCacheType, ChunkCacheTypeDecoded, ChunkCacheTypeEncoded}; +type ChunkIndices = ArrayIndices; + +/// A thread local chunk cache with a fixed chunk capacity per thread. +pub struct ChunkCacheLruChunkLimitThreadLocal { + cache: ThreadLocal>>>, + capacity: u64, +} + +/// An LRU (least recently used) encoded chunk cache with a fixed chunk capacity. +pub type ChunkCacheEncodedLruChunkLimitThreadLocal = + ChunkCacheLruChunkLimitThreadLocal; + +/// An LRU (least recently used) decoded chunk cache with a fixed chunk capacity. 
+pub type ChunkCacheDecodedLruChunkLimitThreadLocal = + ChunkCacheLruChunkLimitThreadLocal; + +impl ChunkCacheLruChunkLimitThreadLocal { + /// Create a new [`ChunkCacheLruChunkLimitThreadLocal`] with a capacity in chunks of `capacity` per thread. + #[must_use] + pub fn new(capacity: u64) -> Self { + let cache = ThreadLocal::new(); + Self { cache, capacity } + } + + fn cache(&self) -> &Mutex>> { + self.cache.get_or(|| { + Mutex::new(LruCache::new( + NonZeroUsize::new(usize::try_from(self.capacity).unwrap_or(usize::MAX).max(1)) + .unwrap(), + )) + }) + } +} + +macro_rules! impl_ChunkCacheLruChunkLimitThreadLocal { + ($t:ty) => { + impl ChunkCache for $t { + fn get(&self, chunk_indices: &[u64]) -> Option> { + self.cache() + .lock() + .unwrap() + .get(&chunk_indices.to_vec()) + .cloned() + } + + fn insert(&self, chunk_indices: ChunkIndices, chunk: Arc) { + self.cache().lock().unwrap().push(chunk_indices, chunk); + } + + fn try_get_or_insert_with( + &self, + chunk_indices: Vec, + f: F, + ) -> Result, Arc> + where + F: FnOnce() -> Result, ArrayError>, + { + self.cache() + .lock() + .unwrap() + .try_get_or_insert(chunk_indices, f) + .cloned() + .map_err(|e| Arc::new(e)) + } + + fn len(&self) -> usize { + self.cache().lock().unwrap().len() + } + } + }; +} + +impl_ChunkCacheLruChunkLimitThreadLocal!(ChunkCacheLruChunkLimitThreadLocal); +impl_ChunkCacheLruChunkLimitThreadLocal!(&ChunkCacheLruChunkLimitThreadLocal); diff --git a/src/array/chunk_cache/chunk_cache_lru_size_limit.rs b/src/array/chunk_cache/chunk_cache_lru_size_limit.rs index 14cc47df..1f3f3b7e 100644 --- a/src/array/chunk_cache/chunk_cache_lru_size_limit.rs +++ b/src/array/chunk_cache/chunk_cache_lru_size_limit.rs @@ -15,10 +15,10 @@ pub struct ChunkCacheLruSizeLimit { cache: Cache>, } -/// An LRU (least recently used) encoded chunk cache with a fixed chunk capacity. +/// An LRU (least recently used) encoded chunk cache with a fixed size capacity in bytes. 
pub type ChunkCacheEncodedLruSizeLimit = ChunkCacheLruSizeLimit; -/// An LRU (least recently used) decoded chunk cache with a fixed chunk capacity. +/// An LRU (least recently used) decoded chunk cache with a fixed size capacity in bytes. pub type ChunkCacheDecodedLruSizeLimit = ChunkCacheLruSizeLimit; impl ChunkCacheLruSizeLimit { diff --git a/src/array/chunk_cache/chunk_cache_lru_size_limit_thread_local.rs b/src/array/chunk_cache/chunk_cache_lru_size_limit_thread_local.rs new file mode 100644 index 00000000..c1ec54aa --- /dev/null +++ b/src/array/chunk_cache/chunk_cache_lru_size_limit_thread_local.rs @@ -0,0 +1,89 @@ +use std::sync::{ + atomic::{self, AtomicUsize}, + Arc, Mutex, +}; + +use lru::LruCache; +use thread_local::ThreadLocal; + +use crate::array::ArrayIndices; + +use super::{ChunkCache, ChunkCacheType, ChunkCacheTypeDecoded, ChunkCacheTypeEncoded}; +type ChunkIndices = ArrayIndices; + +/// A thread local chunk cache with a fixed size in bytes per thread. +pub struct ChunkCacheLruSizeLimitThreadLocal { + cache: ThreadLocal>>>, + capacity: usize, + size: ThreadLocal, +} + +/// A thread local LRU (least recently used) encoded chunk cache with a fixed size in bytes per thread. +pub type ChunkCacheEncodedLruSizeLimitThreadLocal = + ChunkCacheLruSizeLimitThreadLocal; + +/// A thread local LRU (least recently used) decoded chunk cache with a fixed size in bytes per thread. +pub type ChunkCacheDecodedLruSizeLimitThreadLocal = + ChunkCacheLruSizeLimitThreadLocal; + +impl ChunkCacheLruSizeLimitThreadLocal { + /// Create a new [`ChunkCacheLruSizeLimitThreadLocal`] with a capacity in bytes of `capacity`. + #[must_use] + pub fn new(capacity: u64) -> Self { + let cache = ThreadLocal::new(); + Self { + cache, + capacity: usize::try_from(capacity).unwrap_or(usize::MAX), + size: ThreadLocal::new(), + } + } + + fn cache(&self) -> &Mutex>> { + self.cache.get_or(|| Mutex::new(LruCache::unbounded())) + } +} + +impl ChunkCacheLruSizeLimitThreadLocal { + /// Return the size in bytes of the calling thread's cache. 
+ #[must_use] + pub fn size(&self) -> usize { + self.size.get_or_default().load(atomic::Ordering::SeqCst) + } +} + +macro_rules! impl_ChunkCacheLruSizeLimitThreadLocal { + ($t:ty) => { + impl ChunkCache for $t { + fn get(&self, chunk_indices: &[u64]) -> Option> { + self.cache() + .lock() + .unwrap() + .get(&chunk_indices.to_vec()) + .cloned() + } + + fn insert(&self, chunk_indices: ChunkIndices, chunk: Arc) { + let size = self.size.get_or_default(); + let size_old = size.fetch_add(chunk.size(), atomic::Ordering::SeqCst); + if size_old + chunk.size() > self.capacity { + let old = self.cache().lock().unwrap().pop_lru(); + if let Some(old) = old { + size.fetch_sub(old.1.size(), atomic::Ordering::SeqCst); + } + } + + let old = self.cache().lock().unwrap().push(chunk_indices, chunk); + if let Some(old) = old { + size.fetch_sub(old.1.size(), atomic::Ordering::SeqCst); + } + } + + fn len(&self) -> usize { + self.cache().lock().unwrap().len() + } + } + }; +} + +impl_ChunkCacheLruSizeLimitThreadLocal!(ChunkCacheLruSizeLimitThreadLocal); +impl_ChunkCacheLruSizeLimitThreadLocal!(&ChunkCacheLruSizeLimitThreadLocal);