Skip to content

Commit

Permalink
Fix clippy warnings.
Browse files Browse the repository at this point in the history
Signed-off-by: Hiram Chirino <hiram@hiramchirino.com>
  • Loading branch information
chirino committed May 17, 2024
1 parent eb01b2a commit 65b43a0
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 18 deletions.
32 changes: 15 additions & 17 deletions limitador/src/storage/distributed/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use moka::sync::Cache;
use tokio::sync::mpsc::Sender;
use tokio::sync::{broadcast, mpsc, RwLock};
use tokio::time::sleep;

use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use tonic::{Code, Request, Response, Status, Streaming};

Expand Down Expand Up @@ -40,7 +41,7 @@ enum ClockSkew {
impl ClockSkew {
fn new(local: SystemTime, remote: SystemTime) -> ClockSkew {
if local == remote {
return ClockSkew::None();
ClockSkew::None()
} else if local.gt(&remote) {
ClockSkew::Slow(local.duration_since(remote).unwrap())
} else {
Expand Down Expand Up @@ -88,11 +89,8 @@ struct Session {
impl Session {
async fn close(&mut self) {
let mut state = self.replication_state.write().await;
match state.peer_trackers.get_mut(&self.peer_id) {
Some(peer) => {
peer.session = None;
}
None => {}
if let Some(peer) = state.peer_trackers.get_mut(&self.peer_id) {
peer.session = None;
}
}

Expand Down Expand Up @@ -178,11 +176,11 @@ impl Session {
},
);
}
Some(peer_tracker) => {
peer.urls.clone().iter().for_each(|url| {
// TODO: add discovered urls to the existing tracker.
// peer_tracker.discovered_urls.insert(url.clone());
});
Some(_peer_tracker) => {
// // TODO: add discovered urls to the existing tracker.
// peer.urls.clone().iter().for_each(|url| {
// peer_tracker.discovered_urls.insert(url.clone());
// });
}
}
}
Expand Down Expand Up @@ -317,7 +315,7 @@ fn is_disconnect(err: &Status) -> bool {
return true;
}
}
return false;
false
}

// MessageSender is used to abstract the difference between the server and client sender streams...
Expand Down Expand Up @@ -456,7 +454,7 @@ impl Broker {

tonic::transport::Server::builder()
.add_service(ReplicationServer::new(self.clone()))
.serve(self.listen_address.clone())
.serve(self.listen_address)
.await
.unwrap();
}
Expand Down Expand Up @@ -502,7 +500,7 @@ impl Broker {
}

// Reconnect failed peers periodically
async fn reconnect_to_failed_peers(&self) -> () {
async fn reconnect_to_failed_peers(&self) {
let failed_peers: Vec<_> = {
let state = self.replication_state.read().await;
state
Expand Down Expand Up @@ -638,7 +636,7 @@ impl Broker {
.iter()
.map(String::to_owned)
.collect();
let mut tracker = PeerTracker {
let tracker = PeerTracker {
peer_id: peer_id.clone(),
url: None,
discovered_urls,
Expand All @@ -660,10 +658,10 @@ impl Broker {

// keep track of the URL we used to connect to the peer.
if peer_url.is_some() {
tracker.url = peer_url.clone()
tracker.url.clone_from(&peer_url)
}

return Ok(option);
Ok(option)
}
}

Expand Down
2 changes: 1 addition & 1 deletion limitador/src/storage/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl CounterStorage for InMemoryStorage {
delta: u64,
load_counters: bool,
) -> Result<Authorization, StorageErr> {
let limits_by_namespace = self.limits_for_namespace.write().unwrap();
let limits_by_namespace = self.limits_for_namespace.read().unwrap();
let mut first_limited = None;
let mut counter_values_to_update: Vec<(&AtomicExpiringValue, u64)> = Vec::new();
let mut qualified_counter_values_to_updated: Vec<(Arc<AtomicExpiringValue>, u64)> =
Expand Down

0 comments on commit 65b43a0

Please sign in to comment.