Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

set_partial_values improvements #48

Merged
merged 2 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed
- **Breaking**: `Arc` instead of `Box` partial decoders
- Expand `set_partial_values` tests
- Specialise `set_partial_values` for `MemoryStore`

### Fixed
- `[async_]store_set_partial_values` no longer truncates
- this could corrupt values depending on the order of `set_partial_values` calls

## [0.16.3] - 2024-08-14

Expand Down
11 changes: 8 additions & 3 deletions src/storage/storage_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,9 @@ pub trait AsyncListableStorageTraits: Send + Sync {

/// Set partial values for an asynchronous store.
///
/// This method reads entire values, updates them, and replaces them.
/// Stores can use this internally if they do not support updating/appending without replacement.
///
/// # Errors
/// Returns a [`StorageError`] if an underlying store operation fails.
///
Expand All @@ -179,6 +182,7 @@ pub trait AsyncListableStorageTraits: Send + Sync {
pub async fn async_store_set_partial_values<T: AsyncReadableWritableStorageTraits>(
store: &T,
key_start_values: &[StoreKeyStartValue<'_>],
// truncate: bool
) -> Result<(), StorageError> {
let groups = key_start_values
.iter()
Expand All @@ -202,9 +206,10 @@ pub async fn async_store_set_partial_values<T: AsyncReadableWritableStorageTrait
usize::try_from(group.iter().map(StoreKeyStartValue::end).max().unwrap()).unwrap();
if vec.len() < end_max {
vec.resize_with(end_max, Default::default);
} else {
vec.truncate(end_max);
};
}
// else if truncate {
// vec.truncate(end_max);
// };

// Update the store key
for key_start_value in group {
Expand Down
12 changes: 9 additions & 3 deletions src/storage/storage_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ pub trait ListableStorageTraits: Send + Sync {

/// Set partial values for a store.
///
/// This method reads entire values, updates them, and replaces them.
/// Stores can use this internally if they do not support updating/appending without replacement.
///
/// # Errors
/// Returns a [`StorageError`] if an underlying store operation fails.
///
Expand All @@ -160,6 +163,7 @@ pub trait ListableStorageTraits: Send + Sync {
pub fn store_set_partial_values<T: ReadableWritableStorageTraits>(
store: &T,
key_start_values: &[StoreKeyStartValue],
// truncate: bool,
) -> Result<(), StorageError> {
// Group by key
key_start_values
Expand All @@ -184,10 +188,12 @@ pub fn store_set_partial_values<T: ReadableWritableStorageTraits>(
vec.extend_from_slice(&bytes);
vec.resize_with(end_max, Default::default);
vec
// } else if truncate {
// let mut bytes = bytes.to_vec();
// bytes.truncate(end_max);
// bytes
} else {
let mut bytes = bytes.to_vec();
bytes.truncate(end_max);
bytes
bytes.to_vec()
};

// Update the store key
Expand Down
24 changes: 16 additions & 8 deletions src/storage/store/store_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ mod test_util {

/// Create a store with the following data
/// - a/
/// - b [0, 1, 2]
/// - b [0, 1, 2, 3]
/// - c [0]
/// - d/
/// - e
Expand All @@ -33,10 +33,18 @@ mod test_util {
) -> Result<(), Box<dyn Error>> {
store.erase_prefix(&StorePrefix::root()).await?;

store.set(&"a/b".try_into()?, vec![0, 0, 0].into()).await?;
store
.set(&"a/b".try_into()?, vec![255, 255, 255].into())
.await?;
store
.set_partial_values(&[StoreKeyStartValue::new("a/b".try_into()?, 1, &[1, 2])])
.await?;
store
.set_partial_values(&[StoreKeyStartValue::new("a/b".try_into()?, 3, &[3])])
.await?;
store
.set_partial_values(&[StoreKeyStartValue::new("a/b".try_into()?, 0, &[0])])
.await?;

store.set(&"a/c".try_into()?, vec![0].into()).await?;
store.set(&"a/d/e".try_into()?, vec![].into()).await?;
Expand Down Expand Up @@ -76,9 +84,9 @@ mod test_util {
assert!(store.size_key(&"notfound".try_into()?).await?.is_none());
assert_eq!(
store.get(&"a/b".try_into()?).await?,
Some(vec![0, 1, 2].into())
Some(vec![0, 1, 2, 3].into())
);
assert_eq!(store.size_key(&"a/b".try_into()?).await?, Some(3));
assert_eq!(store.size_key(&"a/b".try_into()?).await?, Some(4));
assert_eq!(store.size_key(&"a/c".try_into()?).await?, Some(1));
assert_eq!(store.size_key(&"i/j/k".try_into()?).await?, Some(2));
assert_eq!(
Expand All @@ -91,7 +99,7 @@ mod test_util {
]
)
.await?,
Some(vec![vec![1].into(), vec![2].into()])
Some(vec![vec![1].into(), vec![3].into()])
);
assert_eq!(
store
Expand All @@ -102,8 +110,8 @@ mod test_util {
])
.await?,
vec![
Some(vec![1, 2, 3].into()),
Some(vec![1, 2].into()),
Some(vec![0, 1].into()),
Some(vec![1].into())
]
);
Expand All @@ -115,8 +123,8 @@ mod test_util {
.await
.is_err());

assert_eq!(store.size().await?, 6);
assert_eq!(store.size_prefix(&"a/".try_into()?).await?, 4);
assert_eq!(store.size().await?, 7);
assert_eq!(store.size_prefix(&"a/".try_into()?).await?, 5);
assert_eq!(store.size_prefix(&"i/".try_into()?).await?, 2);

Ok(())
Expand Down
21 changes: 13 additions & 8 deletions src/storage/store/store_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ mod test_util {

/// Create a store with the following data
/// - a/
/// - b [0, 1, 2]
/// - b [0, 1, 2, 3]
/// - c [0]
/// - d/
/// - e
Expand All @@ -34,8 +34,10 @@ mod test_util {
pub fn store_write<T: WritableStorageTraits>(store: &T) -> Result<(), Box<dyn Error>> {
store.erase_prefix(&StorePrefix::root())?;

store.set(&"a/b".try_into()?, vec![0, 0, 0].into())?;
store.set(&"a/b".try_into()?, vec![255, 255, 255].into())?;
store.set_partial_values(&[StoreKeyStartValue::new("a/b".try_into()?, 1, &[1, 2])])?;
store.set_partial_values(&[StoreKeyStartValue::new("a/b".try_into()?, 3, &[3])])?;
store.set_partial_values(&[StoreKeyStartValue::new("a/b".try_into()?, 0, &[0])])?;

store.set(&"a/c".try_into()?, vec![0].into())?;
store.set(&"a/d/e".try_into()?, vec![].into())?;
Expand Down Expand Up @@ -63,8 +65,11 @@ mod test_util {
) -> Result<(), Box<dyn Error>> {
assert!(store.get(&"notfound".try_into()?)?.is_none());
assert!(store.size_key(&"notfound".try_into()?)?.is_none());
assert_eq!(store.get(&"a/b".try_into()?)?, Some(vec![0, 1, 2].into()));
assert_eq!(store.size_key(&"a/b".try_into()?)?, Some(3));
assert_eq!(
store.get(&"a/b".try_into()?)?,
Some(vec![0, 1, 2, 3].into())
);
assert_eq!(store.size_key(&"a/b".try_into()?)?, Some(4));
assert_eq!(store.size_key(&"a/c".try_into()?)?, Some(1));
assert_eq!(store.size_key(&"i/j/k".try_into()?)?, Some(2));
assert_eq!(
Expand All @@ -75,7 +80,7 @@ mod test_util {
ByteRange::FromEnd(0, Some(1))
]
)?,
Some(vec![vec![1].into(), vec![2].into()])
Some(vec![vec![1].into(), vec![3].into()])
);
assert_eq!(
store.get_partial_values(&[
Expand All @@ -84,8 +89,8 @@ mod test_util {
StoreKeyRange::new("i/j/k".try_into()?, ByteRange::FromStart(1, Some(1))),
])?,
vec![
Some(vec![1, 2, 3].into()),
Some(vec![1, 2].into()),
Some(vec![0, 1].into()),
Some(vec![1].into())
]
);
Expand All @@ -96,8 +101,8 @@ mod test_util {
),])
.is_err());

assert_eq!(store.size()?, 6);
assert_eq!(store.size_prefix(&"a/".try_into()?)?, 4);
assert_eq!(store.size()?, 7);
assert_eq!(store.size_prefix(&"a/".try_into()?)?, 5);
assert_eq!(store.size_prefix(&"i/".try_into()?)?, 2);

Ok(())
Expand Down
27 changes: 23 additions & 4 deletions src/storage/store/store_sync/memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::sync::Mutex;
use crate::{
byte_range::{ByteOffset, ByteRange, InvalidByteRangeError},
storage::{
store_set_partial_values, Bytes, ListableStorageTraits, MaybeBytes, ReadableStorageTraits,
Bytes, ListableStorageTraits, MaybeBytes, ReadableStorageTraits,
ReadableWritableStorageTraits, StorageError, StoreKey, StoreKeyStartValue, StoreKeys,
StoreKeysPrefixes, StorePrefix, WritableStorageTraits,
},
Expand Down Expand Up @@ -49,7 +49,7 @@ impl MemoryStore {
// }
// }

fn set_impl(&self, key: &StoreKey, value: &[u8], offset: Option<ByteOffset>, _truncate: bool) {
fn set_impl(&self, key: &StoreKey, value: &[u8], offset: Option<ByteOffset>, truncate: bool) {
let mut data_map = self.data_map.lock().unwrap();
let data = data_map
.entry(key.clone())
Expand All @@ -66,7 +66,7 @@ impl MemoryStore {
let length = usize::try_from(offset + value.len() as u64).unwrap();
if data.len() < length {
data.resize(length, 0);
} else {
} else if truncate {
data.truncate(length);
}
let offset = usize::try_from(offset).unwrap();
Expand Down Expand Up @@ -134,7 +134,26 @@ impl WritableStorageTraits for MemoryStore {
&self,
key_start_values: &[StoreKeyStartValue],
) -> Result<(), StorageError> {
store_set_partial_values(self, key_start_values)
use itertools::Itertools;

// Group by key
key_start_values
.iter()
.chunk_by(|key_start_value| &key_start_value.key)
.into_iter()
.map(|(key, group)| (key.clone(), group.into_iter().cloned().collect::<Vec<_>>()))
.try_for_each(|(key, group)| {
for key_start_value in group {
self.set_impl(
&key,
key_start_value.value,
Some(key_start_value.start),
false,
);
}
Ok::<_, StorageError>(())
})?;
Ok(())
}

fn erase(&self, key: &StoreKey) -> Result<(), StorageError> {
Expand Down