Skip to content

Commit

Permalink
Rework of the verifier. Now tasks are no more spawned like crazy
Browse files Browse the repository at this point in the history
  • Loading branch information
scx1332 committed Feb 9, 2024
1 parent 9e77f67 commit ad4ccf1
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 80 deletions.
1 change: 1 addition & 0 deletions crates/erc20_payment_lib/src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ impl PaymentSetup {
json_sources,
dns_sources,
mpsc_sender.as_ref().map(|s| s.downgrade()),
Duration::from_secs(10),
Duration::from_secs(chain_config.1.external_source_check_interval.unwrap_or(300)),
);

Expand Down
76 changes: 19 additions & 57 deletions crates/erc20_rpc_pool/src/rpc_pool/pool.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
mod resolver;
mod verifier;

use crate::rpc_pool::pool::resolver::ExternalSourceResolver;
use crate::rpc_pool::verify::{verify_endpoint, ReqStats, Web3EndpointParams, Web3RpcSingleParams};
use crate::rpc_pool::pool::verifier::EndpointsVerifier;
use crate::rpc_pool::verify::{ReqStats, Web3EndpointParams, Web3RpcSingleParams};
use crate::rpc_pool::VerifyEndpointResult;
use crate::Web3RpcInfo;
use chrono::Utc;
use erc20_payment_lib_common::DriverEvent;
use futures::future;
use parking_lot::{Mutex, RwLock};
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
Expand Down Expand Up @@ -105,12 +106,11 @@ pub struct Web3RpcPool {
pub external_json_sources: Vec<Web3ExternalJsonSource>,
pub external_dns_sources: Vec<Web3ExternalDnsSource>,

pub last_external_check: Arc<Mutex<Option<std::time::Instant>>>,
pub check_external_sources_interval: Duration,

pub last_verify_endpoints_spawn: Arc<Mutex<Option<std::time::Instant>>>,
pub verify_rpc_min_interval: Duration,

pub external_sources_resolver: Arc<ExternalSourceResolver>,
pub endpoint_verifier: Arc<EndpointsVerifier>,
}

