diff --git a/crates/corro-agent/src/agent/handlers.rs b/crates/corro-agent/src/agent/handlers.rs index 4757af19..1f912c2b 100644 --- a/crates/corro-agent/src/agent/handlers.rs +++ b/crates/corro-agent/src/agent/handlers.rs @@ -431,7 +431,7 @@ pub async fn handle_emptyset( Some(change) => { if let Changeset::EmptySet { versions, ts } = change.changeset { buf.entry(change.actor_id).or_insert(VecDeque::new()).push_back((versions.clone(), ts)); - cost += versions.len(); + cost += versions.iter().map(|versions| cmp::min((versions.end().0 - versions.start().0) as usize + 1, 20)).sum::(); } else { warn!("received non-emptyset changes in emptyset channel from {}", change.actor_id); } @@ -451,12 +451,14 @@ pub async fn handle_emptyset( } if process { + for (actor, changes) in &mut buf { while !changes.is_empty() { let change = changes.pop_front().unwrap(); match process_emptyset(agent.clone(), bookie.clone(), *actor, &change).await { Ok(()) => { - cost -= change.0.len(); + // cost -= change.0.len(); + cost -= change.0.iter().map(|versions| cmp::min((versions.end().0 - versions.start().0) as usize + 1, 20)).sum::(); } Err(e) => { warn!("encountered error when processing emptyset - {e}"); @@ -467,6 +469,7 @@ pub async fn handle_emptyset( } } } + } println!("shutting down handle empties loop");