Skip to content

Commit

Permalink
f
Browse files Browse the repository at this point in the history
  • Loading branch information
scx1332 committed Feb 12, 2024
1 parent 07bd3eb commit 1d95607
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 29 deletions.
77 changes: 48 additions & 29 deletions crates/erc20_payment_lib/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,12 +367,13 @@ pub enum TransferType {
}

pub struct PaymentRuntime {
pub runtime_handles: Arc<std::sync::Mutex<Vec<JoinHandle<()>>>>,
//pub runtime_handles: Arc<std::sync::Mutex<Vec<JoinHandle<()>>>>,
pub setup: PaymentSetup,
pub shared_state: Arc<std::sync::Mutex<SharedState>>,
pub wake: Arc<Notify>,
pub driver_broadcast_sender: Option<broadcast::Sender<DriverEvent>>,
pub driver_mpsc_sender: Option<mpsc::Sender<DriverEvent>>,
pub raw_event_sender: mpsc::Sender<DriverEvent>,
conn: SqlitePool,
status_tracker: StatusTracker,
config: Config,
Expand Down Expand Up @@ -405,16 +406,13 @@ pub struct TransferArgs {
impl PaymentRuntime {
fn start_service_loop(
&self,
addr: SignerAccount,
signer_address: Address,
notify: Arc<Notify>,
extra_testing: Option<ExtraOptionsForTesting>,
options: AdditionalOptions,
) -> JoinHandle<()> {
let shared_state_clone = self.shared_state.clone();
let raw_event_sender = self
.driver_mpsc_sender
.clone()
.expect("No mpsc sender in runtime");
let raw_event_sender = self.raw_event_sender.clone();
let config = self.config.clone();
let ps = self.setup.clone();
let conn = self.conn.clone();
Expand Down Expand Up @@ -463,7 +461,7 @@ impl PaymentRuntime {
} else {
service_loop(
shared_state_clone,
addr.address,
signer_address,
notify,
&conn,
&ps,
Expand Down Expand Up @@ -525,7 +523,7 @@ impl PaymentRuntime {
.collect::<Vec<SignerAccount>>();

let shared_state = Arc::new(std::sync::Mutex::new(SharedState {
accounts: accounts.clone(),
accounts: vec![],
inserted: 0,
idling: false,
current_tx_info: BTreeMap::new(),
Expand All @@ -536,29 +534,26 @@ impl PaymentRuntime {
let notify = Arc::new(Notify::new());

let pr = PaymentRuntime {
runtime_handles: Arc::new(std::sync::Mutex::new(Vec::new())),
//runtime_handles: Arc::new(std::sync::Mutex::new(Vec::new())),
setup: payment_setup,
shared_state,
wake: notify.clone(),
conn,
status_tracker,
driver_broadcast_sender,
driver_mpsc_sender,
raw_event_sender,
config: payment_runtime_args.config,
};

let mut tasks = Vec::new();
for addr in accounts {
let jh = pr.start_service_loop(
addr,
notify.clone(),
for signer_account in accounts {
pr.add_account(
signer_account,
payment_runtime_args.extra_testing.clone(),
options.clone(),
);
tasks.push(jh);
}

*pr.runtime_handles.lock().unwrap() = tasks;
/* - use this to test notifies
let notify_ = notify.clone();
tokio::spawn(async move {
Expand All @@ -573,21 +568,31 @@ impl PaymentRuntime {
}

fn get_and_remove_tasks(&self) -> Vec<JoinHandle<()>> {
let handles = {
let mut mutex_guard = self.runtime_handles.lock().unwrap();
let g: &mut Vec<JoinHandle<()>> = &mut mutex_guard;
std::mem::take(g)
};
handles
self.shared_state
.lock()
.unwrap()
.accounts
.iter_mut()
.filter_map(|a| a.jh.lock().unwrap().take())
.collect()
}

pub fn is_any_task_running(&self) -> bool {
let mutex_guard = self.runtime_handles.lock().unwrap();
mutex_guard.iter().any(|h| !h.is_finished())
self.shared_state.lock().unwrap().accounts.iter().any(|a| {
a.jh.lock()
.unwrap()
.as_ref()
.is_some_and(|jh| !jh.is_finished())
})
}

pub fn is_any_task_finished(&self) -> bool {
let mutex_guard = self.runtime_handles.lock().unwrap();
mutex_guard.iter().any(|h| h.is_finished())
self.shared_state.lock().unwrap().accounts.iter().any(|a| {
a.jh.lock()
.unwrap()
.as_ref()
.is_some_and(|jh| jh.is_finished())
})
}

pub async fn join_tasks(&self) -> Result<(), JoinError> {
Expand All @@ -611,7 +616,12 @@ impl PaymentRuntime {
}
}

pub async fn add_account(&self, payment_account: SignerAccount) {
pub fn add_account(
&self,
payment_account: SignerAccount,
extra_testing: Option<ExtraOptionsForTesting>,
options: AdditionalOptions,
) -> bool {
log::info!("Adding account: {}", payment_account);
let mut sh = self.shared_state.lock().unwrap();

Expand All @@ -620,10 +630,19 @@ impl PaymentRuntime {
.iter()
.any(|a| a.address == payment_account.address)
{
log::warn!("Account already added: {}", payment_account);
return;
log::error!("Account already added: {}", payment_account);
return false;
}
let jh = self.start_service_loop(
payment_account.address,
self.wake.clone(),
extra_testing,
options,
);
*payment_account.jh.lock().unwrap() = Some(jh);
sh.accounts.push(payment_account);

true
}

pub async fn get_unpaid_token_amount(
Expand Down
4 changes: 4 additions & 0 deletions crates/erc20_payment_lib/src/signer/account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use erc20_payment_lib_common::error::PaymentError;
use serde::Serialize;
use std::fmt::{Debug, Display, Formatter};
use std::sync::{Arc, Mutex};
use tokio::task::JoinHandle;
use tokio::time::timeout;

use super::Signer;
Expand All @@ -15,6 +16,8 @@ pub struct SignerAccount {
#[serde(skip)]
pub signer: Arc<Box<dyn Signer + Send + Sync>>,
pub(crate) external_gather_time: Arc<Mutex<Option<DateTime<Utc>>>>,
#[serde(skip)]
pub(crate) jh: Arc<Mutex<Option<JoinHandle<()>>>>,
}

impl Debug for SignerAccount {
Expand All @@ -35,6 +38,7 @@ impl SignerAccount {
address,
signer,
external_gather_time: Arc::new(Mutex::new(None)),
jh: Arc::new(Mutex::new(None)),
}
}

Expand Down

0 comments on commit 1d95607

Please sign in to comment.