pub async fn resolve_txt_record_to_string_array(record: &str) -> std::io::Result<Vec<String>> {
Expand All @@ -134,6 +134,7 @@ impl Web3RpcPool {
json_sources: Vec<Web3ExternalJsonSource>,
dns_sources: Vec<Web3ExternalDnsSource>,
events: Option<tokio::sync::mpsc::WeakSender<DriverEvent>>,
verify_rpc_min_interval: Duration,
external_sources_interval_check: Duration,
) -> Arc<Self> {
let mut web3_endpoints = Arena::new();
Expand Down Expand Up @@ -182,10 +183,10 @@ impl Web3RpcPool {
event_sender: events,
external_json_sources: json_sources,
external_dns_sources: dns_sources,
last_external_check: Arc::new(Mutex::new(None)),
verify_rpc_min_interval,
check_external_sources_interval: external_sources_interval_check,
last_verify_endpoints_spawn: Arc::new(Mutex::new(None)),
external_sources_resolver: Arc::new(ExternalSourceResolver::new()),
endpoint_verifier: Arc::new(Default::default()),
});

if !s.external_json_sources.is_empty() || !s.external_dns_sources.is_empty() {
Expand Down Expand Up @@ -221,6 +222,7 @@ impl Web3RpcPool {
Vec::new(),
Vec::new(),
None,
Duration::from_secs(10),
Duration::from_secs(300),
)
}
Expand Down Expand Up @@ -257,34 +259,6 @@ impl Web3RpcPool {
self.chain_id
}

pub async fn verify_unverified_endpoints(self: Arc<Self>) {
let _guard = self.verify_mutex.lock().await;
let futures = {
let endpoints_copy = self
.endpoints
.try_lock_for(Duration::from_secs(5))
.unwrap()
.clone();

let mut futures = Vec::new();
for (_idx, endp) in endpoints_copy {
{
if endp
.try_read_for(Duration::from_secs(5))
.unwrap()
.is_removed()
{
continue;
}
}
futures.push(verify_endpoint(self.chain_id, endp.clone()));
}
futures
};

future::join_all(futures).await;
}

pub fn extra_score_from_last_chosen(&self) -> (Option<Index>, i64) {
let mut extra_score_idx = None;
let mut extra_score = 0;
Expand Down Expand Up @@ -391,35 +365,23 @@ impl Web3RpcPool {

let self_cloned = self.clone();

let spawn_endpoints = if let Some(last_verify_endpoints_spawn) = self_cloned
.last_verify_endpoints_spawn
.try_lock_for(Duration::from_secs(5))
.unwrap()
.as_ref()
{
last_verify_endpoints_spawn.elapsed() >= Duration::from_secs(10)
} else {
true
};

if spawn_endpoints {
self_cloned
.last_verify_endpoints_spawn
.try_lock_for(Duration::from_secs(5))
.unwrap()
.replace(std::time::Instant::now());
log::error!("Spawn tasks");
tokio::spawn(self_cloned.verify_unverified_endpoints());
}
self_cloned
.endpoint_verifier
.clone()
.start_verify_if_needed(self.clone());

allowed_endpoints
} else {
let self_cloned = self.clone();
log::error!("Spawn tasks");
let verify_task = tokio::spawn(self_cloned.verify_unverified_endpoints());
self_cloned
.endpoint_verifier
.clone()
.start_verify_if_needed(self.clone());
//let verify_task = tokio::spawn(self_cloned.endpoint_verifier.verify_unverified_endpoints(self));

loop {
let is_finished = verify_task.is_finished();
let is_finished = self_cloned.endpoint_verifier.is_finished();

if let Some(el) = endpoints_copy
.iter()
Expand Down
28 changes: 7 additions & 21 deletions crates/erc20_rpc_pool/src/rpc_pool/pool/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,42 +41,28 @@ impl ExternalSourceResolver {
}

pub(super) fn start_resolve_if_needed(self: Arc<Self>, pool: Arc<Web3RpcPool>) {
let mut last_external_check = pool
.last_external_check
let mut last_check = self
.last_check
.try_lock_for(Duration::from_secs(5))
.unwrap();
if let Some(last_external_check) = last_external_check.as_ref() {
if last_external_check.elapsed() < pool.check_external_sources_interval {
if let Some(last_check) = last_check.as_ref() {
if last_check.elapsed() < pool.check_external_sources_interval {
log::debug!(
"Last external check was less than check_external_sources_interval ago"
);
return;
}
}
last_external_check.replace(std::time::Instant::now());
last_check.replace(std::time::Instant::now());
//spawn async task and return immediately
let pool = pool.clone();
let self_clone = self.clone();
tokio::spawn(async move {
log::error!("Starting external resolver");
self.resolve_external_addresses_int(pool).await;
self_clone.resolve_external_addresses_int(pool).await;
});
}
async fn resolve_external_addresses_int(self: Arc<Self>, pool: Arc<Web3RpcPool>) {
{
let mut last_external_check = pool
.last_external_check
.try_lock_for(Duration::from_secs(5))
.unwrap();
if let Some(last_external_check) = last_external_check.as_ref() {
if last_external_check.elapsed() < pool.check_external_sources_interval {
log::debug!(
"Last external check was less than check_external_sources_interval ago"
);
return;
}
}
last_external_check.replace(std::time::Instant::now());
}
pool.cleanup_sources_after_grace_period();

let dns_jobs = &pool.external_dns_sources;
Expand Down
102 changes: 102 additions & 0 deletions crates/erc20_rpc_pool/src/rpc_pool/pool/verifier.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
use crate::rpc_pool::verify_endpoint;
use crate::Web3RpcPool;
use futures_util::future;
use parking_lot::Mutex;
use std::sync::Arc;
use std::time::Duration;

#[derive(Debug)]
pub struct EndpointsVerifier {
pub last_verify: Arc<Mutex<Option<std::time::Instant>>>,
pub is_finished: Arc<Mutex<bool>>,
pub verify_handle: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
}

impl Default for EndpointsVerifier {
fn default() -> Self {
Self::new()
}
}
impl EndpointsVerifier {
pub fn new() -> Self {
Self {
last_verify: Arc::new(Mutex::new(None)),
is_finished: Arc::new(Mutex::new(false)),
verify_handle: Arc::new(Mutex::new(None)),
}
}
pub fn is_finished(&self) -> bool {
*self
.is_finished
.try_lock_for(Duration::from_secs(5))
.unwrap()
}
pub fn get_join_handle(&self) -> Option<tokio::task::JoinHandle<()>> {
self.verify_handle
.try_lock_for(Duration::from_secs(5))
.unwrap()
.take()
}

pub fn start_verify_if_needed(self: &Arc<Self>, pool: Arc<Web3RpcPool>) {
let mut last_verify = self
.last_verify
.try_lock_for(Duration::from_secs(5))
.unwrap();
if let Some(last_verify) = last_verify.as_ref() {
if last_verify.elapsed() < pool.check_external_sources_interval {
log::debug!(
"Last external check was less than check_external_sources_interval ago"
);
return;
}
}
last_verify.replace(std::time::Instant::now());
//spawn async task and return immediately
let pool = pool.clone();
let self_cloned = self.clone();
let h = tokio::spawn(async move {
log::error!("Starting external resolver");
self_cloned
.clone()
.verify_unverified_endpoints(pool.clone())
.await;
*self_cloned
.is_finished
.try_lock_for(Duration::from_secs(5))
.unwrap() = true;
});
self.verify_handle
.try_lock_for(Duration::from_secs(5))
.unwrap()
.replace(h);
}

async fn verify_unverified_endpoints(self: Arc<EndpointsVerifier>, pool: Arc<Web3RpcPool>) {
let _guard = pool.verify_mutex.lock().await;
let futures = {
let endpoints_copy = pool
.endpoints
.try_lock_for(Duration::from_secs(5))
.unwrap()
.clone();

let mut futures = Vec::new();
for (_idx, endp) in endpoints_copy {
{
if endp
.try_read_for(Duration::from_secs(5))
.unwrap()
.is_removed()
{
continue;
}
}
futures.push(verify_endpoint(pool.chain_id, endp.clone()));
}
futures
};

future::join_all(futures).await;
}
}
7 changes: 5 additions & 2 deletions src/actions/check_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ pub async fn check_rpc_local(
Vec::new(),
Vec::new(),
None,
Duration::from_secs(10),
Duration::from_secs(300),
);
for rpc_settings in &chain_cfg.rpc_endpoints {
Expand Down Expand Up @@ -167,8 +168,10 @@ pub async fn check_rpc_local(
);
};
}

let task = tokio::spawn(web3_pool.clone().verify_unverified_endpoints());
web3_pool
.endpoint_verifier
.start_verify_if_needed(web3_pool.clone());
let task = web3_pool.endpoint_verifier.get_join_handle().unwrap();
let mut idx_set_completed = HashSet::new();

let enp_info = loop {
Expand Down

0 comments on commit ad4ccf1

Please sign in to comment.