Skip to content

Commit

Permalink
[distributed store] use a single map Vec<u8> -> Counters map
Browse files Browse the repository at this point in the history
By keying using Vec<u8> we reduce how often we need to encode/decode the the counter keys.

Signed-off-by: Hiram Chirino <hiram@hiramchirino.com>
  • Loading branch information
chirino committed May 22, 2024
1 parent 0a70dff commit 3895c8a
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 233 deletions.
46 changes: 7 additions & 39 deletions limitador/src/storage/distributed/grpc/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use std::collections::{BTreeMap, HashMap, HashSet};
use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;
use std::ops::Add;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::{error::Error, io::ErrorKind, pin::Pin};

use moka::sync::Cache;
use tokio::sync::mpsc::Sender;
use tokio::sync::{broadcast, mpsc, RwLock};
use tokio::time::sleep;
Expand All @@ -14,15 +13,12 @@ use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use tonic::{Code, Request, Response, Status, Streaming};
use tracing::debug;

use crate::counter::Counter;
use crate::storage::distributed::cr_counter_value::CrCounterValue;
use crate::storage::distributed::grpc::v1::packet::Message;
use crate::storage::distributed::grpc::v1::replication_client::ReplicationClient;
use crate::storage::distributed::grpc::v1::replication_server::{Replication, ReplicationServer};
use crate::storage::distributed::grpc::v1::{
CounterUpdate, Hello, MembershipUpdate, Packet, Peer, Pong,
};
use crate::storage::distributed::CounterKey;

// clippy will barf on protobuff generated code for enum variants in
// v3::socket_option::SocketState, so allow this lint
Expand Down Expand Up @@ -187,34 +183,7 @@ impl Session {
}
Some(Message::CounterUpdate(update)) => {
debug!("peer: '{}': CounterUpdate", self.peer_id);

let counter_key = postcard::from_bytes::<CounterKey>(update.key.as_slice())
.map_err(|err| {
Status::internal(format!("failed to decode counter key: {:?}", err))
})?;

let values = BTreeMap::from_iter(
update
.values
.iter()
.map(|(k, v)| (k.to_owned(), v.to_owned())),
);

let counter = <CounterKey as Into<Counter>>::into(counter_key);
if counter.is_qualified() {
if let Some(counter) = self.broker_state.qualified_counters.get(&counter) {
counter.merge(
(UNIX_EPOCH + Duration::from_secs(update.expires_at), values).into(),
);
}
} else {
let counters = self.broker_state.limits_for_namespace.read().unwrap();
let limits = counters.get(counter.namespace()).unwrap();
let value = limits.get(counter.limit()).unwrap();
value.merge(
(UNIX_EPOCH + Duration::from_secs(update.expires_at), values).into(),
);
};
(self.broker_state.on_counter_update)(update);
}
_ => {
debug!("peer: '{}': unsupported packet: {:?}", self.peer_id, packet);
Expand Down Expand Up @@ -348,12 +317,13 @@ impl MessageSender {
}
}

type CounterUpdateFn = Pin<Box<dyn Fn(CounterUpdate) + Sync + Send>>;

#[derive(Clone)]
struct BrokerState {
id: String,
limits_for_namespace: Arc<std::sync::RwLock<super::LimitsMap>>,
qualified_counters: Arc<Cache<Counter, Arc<CrCounterValue<String>>>>,
publisher: broadcast::Sender<CounterUpdate>,
on_counter_update: Arc<CounterUpdateFn>,
}

#[derive(Clone)]
Expand All @@ -369,8 +339,7 @@ impl Broker {
id: String,
listen_address: SocketAddr,
peer_urls: Vec<String>,
limits_for_namespace: Arc<std::sync::RwLock<super::LimitsMap>>,
qualified_counters: Arc<Cache<Counter, Arc<CrCounterValue<String>>>>,
on_counter_update: CounterUpdateFn,
) -> Broker {
let (tx, _) = broadcast::channel(16);
let publisher: broadcast::Sender<CounterUpdate> = tx;
Expand All @@ -381,8 +350,7 @@ impl Broker {
broker_state: BrokerState {
id,
publisher,
limits_for_namespace,
qualified_counters,
on_counter_update: Arc::new(on_counter_update),
},
replication_state: Arc::new(RwLock::new(ReplicationState {
discovered_urls: HashSet::new(),
Expand Down
Loading

0 comments on commit 3895c8a

Please sign in to comment.