Skip to content

Commit

Permalink
Merge pull request #223 from superfly/sync-cleared-versions
Browse files Browse the repository at this point in the history
Send empties during sync based on timestamp
  • Loading branch information
somtochiama committed Jul 5, 2024
2 parents cccfad8 + 4163219 commit 47ae385
Show file tree
Hide file tree
Showing 18 changed files with 1,068 additions and 250 deletions.
2 changes: 1 addition & 1 deletion crates/corro-agent/src/agent/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use corro_types::{
sqlite::SqlitePoolError,
sync::{SyncMessageDecodeError, SyncMessageEncodeError},
};
use tokio::time::error::Elapsed;
use hyper::StatusCode;
use tokio::time::error::Elapsed;

#[derive(Debug, thiserror::Error)]
pub enum SyncClientError {
Expand Down
160 changes: 155 additions & 5 deletions crates/corro-agent/src/agent/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
//! This module is _big_ and maybe should be split up further.

use std::collections::BTreeMap;
use std::collections::HashMap;
use std::ops::RangeInclusive;

use std::{
cmp,
collections::VecDeque,
Expand All @@ -26,7 +29,11 @@ use corro_types::{
};

use bytes::Bytes;
use corro_types::agent::ChangeError;
use corro_types::base::Version;
use corro_types::broadcast::Timestamp;
use corro_types::change::store_empty_changeset;
use corro_types::sync::get_last_cleared_ts;
use foca::Notification;
use indexmap::IndexMap;
use metrics::{counter, gauge, histogram};
Expand Down Expand Up @@ -392,14 +399,141 @@ pub fn spawn_handle_db_cleanup(pool: SplitPool) {

// determine the estimated resource cost of processing a change
fn processing_cost(change: &Changeset) -> usize {
if change.is_empty() {
if change.is_empty() && !change.is_empty_set() {
let versions = change.versions();
cmp::min((versions.end().0 - versions.start().0) as usize + 1, 20)
} else {
change.len()
}
}

/// Handle incoming emptyset received during syncs
///
pub async fn handle_emptyset(
agent: Agent,
bookie: Bookie,
mut rx_emptysets: CorroReceiver<ChangeV1>,
mut tripwire: Tripwire,
) {
// maybe bigger timeout?
let mut max_wait = tokio::time::interval(Duration::from_millis(
agent.config().perf.apply_queue_timeout as u64,
));
let max_changes_chunk: usize = agent.config().perf.apply_queue_len;

let mut buf = HashMap::new();
let mut cost = 0;

loop {
let mut process = false;
tokio::select! {
maybe_change_src = rx_emptysets.recv() => match maybe_change_src {
Some(change) => {
if let Changeset::EmptySet { versions, ts } = change.changeset {
buf.entry(change.actor_id).or_insert(VecDeque::new()).push_back((versions.clone(), ts));
cost += versions.len();
} else {
warn!("received non-emptyset changes in emptyset channel from {}", change.actor_id);
}

if cost >= max_changes_chunk && !buf.is_empty() {
process = true;
}
},
None => break,
},
_ = max_wait.tick() => {
process = true
},
_ = &mut tripwire => {
break;
}
}

if process {
for (actor, changes) in &mut buf {
while !changes.is_empty() {
let change = changes.pop_front().unwrap();
match process_emptyset(agent.clone(), bookie.clone(), *actor, &change).await {
Ok(()) => {
cost -= change.0.len();
}
Err(e) => {
warn!("encountered error when processing emptyset - {e}");
changes.push_front(change);
break;
}
}
}
}
}
}

println!("shutting down handle empties loop");
}

pub async fn process_emptyset(
agent: Agent,
bookie: Bookie,
actor_id: ActorId,
changes: &(Vec<RangeInclusive<corro_types::base::Version>>, Timestamp),
) -> Result<(), ChangeError> {
let (versions, ts) = changes;
let mut conn = agent.pool().write_low().await?;

let booked = {
bookie
.write(format!(
"process_emptyset(booked writer, updates timestamp):{actor_id}",
))
.await
.ensure(actor_id)
};
let mut booked_write = booked
.write(format!(
"process_emptyset(booked writer, updates timestamp):{actor_id}"
))
.await;
let mut snap = booked_write.snapshot();

let tx = conn
.immediate_transaction()
.map_err(|source| ChangeError::Rusqlite {
source,
actor_id: None,
version: None,
})?;

counter!("corro.sync.empties.count", "actor" => format!("{}", actor_id.to_string()))
.increment(versions.len() as u64);
debug!(self_actor_id = %agent.actor_id(), "processing emptyset changes, len: {}", versions.len());
for version in versions {
store_empty_changeset(&tx, actor_id, version.clone(), *ts)?;
}

snap.insert_db(&tx, RangeInclusiveSet::from_iter(versions.clone()))
.map_err(|source| ChangeError::Rusqlite {
source,
actor_id: None,
version: None,
})?;
snap.update_cleared_ts(&tx, *ts)
.map_err(|source| ChangeError::Rusqlite {
source,
actor_id: None,
version: None,
})?;

tx.commit().map_err(|source| ChangeError::Rusqlite {
source,
actor_id: None,
version: None,
})?;
booked_write.commit_snapshot(snap);

Ok(())
}

/// Bundle incoming changes to optimise transaction sizes with SQLite
///
/// *Performance tradeoff*: introduce latency (with a max timeout) to
Expand Down Expand Up @@ -658,8 +792,15 @@ pub async fn handle_sync(
**id != agent.actor_id() && state.cluster_id == agent.cluster_id()
})
// Grab a ring-buffer index to the member RTT range
.map(|(id, state)| (*id, state.ring.unwrap_or(255), state.addr))
.collect::<Vec<(ActorId, u8, SocketAddr)>>()
.map(|(id, state)| {
(
*id,
state.ring.unwrap_or(255),
state.addr,
state.last_sync_ts,
)
})
.collect::<Vec<(ActorId, u8, SocketAddr, Option<Timestamp>)>>()
};

if candidates.is_empty() {
Expand All @@ -682,14 +823,16 @@ pub async fn handle_sync(
sync_state
.need_len_for_actor(&b.0)
.cmp(&sync_state.need_len_for_actor(&a.0))
// if equal, look at last sync time
.then_with(|| a.3.cmp(&b.3))
// if equal, look at proximity (via `ring`)
.then_with(|| a.1.cmp(&b.1))
});

choices.truncate(desired_count);
choices
.into_iter()
.map(|(actor_id, _, addr)| (actor_id, addr))
.map(|(actor_id, _, addr, _)| (actor_id, addr))
.collect()
};

Expand All @@ -698,8 +841,14 @@ pub async fn handle_sync(
return Ok(());
}

let mut last_cleared: HashMap<ActorId, Option<Timestamp>> = HashMap::new();

for (actor_id, _) in chosen.clone() {
last_cleared.insert(actor_id, get_last_cleared_ts(&bookie, &actor_id).await);
}

let start = Instant::now();
let n = match parallel_sync(agent, transport, chosen.clone(), sync_state).await {
let n = match parallel_sync(agent, transport, chosen.clone(), sync_state, last_cleared).await {
Ok(n) => n,
Err(e) => {
error!("failed to execute parallel sync: {e:?}");
Expand All @@ -712,6 +861,7 @@ pub async fn handle_sync(
info!(
"synced {n} changes w/ {} in {}s @ {} changes/s",
chosen
.clone()
.into_iter()
.map(|(actor_id, _)| actor_id.to_string())
.collect::<Vec<_>>()
Expand Down
8 changes: 8 additions & 0 deletions crates/corro-agent/src/agent/run_root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ async fn run(agent: Agent, opts: AgentOptions, pconf: PerfConfig) -> eyre::Resul
rx_apply,
rx_clear_buf,
rx_changes,
rx_emptyset,
rx_foca,
subs_manager,
subs_bcast_cache,
Expand Down Expand Up @@ -217,5 +218,12 @@ async fn run(agent: Agent, opts: AgentOptions, pconf: PerfConfig) -> eyre::Resul
tripwire.clone(),
));

spawn_counted(handlers::handle_emptyset(
agent.clone(),
bookie.clone(),
rx_emptyset,
tripwire.clone(),
));

Ok(bookie)
}
23 changes: 14 additions & 9 deletions crates/corro-agent/src/agent/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub struct AgentOptions {
pub rx_apply: CorroReceiver<(ActorId, Version)>,
pub rx_clear_buf: CorroReceiver<(ActorId, RangeInclusive<Version>)>,
pub rx_changes: CorroReceiver<(ChangeV1, ChangeSource)>,
pub rx_emptyset: CorroReceiver<ChangeV1>,
pub rx_foca: CorroReceiver<FocaInput>,
pub rtt_rx: TokioReceiver<(SocketAddr, Duration)>,
pub subs_manager: SubsManager,
Expand Down Expand Up @@ -84,9 +85,16 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age

let pool = SplitPool::create(&conf.db.path, write_sema.clone()).await?;

let clock = Arc::new(
uhlc::HLCBuilder::default()
.with_id(actor_id.try_into().unwrap())
.with_max_delta(Duration::from_millis(300))
.build(),
);

let schema = {
let mut conn = pool.write_priority().await?;
migrate(&mut conn)?;
migrate(clock.clone(), &mut conn)?;
let mut schema = init_schema(&conn)?;
schema.constrain()?;

Expand Down Expand Up @@ -138,15 +146,9 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age
}
let api_addr = api_listeners.first().unwrap().local_addr()?;

let clock = Arc::new(
uhlc::HLCBuilder::default()
.with_id(actor_id.try_into().unwrap())
.with_max_delta(Duration::from_millis(300))
.build(),
);

let (tx_bcast, rx_bcast) = bounded(conf.perf.bcast_channel_len, "bcast");
let (tx_changes, rx_changes) = bounded(conf.perf.changes_channel_len, "changes");
let (tx_emptyset, rx_emptyset) = bounded(conf.perf.changes_channel_len, "emptyset");
let (tx_foca, rx_foca) = bounded(conf.perf.foca_channel_len, "foca");

let lock_registry = LockRegistry::default();
Expand All @@ -162,7 +164,8 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age
async move {
let conn = pool.read().await?;
*booked.deref_mut().deref_mut() =
tokio::task::block_in_place(|| BookedVersions::from_conn(&conn, actor_id))?;
tokio::task::block_in_place(|| BookedVersions::from_conn(&conn, actor_id))
.expect("loading BookedVersions from db failed");
Ok::<_, eyre::Report>(())
}
});
Expand All @@ -176,6 +179,7 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age
rx_apply,
rx_clear_buf,
rx_changes,
rx_emptyset,
rx_foca,
rtt_rx,
subs_manager: subs_manager.clone(),
Expand All @@ -197,6 +201,7 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age
tx_apply,
tx_clear_buf,
tx_changes,
tx_emptyset,
tx_foca,
write_sema,
schema: RwLock::new(schema),
Expand Down
Loading

0 comments on commit 47ae385

Please sign in to comment.