Skip to content

Commit

Permalink
Merge pull request #48 from LDeakin/set_partial_values
Browse files Browse the repository at this point in the history
`set_partial_values` improvements
  • Loading branch information
LDeakin authored Aug 14, 2024
2 parents 0dfbe6a + 423fa26 commit 5473c8a
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 26 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed
- **Breaking**: `Arc` instead of `Box` partial decoders
- Expand `set_partial_values` tests
- Specialise `set_partial_values` for `MemoryStore`

### Fixed
- `[async_]store_set_partial_values` no longer truncates
- this could corrupt values depending on the order of `set_partial_values` calls

## [0.16.3] - 2024-08-14

Expand Down
11 changes: 8 additions & 3 deletions src/storage/storage_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,9 @@ pub trait AsyncListableStorageTraits: Send + Sync {

/// Set partial values for an asynchronous store.
///
/// This method reads entire values, updates them, and replaces them.
/// Stores can use this internally if they do not support updating/appending without replacement.
///
/// # Errors
/// Returns a [`StorageError`] if an underlying store operation fails.
///
Expand All @@ -179,6 +182,7 @@ pub trait AsyncListableStorageTraits: Send + Sync {
pub async fn async_store_set_partial_values<T: AsyncReadableWritableStorageTraits>(
store: &T,
key_start_values: &[StoreKeyStartValue<'_>],
// truncate: bool
) -> Result<(), StorageError> {
let groups = key_start_values
.iter()
Expand All @@ -202,9 +206,10 @@ pub async fn async_store_set_partial_values<T: AsyncReadableWritableStorageTrait
usize::try_from(group.iter().map(StoreKeyStartValue::end).max().unwrap()).unwrap();
if vec.len() < end_max {
vec.resize_with(end_max, Default::default);
} else {
vec.truncate(end_max);
};
}
// else if truncate {
// vec.truncate(end_max);
// };

// Update the store key
for key_start_value in group {
Expand Down
12 changes: 9 additions & 3 deletions src/storage/storage_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ pub trait ListableStorageTraits: Send + Sync {

/// Set partial values for a store.
///
/// This method reads entire values, updates them, and replaces them.
/// Stores can use this internally if they do not support updating/appending without replacement.
///
/// # Errors
/// Returns a [`StorageError`] if an underlying store operation fails.
///
Expand All @@ -160,6 +163,7 @@ pub trait ListableStorageTraits: Send + Sync {
pub fn store_set_partial_values<T: ReadableWritableStorageTraits>(
store: &T,
key_start_values: &[StoreKeyStartValue],
// truncate: bool,
) -> Result<(), StorageError> {
// Group by key
key_start_values
Expand All @@ -184,10 +188,12 @@ pub fn store_set_partial_values<T: ReadableWritableStorageTraits>(
vec.extend_from_slice(&bytes);
vec.resize_with(end_max, Default::default);
vec
// } else if truncate {
// let mut bytes = bytes.to_vec();
// bytes.truncate(end_max);
// bytes
} else {
let mut bytes = bytes.to_vec();
bytes.truncate(end_max);
bytes
bytes.to_vec()
};

// Update the store key
Expand Down
24 changes: 16 additions & 8 deletions src/storage/store/store_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ mod test_util {

/// Create a store with the following data
/// - a/
/// - b [0, 1, 2]
/// - b [0, 1, 2, 3]
/// - c [0]
/// - d/
/// - e
Expand All @@ -33,10 +33,18 @@ mod test_util {
) -> Result<(), Box<dyn Error>> {
store.erase_prefix(&StorePrefix::root()).await?;

store.set(&"a/b".try_into()?, vec![0, 0, 0].into()).await?;
store
.set(&"a/b".try_into()?, vec![255, 255, 255].into())
.await?;
store
.set_partial_values(&[StoreKeyStartValue::new("a/b".try_into()?, 1, &[1, 2])])
.await?;
store
.set_partial_values(&[StoreKeyStartValue::new("a/b".try_into()?, 3, &[3])])
.await?;
store
.set_partial_values(&[StoreKeyStartValue::new("a/b".try_into()?, 0, &[0])])
.await?;

store.set(&"a/c".try_into()?, vec![0].into()).await?;
store.set(&"a/d/e".try_into()?, vec![].into()).await?;
Expand Down Expand Up @@ -76,9 +84,9 @@ mod test_util {
assert!(store.size_key(&"notfound".try_into()?).await?.is_none());
assert_eq!(
store.get(&"a/b".try_into()?).await?,
Some(vec![0, 1, 2].into())
Some(vec![0, 1, 2, 3].into())
);
assert_eq!(store.size_key(&"a/b".try_into()?).await?, Some(3));
assert_eq!(store.size_key(&"a/b".try_into()?).await?, Some(4));
assert_eq!(store.size_key(&"a/c".try_into()?).await?, Some(1));
assert_eq!(store.size_key(&"i/j/k".try_into()?).await?, Some(2));
assert_eq!(
Expand All @@ -91,7 +99,7 @@ mod test_util {
]
)
.await?,
Some(vec![vec![1].into(), vec![2].into()])
Some(vec![vec![1].into(), vec![3].into()])
);
assert_eq!(
store
Expand All @@ -102,8 +110,8 @@ mod test_util {
])
.await?,
vec![
Some(vec![1, 2, 3].into()),
Some(vec![1, 2].into()),
Some(vec![0, 1].into()),
Some(vec![1].into())
]
);
Expand All @@ -115,8 +123,8 @@ mod test_util {
.await
.is_err());

assert_eq!(store.size().await?, 6);
assert_eq!(store.size_prefix(&"a/".try_into()?).await?, 4);
assert_eq!(store.size().await?, 7);
assert_eq!(store.size_prefix(&"a/".try_into()?).await?, 5);
assert_eq!(store.size_prefix(&"i/".try_into()?).await?, 2);

Ok(())
Expand Down
21 changes: 13 additions & 8 deletions src/storage/store/store_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ mod test_util {

/// Create a store with the following data
/// - a/
/// - b [0, 1, 2]
/// - b [0, 1, 2, 3]
/// - c [0]
/// - d/
/// - e
Expand All @@ -34,8 +34,10 @@ mod test_util {
pub fn store_write<T: WritableStorageTraits>(store: &T) -> Result<(), Box<dyn Error>> {
store.erase_prefix(&StorePrefix::root())?;

store.set(&"a/b".try_into()?, vec![0, 0, 0].into())?;
store.set(&"a/b".try_into()?, vec![255, 255, 255].into())?;
store.set_partial_values(&[StoreKeyStartValue::new("a/b".try_into()?, 1, &[1, 2])])?;
store.set_partial_values(&[StoreKeyStartValue::new("a/b".try_into()?, 3, &[3])])?;
store.set_partial_values(&[StoreKeyStartValue::new("a/b".try_into()?, 0, &[0])])?;

store.set(&"a/c".try_into()?, vec![0].into())?;
store.set(&"a/d/e".try_into()?, vec![].into())?;
Expand Down Expand Up @@ -63,8 +65,11 @@ mod test_util {
) -> Result<(), Box<dyn Error>> {
assert!(store.get(&"notfound".try_into()?)?.is_none());
assert!(store.size_key(&"notfound".try_into()?)?.is_none());
assert_eq!(store.get(&"a/b".try_into()?)?, Some(vec![0, 1, 2].into()));
assert_eq!(store.size_key(&"a/b".try_into()?)?, Some(3));
assert_eq!(
store.get(&"a/b".try_into()?)?,
Some(vec![0, 1, 2, 3].into())
);
assert_eq!(store.size_key(&"a/b".try_into()?)?, Some(4));
assert_eq!(store.size_key(&"a/c".try_into()?)?, Some(1));
assert_eq!(store.size_key(&"i/j/k".try_into()?)?, Some(2));
assert_eq!(
Expand All @@ -75,7 +80,7 @@ mod test_util {
ByteRange::FromEnd(0, Some(1))
]
)?,
Some(vec![vec![1].into(), vec![2].into()])
Some(vec![vec![1].into(), vec![3].into()])
);
assert_eq!(
store.get_partial_values(&[
Expand All @@ -84,8 +89,8 @@ mod test_util {
StoreKeyRange::new("i/j/k".try_into()?, ByteRange::FromStart(1, Some(1))),
])?,
vec![
Some(vec![1, 2, 3].into()),
Some(vec![1, 2].into()),
Some(vec![0, 1].into()),
Some(vec![1].into())
]
);
Expand All @@ -96,8 +101,8 @@ mod test_util {
),])
.is_err());

assert_eq!(store.size()?, 6);
assert_eq!(store.size_prefix(&"a/".try_into()?)?, 4);
assert_eq!(store.size()?, 7);
assert_eq!(store.size_prefix(&"a/".try_into()?)?, 5);
assert_eq!(store.size_prefix(&"i/".try_into()?)?, 2);

Ok(())
Expand Down
27 changes: 23 additions & 4 deletions src/storage/store/store_sync/memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::sync::Mutex;
use crate::{
byte_range::{ByteOffset, ByteRange, InvalidByteRangeError},
storage::{
store_set_partial_values, Bytes, ListableStorageTraits, MaybeBytes, ReadableStorageTraits,
Bytes, ListableStorageTraits, MaybeBytes, ReadableStorageTraits,
ReadableWritableStorageTraits, StorageError, StoreKey, StoreKeyStartValue, StoreKeys,
StoreKeysPrefixes, StorePrefix, WritableStorageTraits,
},
Expand Down Expand Up @@ -49,7 +49,7 @@ impl MemoryStore {
// }
// }

fn set_impl(&self, key: &StoreKey, value: &[u8], offset: Option<ByteOffset>, _truncate: bool) {
fn set_impl(&self, key: &StoreKey, value: &[u8], offset: Option<ByteOffset>, truncate: bool) {
let mut data_map = self.data_map.lock().unwrap();
let data = data_map
.entry(key.clone())
Expand All @@ -66,7 +66,7 @@ impl MemoryStore {
let length = usize::try_from(offset + value.len() as u64).unwrap();
if data.len() < length {
data.resize(length, 0);
} else {
} else if truncate {
data.truncate(length);
}
let offset = usize::try_from(offset).unwrap();
Expand Down Expand Up @@ -134,7 +134,26 @@ impl WritableStorageTraits for MemoryStore {
&self,
key_start_values: &[StoreKeyStartValue],
) -> Result<(), StorageError> {
store_set_partial_values(self, key_start_values)
use itertools::Itertools;

// Group by key
key_start_values
.iter()
.chunk_by(|key_start_value| &key_start_value.key)
.into_iter()
.map(|(key, group)| (key.clone(), group.into_iter().cloned().collect::<Vec<_>>()))
.try_for_each(|(key, group)| {
for key_start_value in group {
self.set_impl(
&key,
key_start_value.value,
Some(key_start_value.start),
false,
);
}
Ok::<_, StorageError>(())
})?;
Ok(())
}

fn erase(&self, key: &StoreKey) -> Result<(), StorageError> {
Expand Down

0 comments on commit 5473c8a

Please sign in to comment.