Skip to content

Commit

Permalink
Adding account to api
Browse files Browse the repository at this point in the history
  • Loading branch information
scx1332 committed Feb 7, 2024
1 parent 33e82fd commit e42b19d
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 30 deletions.
15 changes: 5 additions & 10 deletions crates/erc20_payment_lib/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -865,11 +865,9 @@ pub async fn mint_golem_token(
}

let mut db_transaction = conn.begin().await.map_err(err_from!())?;
let filter = format!(
"from_addr=\"{:#x}\" AND method=\"FAUCET.create\" AND fee_paid is NULL",
from
);
let tx_existing = get_transactions(&mut *db_transaction, Some(&filter), None, None)
let filter = "method=\"FAUCET.create\" AND fee_paid is NULL";

let tx_existing = get_transactions(&mut *db_transaction, Some(from), Some(filter), None, None)
.await
.map_err(err_from!())?;

Expand Down Expand Up @@ -1254,11 +1252,8 @@ pub async fn deposit_funds(
}

let mut db_transaction = conn.begin().await.map_err(err_from!())?;
let filter = format!(
"from_addr=\"{:#x}\" AND method=\"LOCK.deposit\" AND fee_paid is NULL",
from
);
let tx_existing = get_transactions(&mut *db_transaction, Some(&filter), None, None)
let filter = "method=\"LOCK.deposit\" AND fee_paid is NULL";
let tx_existing = get_transactions(&mut *db_transaction, Some(from), Some(filter), None, None)
.await
.map_err(err_from!())?;

Expand Down
4 changes: 3 additions & 1 deletion crates/erc20_payment_lib/src/sender/batching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use sqlx::SqlitePool;
use tokio::sync::mpsc;

use crate::runtime::send_driver_event;
use crate::signer::SignerAccount;
use erc20_payment_lib_common::model::TokenTransferDbObj;
use erc20_payment_lib_common::{DriverEvent, DriverEventContent, TransactionFailedReason};
use web3::types::{Address, U256};
Expand Down Expand Up @@ -50,12 +51,13 @@ pub struct TokenTransferMultiOrder {
}

