Skip to content

Commit

Permalink
feat: Implement conflict reconciliation for private directories
Browse files Browse the repository at this point in the history
  • Loading branch information
matheus23 committed Apr 23, 2024
1 parent a0bf23c commit b6475c5
Show file tree
Hide file tree
Showing 6 changed files with 406 additions and 35 deletions.
122 changes: 116 additions & 6 deletions wnfs/src/private/directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use chrono::{DateTime, Utc};
use libipld_core::cid::Cid;
use rand_core::CryptoRngCore;
use std::{
collections::{BTreeMap, BTreeSet},
collections::{btree_map::Entry, BTreeMap, BTreeSet},
fmt::Debug,
};
use wnfs_common::{
Expand Down Expand Up @@ -44,7 +44,7 @@ pub type PrivatePathNodesResult = PathNodesResult<PrivateDirectory>;
/// ```
#[derive(Debug, Clone, PartialEq)]
pub struct PrivateDirectory {
pub header: PrivateNodeHeader,
pub(crate) header: PrivateNodeHeader,
pub(crate) content: PrivateDirectoryContent,
}

Expand Down Expand Up @@ -260,7 +260,7 @@ impl PrivateDirectory {
.resolve_node(forest, store, Some(self.header.name.clone()))
.await?;
if search_latest {
Some(private_node.search_latest(forest, store).await?)
Some(private_node.search_latest_reconciled(forest, store).await?)
} else {
Some(private_node.clone())
}
Expand All @@ -283,7 +283,7 @@ impl PrivateDirectory {
.resolve_node_mut(forest, store, Some(self.header.name.clone()))
.await?;
if search_latest {
*private_node = private_node.search_latest(forest, store).await?;
*private_node = private_node.search_latest_reconciled(forest, store).await?;
}

Some(private_node)
Expand All @@ -302,7 +302,7 @@ impl PrivateDirectory {
let mut working_dir = Arc::clone(self);

if search_latest {
working_dir = working_dir.search_latest(forest, store).await?;
working_dir = working_dir.search_latest_reconciled(forest, store).await?;
}

for (depth, segment) in path_segments.iter().enumerate() {
Expand All @@ -329,7 +329,7 @@ impl PrivateDirectory {
store: &impl BlockStore,
) -> Result<SearchResult<&'a mut Self>> {
if search_latest {
*self = self.clone().search_latest(forest, store).await?;
*self = self.clone().search_latest_reconciled(forest, store).await?;
}

let mut working_dir = self.prepare_next_revision()?;
Expand Down Expand Up @@ -422,6 +422,44 @@ impl PrivateDirectory {
Ok(cloned)
}

/// TODO(matheus23): DOCS
pub(crate) fn prepare_next_merge<'a>(
self: &'a mut Arc<Self>,
current_cid: Cid,
target_header: PrivateNodeHeader,
) -> Result<&'a mut Self> {
let ratchet_diff = target_header.ratchet_diff_for_merge(&self.header)?;

if self.content.previous.len() > 1 {
// This is a merge node
let cloned = Arc::make_mut(self);
cloned.content.persisted_as = OnceCell::new();
cloned.header = target_header;
cloned.content.previous = std::mem::take(&mut cloned.content.previous)
.into_iter()
.map(|(ratchet_steps, link)| (ratchet_steps + ratchet_diff, link))
.collect();

return Ok(cloned);
}

// It's not a merge node, we need to advance the revision

let temporal_key = self.header.derive_temporal_key();
let previous_link = (
ratchet_diff,
Encrypted::from_value(current_cid, &temporal_key)?,
);
let cloned = Arc::make_mut(self);

// We make sure to clear any cached states.
cloned.content.persisted_as = OnceCell::new();
cloned.header = target_header;
cloned.content.previous = [previous_link].into_iter().collect();

Ok(cloned)
}

/// This prepares this directory for key rotation, usually for moving or
/// copying the directory to some other place.
///
Expand Down Expand Up @@ -802,6 +840,18 @@ impl PrivateDirectory {
.as_dir()
}

/// TODO(matheus23): DOCS
pub async fn search_latest_reconciled(
self: Arc<Self>,
forest: &impl PrivateForest,
store: &impl BlockStore,
) -> Result<Arc<Self>> {
PrivateNode::Dir(self)
.search_latest_reconciled(forest, store)
.await?
.as_dir()
}

/// Creates a new directory at the specified path.
///
/// # Examples
Expand Down Expand Up @@ -1307,6 +1357,66 @@ impl PrivateDirectory {
pub fn as_node(self: &Arc<Self>) -> PrivateNode {
PrivateNode::Dir(Arc::clone(self))
}

/// TODO(matheus23): DOCS
pub(crate) fn merge(
self: &mut Arc<Self>,
target_header: PrivateNodeHeader,
our_cid: Cid,
other: &Arc<Self>,
other_cid: Cid,
) -> Result<()> {
if our_cid == other_cid {
return Ok(());
}

let other_ratchet_diff = target_header.ratchet_diff_for_merge(&other.header)?;

let our = self.prepare_next_merge(our_cid, target_header)?;

if our.content.previous.len() > 1 {
// This is a merge node. We'll just add its previous links.
our.content.previous.extend(
other
.content
.previous
.iter()
.cloned()
.map(|(rev_back, link)| (rev_back + other_ratchet_diff, link)),
);
} else {
// The other node represents a write - we need to store a link to its CID
let temporal_key = &other.header.derive_temporal_key();
our.content.previous.insert((
other_ratchet_diff,
Encrypted::from_value(other_cid, temporal_key)?,
));
}

our.content
.metadata
.tie_break_with(&other.content.metadata)?;

for (name, other_link) in other.content.entries.iter() {
match our.content.entries.entry(name.clone()) {
Entry::Vacant(vacant) => {
vacant.insert(other_link.clone());
}
Entry::Occupied(mut occupied) => {
let our_link = occupied.get_mut();
// We just tie-break on the content cid.
// It's assumed both links have been resolved to their
// PrivateRef before, and we can tie-break on their content_cid.
// Otherwise, how would we have gotten `our_cid` and `other_cid`
// in this context? Both of these were gotten from `.store()`ing the
// nodes, which includes resolving the children to `PrivateRef`s.
our_link.tie_break_with(other_link)?;
}
}
}

Ok(())
}
}

impl PrivateDirectoryContent {
Expand Down
103 changes: 102 additions & 1 deletion wnfs/src/private/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use libipld_core::{
};
use rand_core::CryptoRngCore;
use serde::{Deserialize, Serialize};
use std::{collections::BTreeSet, iter};
use std::{cmp::Ordering, collections::BTreeSet, iter};
use wnfs_common::{
utils::{self, Arc, BoxStream},
BlockStore, Metadata, CODEC_RAW, MAX_BLOCK_SIZE,
Expand Down Expand Up @@ -752,6 +752,44 @@ impl PrivateFile {
Ok(cloned)
}

/// TODO(matheus23): DOCS
pub(crate) fn prepare_next_merge<'a>(
self: &'a mut Arc<Self>,
current_cid: Cid,
target_header: PrivateNodeHeader,
) -> Result<&'a mut Self> {
let ratchet_diff = target_header.ratchet_diff_for_merge(&self.header)?;

if self.content.previous.len() > 1 {
// This is a merge node
let cloned = Arc::make_mut(self);
cloned.content.persisted_as = OnceCell::new();
cloned.header = target_header;
cloned.content.previous = std::mem::take(&mut cloned.content.previous)
.into_iter()
.map(|(ratchet_steps, link)| (ratchet_steps + ratchet_diff, link))
.collect();

return Ok(cloned);
}

// It's not a merge node, we need to advance the revision

let temporal_key = self.header.derive_temporal_key();
let previous_link = (
ratchet_diff,
Encrypted::from_value(current_cid, &temporal_key)?,
);
let cloned = Arc::make_mut(self);

// We make sure to clear any cached states.
cloned.content.persisted_as = OnceCell::new();
cloned.header = target_header;
cloned.content.previous = [previous_link].into_iter().collect();

Ok(cloned)
}

/// This prepares this file for key rotation, usually for moving or
/// copying the file to some other place.
///
Expand Down Expand Up @@ -835,6 +873,62 @@ impl PrivateFile {
pub fn as_node(self: &Arc<Self>) -> PrivateNode {
PrivateNode::File(Arc::clone(self))
}

/// TODO(matheus23): DOCS
pub(crate) fn merge(
self: &mut Arc<Self>,
target_header: PrivateNodeHeader,
our_cid: Cid,
other: &Arc<Self>,
other_cid: Cid,
) -> Result<()> {
if our_cid == other_cid {
return Ok(());
}

let other_ratchet_diff = target_header.ratchet_diff_for_merge(&other.header)?;

let our = self.prepare_next_merge(our_cid, target_header)?;

if our.content.previous.len() > 1 {
// This is a merge node. We'll just add its previous links.
our.content.previous.extend(
other
.content
.previous
.iter()
.cloned()
.map(|(rev_back, link)| (rev_back + other_ratchet_diff, link)),
);
} else {
// The other node represents a write - we need to store a link to its CID
let temporal_key = &other.header.derive_temporal_key();
our.content.previous.insert((
other_ratchet_diff,
Encrypted::from_value(other_cid, temporal_key)?,
));
}

let our_hash = our.content.content.crdt_tiebreaker()?;
let other_hash = other.content.content.crdt_tiebreaker()?;

match our_hash.cmp(&other_hash) {
Ordering::Greater => {
our.content.content.clone_from(&other.content.content);
our.content.metadata.clone_from(&other.content.metadata);
}
Ordering::Equal => {
our.content
.metadata
.tie_break_with(&other.content.metadata)?;
}
Ordering::Less => {
// we take ours
}
}

Ok(())
}
}

impl PrivateFileContent {
Expand Down Expand Up @@ -877,6 +971,13 @@ impl PrivateFileContent {
}
}

impl FileContent {
pub(crate) fn crdt_tiebreaker(&self) -> Result<[u8; 32]> {
let bytes = serde_ipld_dagcbor::to_vec(self)?;
Ok(blake3::hash(&bytes).into())
}
}

impl PrivateForestContent {
/// Take some plaintext to encrypt and store in given private forest.
///
Expand Down
4 changes: 2 additions & 2 deletions wnfs/src/private/forest/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ pub trait PrivateForest: CondSync {
temporal_key: &'a TemporalKey,
store: &'a impl BlockStore,
parent_name: Option<Name>,
) -> BoxStream<'a, Result<PrivateNode>>
) -> BoxStream<'a, Result<(Cid, PrivateNode)>>
where
Self: Sized,
{
Expand All @@ -152,7 +152,7 @@ pub trait PrivateForest: CondSync {
Ok(Some(cids)) => {
for cid in cids {
match PrivateNode::from_cid(*cid, temporal_key, self, store, parent_name.clone()).await {
Ok(node) => yield Ok(node),
Ok(node) => yield Ok((*cid, node)),
Err(e) if e.downcast_ref::<CryptError>().is_some() => {
// we likely matched a PrivateNodeHeader instead of a PrivateNode.
// we skip it
Expand Down
25 changes: 24 additions & 1 deletion wnfs/src/private/link.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use super::{
forest::traits::PrivateForest, PrivateDirectory, PrivateFile, PrivateNode, PrivateRef,
};
use anyhow::Result;
use anyhow::{anyhow, Result};
use async_once_cell::OnceCell;
use async_recursion::async_recursion;
use libipld_core::{cid::Cid, multihash::MultihashGeneric};
use rand_core::CryptoRngCore;
use wnfs_common::{
utils::{Arc, CondSend},
Expand Down Expand Up @@ -126,6 +127,13 @@ impl PrivateLink {
}
}

pub fn get_content_cid(&self) -> Option<&Cid> {
match self {
Self::Encrypted { private_ref, .. } => Some(&private_ref.content_cid),
Self::Decrypted { node } => node.get_persisted_as().get(),
}
}

/// Creates a link to a directory node.
#[inline]
pub(crate) fn with_dir(dir: PrivateDirectory) -> Self {
Expand All @@ -137,6 +145,21 @@ impl PrivateLink {
pub(crate) fn with_file(file: PrivateFile) -> Self {
Self::from(PrivateNode::File(Arc::new(file)))
}

pub(crate) fn crdt_tiebreaker(&self) -> Result<MultihashGeneric<64>> {
Ok(*self.get_content_cid().ok_or_else(|| anyhow!("Impossible case: CRDT tiebreaker needed on node wasn't persisted before tie breaking"))?.hash())
}

pub(crate) fn tie_break_with(&mut self, other_link: &PrivateLink) -> Result<()> {
let our_hash = self.crdt_tiebreaker()?;
let other_hash = other_link.crdt_tiebreaker()?;

if other_hash.digest() < our_hash.digest() {
*self = other_link.clone();
}

Ok(())
}
}

impl PartialEq for PrivateLink {
Expand Down
Loading

0 comments on commit b6475c5

Please sign in to comment.