[distributed storage] support re-syncing counters after a peer is partitioned.

Signed-off-by: Hiram Chirino <hiram@hiramchirino.com>
chirino committed May 23, 2024
1 parent 8ce12c8 commit db635a9
Showing 3 changed files with 162 additions and 16 deletions.
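In short, this commit adds a re-sync handshake to the replication protocol: a peer that suspects it missed counter updates while partitioned sends a re_sync_request, the peer that receives it replays a counter_update for every counter it knows about, and a re_sync_end marks the end of the replay. A minimal sketch of the responder's side of that exchange, using hypothetical stand-in types rather than the crate's real API:

// Illustrative stand-ins for the protobuf messages defined below.
enum Msg {
    ReSyncRequest,          // "replay every counter you know about"
    CounterUpdate(String),  // one replayed counter (key simplified to a String)
    ReSyncEnd,              // replay finished; the requester clears its flags
}

// The responder answers a ReSyncRequest by streaming one CounterUpdate per
// known counter, then a ReSyncEnd sentinel.
fn respond_to_re_sync(known_counter_keys: &[String]) -> Vec<Msg> {
    let mut replay: Vec<Msg> = known_counter_keys
        .iter()
        .cloned()
        .map(Msg::CounterUpdate)
        .collect();
    replay.push(Msg::ReSyncEnd);
    replay
}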
20 changes: 12 additions & 8 deletions limitador/proto/distributed.proto
@@ -5,18 +5,22 @@ package limitador.service.distributed.v1;
// A packet defines all the types of messages that can be sent between replication peers.
message Packet {
oneof message {
// the Hello message is used to introduce a peer to another peer. It is the first message sent by a peer.
// the hello message is used to introduce a peer to another peer. It is the first message sent by a peer.
Hello hello = 1;
// the MembershipUpdate message is used to gossip about the other peers in the cluster:
// the membership_update message is used to gossip about the other peers in the cluster:
// 1) sent after the first Hello message
// 2) sent when the membership state changes
MembershipUpdate membership_update = 2;
// the Ping message is used to request a pong from the other peer.
Ping ping = 3;
// the Pong message is used to respond to a ping.
// the ping message is used to request a pong from the other peer.
Empty ping = 3;
// the pong message is used to respond to a ping.
Pong pong = 4;
// the CounterUpdate message is used to send counter updates.
// the counter_update message is used to send counter updates.
CounterUpdate counter_update = 5;
// the re_sync_request message is used to request the peer send counter_update for all known counters.
Empty re_sync_request = 6;
// the re_sync_end message is used to signal the end of a re_sync_request.
Empty re_sync_end = 7;
}
}

@@ -30,8 +34,8 @@ message Hello {
optional string receiver_url = 3;
}

// A request to a peer to respond with a Pong message.
message Ping {}
// A packet message that does not have any additional data.
message Empty {}

// Pong is the response to a Ping and Hello message.
message Pong {
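For reference, the two new packets are plain wrappers around the Empty message; a minimal sketch of how they are built with the prost-generated types (the module path is taken from the use statements in the Rust diff below; the helper names are hypothetical):

use crate::storage::distributed::grpc::v1::{packet::Message, Empty, Packet};

// Ask a peer to replay a counter_update for every counter it knows about.
fn re_sync_request_packet() -> Packet {
    Packet { message: Some(Message::ReSyncRequest(Empty::default())) }
}

// Signal the end of a replay triggered by a re_sync_request.
fn re_sync_end_packet() -> Packet {
    Packet { message: Some(Message::ReSyncEnd(Empty::default())) }
}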
103 changes: 95 additions & 8 deletions limitador/src/storage/distributed/grpc/mod.rs
@@ -5,10 +5,10 @@ use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::{error::Error, io::ErrorKind, pin::Pin};

use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::mpsc::Sender;
use tokio::sync::{broadcast, mpsc, RwLock};
use tokio::time::sleep;

use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use tonic::{Code, Request, Response, Status, Streaming};
use tracing::debug;
@@ -17,7 +17,7 @@ use crate::storage::distributed::grpc::v1::packet::Message;
use crate::storage::distributed::grpc::v1::replication_client::ReplicationClient;
use crate::storage::distributed::grpc::v1::replication_server::{Replication, ReplicationServer};
use crate::storage::distributed::grpc::v1::{
CounterUpdate, Hello, MembershipUpdate, Packet, Peer, Pong,
CounterUpdate, Empty, Hello, MembershipUpdate, Packet, Peer, Pong,
};

// clippy will barf on protobuf generated code for enum variants in
@@ -84,16 +84,30 @@ struct Session

impl Session {
async fn close(&mut self) {
let mut state = self.replication_state.write().await;
if let Some(peer) = state.peer_trackers.get_mut(&self.peer_id) {
let mut replication_state = self.replication_state.write().await;
if let Some(peer) = replication_state.peer_trackers.get_mut(&self.peer_id) {
peer.session = None;
}

// resync once we are no longer connected to any peers.
if replication_state.active_session_count() == 0 {
replication_state.re_sync_needed = true
}
}

async fn send(&self, message: Message) -> Result<(), Status> {
self.out_stream.clone().send(Ok(message)).await
}

async fn re_sync_check(&self) -> Result<(), Status> {
let mut replication_state = self.replication_state.write().await;
if replication_state.re_sync_needed && replication_state.re_sync_peer.is_none() {
replication_state.re_sync_peer = Some(self.peer_id.clone());
self.send(Message::ReSyncRequest(Empty::default())).await?;
}
Ok(())
}

async fn process(&mut self, in_stream: &mut Streaming<Packet>) -> Result<(), Status> {
// Send a MembershipUpdate to inform the peer about all the members
// We should resend it again if we learn of new members.
@@ -105,11 +119,13 @@ impl Session {
}))
.await?;

let mut udpates_to_send = self.broker_state.publisher.subscribe();
// We may need to initiate a re-sync if we have been partitioned from the cluster.
self.re_sync_check().await?;

let mut updates = self.broker_state.publisher.subscribe();
loop {
tokio::select! {
update = udpates_to_send.recv() => {
update = updates.recv() => {
let update = update.map_err(|_| Status::unknown("broadcast error"))?;
self.send(Message::CounterUpdate(update)).await?;
}
@@ -150,6 +166,59 @@ impl Session {
})))
.await?;
}
Some(Message::ReSyncRequest(_)) => {
debug!("peer: '{}': ReSyncRequest", self.peer_id);
let (tx, mut rx) = mpsc::channel::<Option<CounterUpdate>>(1);
let peer_id = self.peer_id.clone();
let out_stream = self.out_stream.clone();
tokio::spawn(async move {
let mut counter = 0u64;
while let Some(rsync_message) = rx.recv().await {
match rsync_message {
Some(update) => {
counter += 1;
if let Err(err) = out_stream
.clone()
.send(Ok(Message::CounterUpdate(update)))
.await
{
debug!(
"peer: '{}': ReSyncRequest: send error: {:?}",
peer_id, err
);
return;
}
}
None => {
debug!(
"peer: '{}': re-sync completed, sent {} updates",
peer_id, counter
);
_ = out_stream
.clone()
.send(Ok(Message::ReSyncEnd(Empty::default())))
.await;
}
}
}
});
self.broker_state
.on_re_sync
.try_send(tx)
.map_err(|err| match err {
TrySendError::Full(_) => Status::resource_exhausted("re-sync channel full"),
TrySendError::Closed(_) => Status::unavailable("re-sync channel closed"),
})?;
}
Some(Message::ReSyncEnd(_)) => {
debug!("peer: '{}': ReSyncEnd", self.peer_id);
// peer has finished re-syncing us
{
let mut replication_state = self.replication_state.write().await;
replication_state.re_sync_needed = false;
replication_state.re_sync_peer = None;
}
}
Some(Message::MembershipUpdate(update)) => {
debug!("peer: '{}': MembershipUpdate", self.peer_id);
// add any new peers to peer_trackers
@@ -214,6 +283,12 @@ struct ReplicationState {
// URLs our peers have used to connect to us.
discovered_urls: HashSet<String>,
peer_trackers: HashMap<String, PeerTracker>,

// if this peer has been partitioned from the cluster, this should be set to true
// to signal that a re-sync should be started with one of the peers.
re_sync_needed: bool,
// This is set to the peer that a re-sync is in progress with.
re_sync_peer: Option<String>,
}

impl ReplicationState {
@@ -233,6 +308,13 @@ impl ReplicationState {
peers.sort_by(|a, b| a.peer_id.cmp(&b.peer_id));
peers
}

fn active_session_count(&self) -> usize {
self.peer_trackers
.iter()
.filter(|(_, peer_tracker)| peer_tracker.session.is_some())
.count()
}
}

fn match_for_io_error(err_status: &Status) -> Option<&std::io::Error> {
@@ -289,13 +371,13 @@ fn is_disconnect(err: &Status) -> bool {

// MessageSender is used to abstract the difference between the server and client sender streams...
#[derive(Clone)]
enum MessageSender {
pub enum MessageSender {
Server(Sender<Result<Packet, Status>>),
Client(Sender<Packet>),
}

impl MessageSender {
async fn send(self, message: Result<Message, Status>) -> Result<(), Status> {
pub async fn send(self, message: Result<Message, Status>) -> Result<(), Status> {
match self {
MessageSender::Server(sender) => {
let value = message.map(|x| Packet { message: Some(x) });
@@ -324,6 +406,7 @@ struct BrokerState {
id: String,
publisher: broadcast::Sender<CounterUpdate>,
on_counter_update: Arc<CounterUpdateFn>,
on_re_sync: Arc<Sender<Sender<Option<CounterUpdate>>>>,
}

#[derive(Clone)]
@@ -340,6 +423,7 @@ impl Broker {
listen_address: SocketAddr,
peer_urls: Vec<String>,
on_counter_update: CounterUpdateFn,
on_re_sync: Sender<Sender<Option<CounterUpdate>>>,
) -> Broker {
let (tx, _) = broadcast::channel(16);
let publisher: broadcast::Sender<CounterUpdate> = tx;
@@ -351,10 +435,13 @@ id,
id,
publisher,
on_counter_update: Arc::new(on_counter_update),
on_re_sync: Arc::new(on_re_sync),
},
replication_state: Arc::new(RwLock::new(ReplicationState {
discovered_urls: HashSet::new(),
peer_trackers: HashMap::new(),
re_sync_needed: true,
re_sync_peer: None,
})),
}
}
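The requester-side bookkeeping in this file amounts to a small state machine: re_sync_needed starts as true (and is set again whenever the last peer session closes), the first session to pass re_sync_check claims the re-sync by recording re_sync_peer and sending the request, and an incoming ReSyncEnd clears both fields. A condensed sketch of that logic, assuming standalone stand-in types:

// Hypothetical standalone sketch of the requester-side re-sync state machine.
struct ReSyncState {
    re_sync_needed: bool,         // set at startup and when the last session closes
    re_sync_peer: Option<String>, // the peer a re-sync is in progress with, if any
}

impl ReSyncState {
    fn on_all_sessions_closed(&mut self) {
        // We may have missed counter updates while partitioned.
        self.re_sync_needed = true;
    }

    // Returns true if the calling session should send the ReSyncRequest.
    fn try_claim(&mut self, peer_id: &str) -> bool {
        if self.re_sync_needed && self.re_sync_peer.is_none() {
            self.re_sync_peer = Some(peer_id.to_string());
            true
        } else {
            false
        }
    }

    fn on_re_sync_end(&mut self) {
        self.re_sync_needed = false;
        self.re_sync_peer = None;
    }
}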
55 changes: 55 additions & 0 deletions limitador/src/storage/distributed/mod.rs
@@ -5,6 +5,9 @@ use std::sync::{Arc, RwLock};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use tokio::sync::mpsc::Sender;
use tracing::debug;

use crate::counter::Counter;
use crate::limit::{Limit, Namespace};
@@ -217,6 +220,8 @@ impl CrInMemoryStorage {
let limits = Arc::new(RwLock::new(LimitsMap::new()));

let limits_clone = limits.clone();

let (re_sync_queue_tx, mut re_sync_queue_rx) = mpsc::channel(100);
let broker = grpc::Broker::new(
identifier.clone(),
listen_address,
@@ -232,6 +237,7 @@
let value = limits.get(&update.key).unwrap();
value.merge((UNIX_EPOCH + Duration::from_secs(update.expires_at), values).into());
}),
re_sync_queue_tx,
);

{
@@ -241,6 +247,17 @@
});
}

// process the re-sync requests...
{
let limits = limits.clone();
tokio::spawn(async move {
let limits = limits.clone();
while let Some(sender) = re_sync_queue_rx.recv().await {
process_re_sync(&limits, sender).await;
}
});
}

Self {
identifier,
limits,
@@ -279,6 +296,44 @@ }
}
}

async fn process_re_sync(
limits: &Arc<RwLock<HashMap<Vec<u8>, CrCounterValue<String>>>>,
sender: Sender<Option<CounterUpdate>>,
) {
// sending all the counters to the peer might take a while, so we don't want to lock
// the limits map for too long; first get the list of keys that need to be sent.
let keys: Vec<_> = {
let limits = limits.read().unwrap();
limits.keys().cloned().collect()
};

for key in keys {
let update = {
let limits = limits.read().unwrap();
limits.get(&key).map(|store_value| {
let (expiry, values) = store_value.clone().into_inner();
CounterUpdate {
key: key.clone(),
values: values.into_iter().collect(),
expires_at: expiry.duration_since(UNIX_EPOCH).unwrap().as_secs(),
}
})
};
// skip None, it means the counter was deleted.
if let Some(update) = update {
match sender.send(Some(update)).await {
Ok(_) => {}
Err(err) => {
debug!("Failed to send re-sync counter update to peer: {:?}", err);
break;
}
}
}
}
// signal the end of the re-sync
_ = sender.send(None).await;
}

#[derive(Clone, Debug, Serialize, Deserialize)]
struct CounterKey {
namespace: Namespace,
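process_re_sync uses Option<CounterUpdate> as an in-band protocol on the mpsc channel: Some carries one replayed counter and None is the end-of-replay sentinel, which the gRPC session task translates into a ReSyncEnd packet. A minimal sketch of a consumer of that channel, with the update type left generic since the real CounterUpdate is prost-generated:

use tokio::sync::mpsc::Receiver;

// `U` stands in for the generated CounterUpdate type.
async fn drain_re_sync<U>(mut rx: Receiver<Option<U>>) -> u64 {
    let mut replayed = 0u64;
    while let Some(msg) = rx.recv().await {
        match msg {
            Some(_update) => replayed += 1, // forward as Message::CounterUpdate
            None => break, // sentinel from process_re_sync: the replay is complete
        }
    }
    replayed
}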
