Skip to content

Commit

Permalink
Searching for a problem
Browse files Browse the repository at this point in the history
  • Loading branch information
scx1332 committed Feb 9, 2024
1 parent ad4ccf1 commit e33d7a0
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 30 deletions.
29 changes: 18 additions & 11 deletions crates/erc20_rpc_pool/src/rpc_pool/eth_generic_call.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,31 @@ impl Web3RpcPool {
let idx_vec = self.clone().choose_best_endpoints().await;

if idx_vec.is_empty() {
if let Some(event_sender) = self.event_sender.clone().and_then(|es| es.upgrade()) {
let _ = event_sender
.send(DriverEvent {
create_date: chrono::Utc::now(),
content: DriverEventContent::Web3RpcMessage(Web3RpcPoolInfo {
chain_id: self.chain_id,
content: Web3RpcPoolContent::AllEndpointsFailed,
}),
})
.await;
}
if loop_no >= LOOP_COUNT {
if let Some(event_sender) =
self.event_sender.clone().and_then(|es| es.upgrade())
{
let _ = event_sender
.send(DriverEvent {
create_date: chrono::Utc::now(),
content: DriverEventContent::Web3RpcMessage(Web3RpcPoolInfo {
chain_id: self.chain_id,
content: Web3RpcPoolContent::AllEndpointsFailed,
}),
})
.await;
}
log::warn!(
"Seems like all RPC endpoints failed - chain id: {}",
self.chain_id
);
return Err(web3::Error::Unreachable);
}
// sleep for 800, 1200, 2000, 2800 ms - total max sleep time is 6800 ms
let sleep_times: [u64; LOOP_COUNT] = [800, 1200, 2000, 2800];
tokio::time::sleep(Duration::from_millis(sleep_times[loop_no])).await;
loop_no += 1;
continue;
}

for idx in idx_vec {
Expand Down
3 changes: 1 addition & 2 deletions crates/erc20_rpc_pool/src/rpc_pool/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub struct Web3ExternalDnsSource {
pub endpoint_params: Web3EndpointParams,
}

#[derive(Debug, Serialize)]
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Web3RpcEndpoint {
#[serde(skip)]
Expand Down Expand Up @@ -373,7 +373,6 @@ impl Web3RpcPool {
allowed_endpoints
} else {
let self_cloned = self.clone();
log::error!("Spawn tasks");
self_cloned
.endpoint_verifier
.clone()
Expand Down
1 change: 1 addition & 0 deletions crates/erc20_rpc_pool/src/rpc_pool/pool/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ impl ExternalSourceResolver {
});
}
async fn resolve_external_addresses_int(self: Arc<Self>, pool: Arc<Web3RpcPool>) {
metrics::counter!("resolver_spawned", 1, "chain_id" => pool.chain_id.to_string());
pool.cleanup_sources_after_grace_period();

let dns_jobs = &pool.external_dns_sources;
Expand Down
2 changes: 1 addition & 1 deletion crates/erc20_rpc_pool/src/rpc_pool/pool/verifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ impl EndpointsVerifier {
let pool = pool.clone();
let self_cloned = self.clone();
let h = tokio::spawn(async move {
log::error!("Starting external resolver");
self_cloned
.clone()
.verify_unverified_endpoints(pool.clone())
Expand All @@ -73,6 +72,7 @@ impl EndpointsVerifier {
}

async fn verify_unverified_endpoints(self: Arc<EndpointsVerifier>, pool: Arc<Web3RpcPool>) {
metrics::counter!("verifier_spawned", 1, "chain_id" => pool.chain_id.to_string());
let _guard = pool.verify_mutex.lock().await;
let futures = {
let endpoints_copy = pool
Expand Down
23 changes: 7 additions & 16 deletions crates/erc20_rpc_pool/src/rpc_pool/verify/verify_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ async fn verify_endpoint_int(web3: &Web3<Http>, vep: VerifyEndpointParams) -> Ve
};
if let Some(max_head_behind_secs) = vep.allow_max_head_behind_secs {
if Utc::now() - date > Duration::seconds(max_head_behind_secs as i64) {
log::warn!("Verify endpoint error - Head behind");
return VerifyEndpointResult::HeadBehind(date);
}
}
Expand All @@ -66,23 +67,13 @@ async fn verify_endpoint_int(web3: &Web3<Http>, vep: VerifyEndpointParams) -> Ve
}

pub async fn verify_endpoint(chain_id: u64, m: Arc<RwLock<Web3RpcEndpoint>>) {
let (web3, web3_rpc_info, web3_rpc_params) = {
(
m.try_read_for(std::time::Duration::from_secs(5))
.unwrap()
.web3
.clone(),
m.try_read_for(std::time::Duration::from_secs(5))
.unwrap()
.web3_rpc_info
.clone(),
m.try_read_for(std::time::Duration::from_secs(5))
.unwrap()
.web3_rpc_params
.clone(),
)
};
let (web3, web3_rpc_info, web3_rpc_params) = m
.try_read_for(std::time::Duration::from_secs(5))
.map(|x| x.clone())
.map(|x| (x.web3, x.web3_rpc_info, x.web3_rpc_params))
.unwrap();

log::error!("Verify endpoint {:?}", web3_rpc_params.web3_endpoint_params.max_head_behind_secs);
if let Some(last_verified) = web3_rpc_info.last_verified {
if Utc::now() - last_verified
< Duration::seconds(web3_rpc_params.web3_endpoint_params.verify_interval_secs as i64)
Expand Down

0 comments on commit e33d7a0

Please sign in to comment.