From 8be9d5a97ce7f7a385b6e57db06023c7a0eee160 Mon Sep 17 00:00:00 2001 From: yito88 Date: Sat, 25 May 2024 00:21:59 +0200 Subject: [PATCH] refactoring --- crates/relayer/src/chain/cosmos/batch.rs | 2 +- crates/relayer/src/chain/namada.rs | 32 +++------ crates/relayer/src/chain/namada/query.rs | 22 +++--- crates/relayer/src/chain/namada/tx.rs | 88 ++++++++++++++++-------- 4 files changed, 83 insertions(+), 61 deletions(-) diff --git a/crates/relayer/src/chain/cosmos/batch.rs b/crates/relayer/src/chain/cosmos/batch.rs index 3e52f52aec..bf55a4ec27 100644 --- a/crates/relayer/src/chain/cosmos/batch.rs +++ b/crates/relayer/src/chain/cosmos/batch.rs @@ -209,7 +209,7 @@ async fn sequential_send_messages_as_batches( Ok(tx_sync_results) } -fn response_to_tx_sync_result( +pub fn response_to_tx_sync_result( chain_id: &ChainId, message_count: usize, response: Response, diff --git a/crates/relayer/src/chain/namada.rs b/crates/relayer/src/chain/namada.rs index 607a3e415d..bd94fe7052 100644 --- a/crates/relayer/src/chain/namada.rs +++ b/crates/relayer/src/chain/namada.rs @@ -29,7 +29,6 @@ use ibc_relayer_types::core::ics24_host::path::{ AcksPath, ChannelEndsPath, ClientConsensusStatePath, ClientStatePath, CommitmentsPath, ConnectionsPath, ReceiptsPath, SeqRecvsPath, }; -use ibc_relayer_types::events::IbcEvent; use ibc_relayer_types::signer::Signer; use ibc_relayer_types::Height as ICSHeight; use namada_ibc::storage; @@ -60,8 +59,8 @@ use tokio::runtime::Runtime as TokioRuntime; use crate::account::Balance; use crate::chain::client::ClientSettings; +use crate::chain::cosmos::batch::response_to_tx_sync_result; use crate::chain::cosmos::config::CosmosSdkConfig; -use crate::chain::cosmos::types::tx::{TxStatus, TxSyncResult}; use crate::chain::cosmos::version::Specs; use crate::chain::endpoint::{ChainEndpoint, ChainStatus, HealthCheck}; use crate::chain::handle::Subscription; @@ -180,7 +179,7 @@ impl NamadaChain { &DefaultLogger::new(self.ctx.io()), None, None, - 0, + 1, &[], &[], ) @@ -336,32 +335,21 @@ impl ChainEndpoint for NamadaChain { if proto_msgs.is_empty() { return Ok(vec![]); } - // Note: we don't have any height information in this case. This hack will fix itself - // once we remove the `ChainError` event (which is not actually an event) - let height = ICSHeight::new(self.config.id.version(), 1).unwrap(); let max_msg_num = if self.config().sequential_batch_tx { 1 } else { self.config().max_msg_num.to_usize() }; let msg_chunks = proto_msgs.chunks(max_msg_num); - let mut responses = vec![]; + let mut tx_sync_results = vec![]; for msg_chunk in msg_chunks { - responses.push(self.batch_txs(msg_chunk)?); - } - - let mut tx_sync_results: Vec = responses.into_iter().map(|response| { - let events_per_tx = vec![IbcEventWithHeight::new(IbcEvent::ChainError(format!( - "check_tx (broadcast_tx_sync) on chain {} for Tx hash {} reports error: code={:?}, log={:?}", - self.config.id, response.hash, response.code, response.log - )), height)]; - - TxSyncResult { + let response = self.batch_txs(msg_chunk)?; + tx_sync_results.push(response_to_tx_sync_result( + &self.config().id, + msg_chunk.len(), response, - events: events_per_tx, - status: TxStatus::Pending { message_count: 1 }, - } - }).collect(); + )); + } self.wait_for_block_commits(&mut tx_sync_results)?; @@ -1097,7 +1085,7 @@ impl ChainEndpoint for NamadaChain { None => Ok(vec![]), } } - QueryTxRequest::Transaction(tx) => self.query_tx_events(&tx.0.to_string()), + QueryTxRequest::Transaction(tx) => self.query_tx_events(&tx.0), } } diff --git a/crates/relayer/src/chain/namada/query.rs b/crates/relayer/src/chain/namada/query.rs index ab991652c8..222f6f3db3 100644 --- a/crates/relayer/src/chain/namada/query.rs +++ b/crates/relayer/src/chain/namada/query.rs @@ -7,6 +7,7 @@ use namada_ibc::storage::{ibc_trace_key_prefix, is_ibc_trace_key}; use namada_sdk::address::{Address, InternalAddress}; use namada_sdk::borsh::BorshDeserialize; use namada_sdk::events::extend::Height as HeightAttr; +use namada_sdk::events::Event as NamadaEvent; use namada_sdk::queries::{Client as SdkClient, RPC}; use namada_sdk::rpc; use namada_sdk::storage::{BlockHeight, Epoch, Key, PrefixValue}; @@ -14,6 +15,7 @@ use namada_sdk::tx::data::ResultCode; use namada_sdk::tx::event::Code as CodeAttr; use namada_sdk::Namada; use tendermint::block::Height as TmHeight; +use tendermint::Hash as TmHash; use crate::chain::endpoint::ChainEndpoint; use crate::chain::requests::{ @@ -111,15 +113,8 @@ impl NamadaChain { } /// Get all IBC events when the tx has been applied - pub fn query_tx_events(&self, tx_hash: &str) -> Result, Error> { - match self - .rt - .block_on(RPC.shell().applied( - self.ctx.client(), - &tx_hash.try_into().expect("Invalid tx hash"), - )) - .map_err(NamadaError::query)? - { + pub fn query_tx_events(&self, tx_hash: &TmHash) -> Result, Error> { + match self.query_applied_event(tx_hash)? { Some(applied) => { let h = applied .read_attribute::() @@ -145,6 +140,15 @@ impl NamadaChain { } } + fn query_applied_event(&self, tx_hash: &TmHash) -> Result, Error> { + self.rt + .block_on(RPC.shell().applied( + self.ctx.client(), + &tx_hash.as_ref().try_into().expect("Invalid tx hash"), + )) + .map_err(|e| Error::namada(NamadaError::query(e))) + } + /// Get IBC packet events pub fn query_packet_events_from_block( &self, diff --git a/crates/relayer/src/chain/namada/tx.rs b/crates/relayer/src/chain/namada/tx.rs index 12e966a400..3f07ed0992 100644 --- a/crates/relayer/src/chain/namada/tx.rs +++ b/crates/relayer/src/chain/namada/tx.rs @@ -5,6 +5,7 @@ use std::thread; use std::time::Instant; use ibc_proto::google::protobuf::Any; +use itertools::Itertools; use namada_sdk::address::{Address, ImplicitAddress}; use namada_sdk::args::{self, TxBuilder}; use namada_sdk::args::{InputAmount, Tx as TxArgs, TxCustom}; @@ -25,11 +26,11 @@ use namada_sdk::{signing, tx, Namada}; use namada_trans_token::Transfer; use tendermint_proto::Protobuf; use tendermint_rpc::endpoint::broadcast::tx_sync::Response; +use tracing::{debug, debug_span, trace}; -use crate::chain::cosmos; use crate::chain::cosmos::types::tx::{TxStatus, TxSyncResult}; +use crate::chain::cosmos::wait::all_tx_results_found; use crate::chain::endpoint::ChainEndpoint; -use crate::chain::requests::{QueryTxHash, QueryTxRequest}; use crate::error::Error; use super::error::Error as NamadaError; @@ -39,6 +40,10 @@ const WAIT_BACKOFF: Duration = Duration::from_millis(300); impl NamadaChain { pub fn batch_txs(&mut self, msgs: &[Any]) -> Result { + if msgs.is_empty() { + return Err(Error::send_tx("No message to be batched".to_string())); + } + let tx_args = self.make_tx_args()?; let relayer_addr = self.get_key()?.address; @@ -283,42 +288,67 @@ impl NamadaChain { &self, tx_sync_results: &mut [TxSyncResult], ) -> Result<(), Error> { - let start_time = Instant::now(); - loop { - if cosmos::wait::all_tx_results_found(tx_sync_results) { - return Ok(()); + if all_tx_results_found(tx_sync_results) { + return Ok(()); + } + + let chain_id = &self.config().id; + crate::time!( + "wait_for_block_commits", + { + "src_chain": chain_id, } + ); + let _span = debug_span!("wait_for_block_commits", id = %chain_id).entered(); + + let start_time = Instant::now(); + + let hashes = tx_sync_results + .iter() + .map(|res| res.response.hash.to_string()) + .join(", "); + + debug!("waiting for commit of tx hashes(s) {}", hashes); + loop { let elapsed = start_time.elapsed(); - if elapsed > self.config.rpc_timeout { + + if all_tx_results_found(tx_sync_results) { + trace!( + "retrieved {} tx results after {} ms", + tx_sync_results.len(), + elapsed.as_millis(), + ); + + return Ok(()); + } else if elapsed > self.config().rpc_timeout { + debug!("timed out after {} ms", elapsed.as_millis()); return Err(Error::tx_no_confirmation()); - } + } else { + thread::sleep(WAIT_BACKOFF); - thread::sleep(WAIT_BACKOFF); + for tx_sync_result in tx_sync_results.iter_mut() { + self.update_tx_sync_result(tx_sync_result)?; + } + } + } + } - for TxSyncResult { - response, - events, - status, - } in tx_sync_results.iter_mut() - { - if let TxStatus::Pending { message_count: _ } = status { - // If the transaction failed, query_txs returns the IbcEvent::ChainError, - // so that we don't attempt to resolve the transaction later on. - if let Ok(events_per_tx) = - self.query_txs(QueryTxRequest::Transaction(QueryTxHash(response.hash))) - { - // If we get events back, progress was made, so we replace the events - // with the new ones. in both cases we will check in the next iteration - // whether or not the transaction was fully committed. - if !events_per_tx.is_empty() { - *events = events_per_tx; - *status = TxStatus::ReceivedResponse; - } - } + fn update_tx_sync_result(&self, tx_sync_result: &mut TxSyncResult) -> Result<(), Error> { + if let TxStatus::Pending { .. } = tx_sync_result.status { + // If the transaction failed, query_txs returns the IbcEvent::ChainError, + // so that we don't attempt to resolve the transaction later on. + if let Ok(events) = self.query_tx_events(&tx_sync_result.response.hash) { + // If we get events back, progress was made, so we replace the events + // with the new ones. in both cases we will check in the next iteration + // whether or not the transaction was fully committed. + if !events.is_empty() { + tx_sync_result.events = events; + tx_sync_result.status = TxStatus::ReceivedResponse; } } } + Ok(()) } async fn submit_reveal_aux(&mut self, args: &TxArgs, address: &Address) -> Result<(), Error> {