pub async fn gather_transactions_pre(
account: &SignerAccount,
conn: &SqlitePool,
_payment_setup: &PaymentSetup,
) -> Result<TokenTransferMap, PaymentError> {
let mut transfer_map = TokenTransferMap::new();

let mut token_transfers = get_pending_token_transfers(conn)
let mut token_transfers = get_pending_token_transfers(conn, account.address)
.await
.map_err(err_from!())?;

Expand Down
34 changes: 19 additions & 15 deletions crates/erc20_payment_lib/src/sender/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ pub async fn update_tx_result(
}

pub async fn process_transactions(
signer_account: &SignerAccount,
event_sender: Option<tokio::sync::mpsc::Sender<DriverEvent>>,
shared_state: Arc<std::sync::Mutex<SharedState>>,
conn: &SqlitePool,
Expand All @@ -264,9 +265,10 @@ pub async fn process_transactions(

let mut current_wait_time_no_gas_token: f64 = 0.0;
loop {
let mut transactions = get_next_transactions_to_process(conn, 1)
.await
.map_err(err_from!())?;
let mut transactions =
get_next_transactions_to_process(conn, Some(signer_account.address), 1)
.await
.map_err(err_from!())?;

let Some(tx) = transactions.get_mut(0) else {
log::debug!("No transactions to process, breaking from loop");
Expand Down Expand Up @@ -522,6 +524,7 @@ pub async fn service_loop(
if payment_setup.generate_tx_only {
log::warn!("Skipping processing transactions...");
} else if let Err(e) = process_transactions(
&signer_account,
event_sender.clone(),
shared_state.clone(),
conn,
Expand Down Expand Up @@ -569,21 +572,22 @@ pub async fn service_loop(
metrics::counter!(metric_label_gather_pre, 1);

log::debug!("Gathering payments...");
let mut token_transfer_map = match gather_transactions_pre(conn, payment_setup).await {
Ok(token_transfer_map) => token_transfer_map,
Err(e) => {
metrics::counter!(metric_label_gather_pre_error, 1);
log::error!(
let mut token_transfer_map =
match gather_transactions_pre(&signer_account, conn, payment_setup).await {
Ok(token_transfer_map) => token_transfer_map,
Err(e) => {
metrics::counter!(metric_label_gather_pre_error, 1);
log::error!(
"Error in gather transactions, driver will be stuck, Fix DB to continue {:?}",
e
);
tokio::time::sleep(std::time::Duration::from_secs(
payment_setup.process_interval_after_error,
))
.await;
continue;
}
};
tokio::time::sleep(std::time::Duration::from_secs(
payment_setup.process_interval_after_error,
))
.await;
continue;
}
};
metrics::counter!(metric_label_gather_post, 1);

match gather_transactions_post(
Expand Down
8 changes: 7 additions & 1 deletion crates/erc20_payment_lib/src/server/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ pub async fn transactions(data: Data<Box<ServerData>>, _req: HttpRequest) -> imp
//todo: add limits
let txs = {
let db_conn = data.db_connection.lock().await;
return_on_error!(get_transactions(&*db_conn, None, None, None).await)
return_on_error!(get_transactions(&*db_conn, None, None, None, None).await)
};
web::Json(json!({
"txs": txs,
Expand Down Expand Up @@ -429,6 +429,7 @@ pub async fn transactions_next(data: Data<Box<ServerData>>, req: HttpRequest) ->
return_on_error!(
get_transactions(
&*db_conn,
None,
Some(TRANSACTION_FILTER_QUEUED),
limit,
Some(TRANSACTION_ORDER_BY_CREATE_DATE)
Expand All @@ -450,6 +451,7 @@ pub async fn transactions_current(
return_on_error!(
get_transactions(
&*db_conn,
None,
Some(TRANSACTION_FILTER_PROCESSING),
None,
Some(TRANSACTION_ORDER_BY_CREATE_DATE)
Expand Down Expand Up @@ -477,6 +479,7 @@ pub async fn transactions_last_processed(
return_on_error!(
get_transactions(
&*db_conn,
None,
Some(TRANSACTION_FILTER_DONE),
limit,
Some(TRANSACTION_ORDER_BY_FIRST_PROCESSED_DATE_DESC)
Expand Down Expand Up @@ -506,6 +509,7 @@ pub async fn transactions_feed(data: Data<Box<ServerData>>, req: HttpRequest) ->
let mut txs = return_on_error!(
get_transactions(
&mut *db_transaction,
None,
Some(TRANSACTION_FILTER_DONE),
limit_prev,
Some(TRANSACTION_ORDER_BY_FIRST_PROCESSED_DATE_DESC)
Expand All @@ -515,6 +519,7 @@ pub async fn transactions_feed(data: Data<Box<ServerData>>, req: HttpRequest) ->
let txs_current = return_on_error!(
get_transactions(
&mut *db_transaction,
None,
Some(TRANSACTION_FILTER_PROCESSING),
None,
Some(TRANSACTION_ORDER_BY_CREATE_DATE)
Expand All @@ -524,6 +529,7 @@ pub async fn transactions_feed(data: Data<Box<ServerData>>, req: HttpRequest) ->
let tx_next = return_on_error!(
get_transactions(
&mut *db_transaction,
None,
Some(TRANSACTION_FILTER_QUEUED),
limit_next,
Some(TRANSACTION_ORDER_BY_CREATE_DATE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,16 @@ pub async fn get_token_transfers_by_chain_id(

pub async fn get_pending_token_transfers(
conn: &SqlitePool,
account: Address,
) -> Result<Vec<TokenTransferDbObj>, sqlx::Error> {
let rows = sqlx::query_as::<_, TokenTransferDbObj>(
r"SELECT * FROM token_transfer
WHERE tx_id is null
AND error is null
AND from_addr = $1
",
)
.bind(format!("{:#x}", account))
.fetch_all(conn)
.await?;
Ok(rows)
Expand Down
10 changes: 9 additions & 1 deletion crates/erc20_payment_lib_common/src/db/ops/tx_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use super::model::TxDbObj;
use sqlx::Sqlite;
use sqlx::SqlitePool;
use sqlx::{Executor, Transaction};
use web3::types::Address;

pub const TRANSACTION_FILTER_QUEUED: &str = "processing > 0 AND first_processed IS NULL";
pub const TRANSACTION_FILTER_PROCESSING: &str = "processing > 0 AND first_processed IS NOT NULL";
Expand All @@ -14,6 +15,7 @@ pub const TRANSACTION_ORDER_BY_FIRST_PROCESSED_DATE_DESC: &str = "first_processe

pub async fn get_transactions<'c, E>(
executor: E,
account: Option<Address>,
filter: Option<&str>,
limit: Option<i64>,
order: Option<&str>,
Expand All @@ -24,8 +26,12 @@ where
let limit = limit.unwrap_or(i64::MAX);
let filter = filter.unwrap_or(TRANSACTION_FILTER_ALL);
let order = order.unwrap_or("id DESC");
let filter_account = match account {
Some(addr) => format!("from_addr = '{:#x}'", addr),
None => "1 = 1".to_string(),
};
let rows = sqlx::query_as::<_, TxDbObj>(
format!(r"SELECT * FROM tx WHERE {filter} ORDER BY {order} LIMIT {limit}").as_str(),
format!(r"SELECT * FROM tx WHERE ({filter_account}) AND ({filter}) ORDER BY {order} LIMIT {limit}").as_str(),
)
.fetch_all(executor)
.await?;
Expand Down Expand Up @@ -137,10 +143,12 @@ pub async fn get_transaction_count(

pub async fn get_next_transactions_to_process(
conn: &SqlitePool,
account: Option<Address>,
limit: i64,
) -> Result<Vec<TxDbObj>, sqlx::Error> {
get_transactions(
conn,
account,
Some(TRANSACTION_FILTER_TO_PROCESS),
Some(limit),
Some(TRANSACTION_ORDER_BY_ID_AND_REPLACEMENT_ID),
Expand Down
4 changes: 2 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -888,7 +888,7 @@ async fn main_internal() -> Result<(), PaymentError> {
}
}
if cleanup_options.remove_tx_stuck {
let mut transactions = get_next_transactions_to_process(&conn, 1)
let mut transactions = get_next_transactions_to_process(&conn, None, 1)
.await
.map_err(err_from!())?;

Expand Down Expand Up @@ -918,7 +918,7 @@ async fn main_internal() -> Result<(), PaymentError> {
}
}
if cleanup_options.remove_tx_unsafe {
let mut transactions = get_next_transactions_to_process(&conn, 1)
let mut transactions = get_next_transactions_to_process(&conn, None, 1)
.await
.map_err(err_from!())?;

Expand Down

0 comments on commit e42b19d

Please sign in to comment.