Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
yito88 committed May 31, 2024
1 parent 4252d7b commit 8be9d5a
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 61 deletions.
2 changes: 1 addition & 1 deletion crates/relayer/src/chain/cosmos/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ async fn sequential_send_messages_as_batches(
Ok(tx_sync_results)
}

fn response_to_tx_sync_result(
pub fn response_to_tx_sync_result(
chain_id: &ChainId,
message_count: usize,
response: Response,
Expand Down
32 changes: 10 additions & 22 deletions crates/relayer/src/chain/namada.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use ibc_relayer_types::core::ics24_host::path::{
AcksPath, ChannelEndsPath, ClientConsensusStatePath, ClientStatePath, CommitmentsPath,
ConnectionsPath, ReceiptsPath, SeqRecvsPath,
};
use ibc_relayer_types::events::IbcEvent;
use ibc_relayer_types::signer::Signer;
use ibc_relayer_types::Height as ICSHeight;
use namada_ibc::storage;
Expand Down Expand Up @@ -60,8 +59,8 @@ use tokio::runtime::Runtime as TokioRuntime;

use crate::account::Balance;
use crate::chain::client::ClientSettings;
use crate::chain::cosmos::batch::response_to_tx_sync_result;
use crate::chain::cosmos::config::CosmosSdkConfig;
use crate::chain::cosmos::types::tx::{TxStatus, TxSyncResult};
use crate::chain::cosmos::version::Specs;
use crate::chain::endpoint::{ChainEndpoint, ChainStatus, HealthCheck};
use crate::chain::handle::Subscription;
Expand Down Expand Up @@ -180,7 +179,7 @@ impl NamadaChain {
&DefaultLogger::new(self.ctx.io()),
None,
None,
0,
1,
&[],
&[],
)
Expand Down Expand Up @@ -336,32 +335,21 @@ impl ChainEndpoint for NamadaChain {
if proto_msgs.is_empty() {
return Ok(vec![]);
}
// Note: we don't have any height information in this case. This hack will fix itself
// once we remove the `ChainError` event (which is not actually an event)
let height = ICSHeight::new(self.config.id.version(), 1).unwrap();
let max_msg_num = if self.config().sequential_batch_tx {
1
} else {
self.config().max_msg_num.to_usize()
};
let msg_chunks = proto_msgs.chunks(max_msg_num);
let mut responses = vec![];
let mut tx_sync_results = vec![];
for msg_chunk in msg_chunks {
responses.push(self.batch_txs(msg_chunk)?);
}

let mut tx_sync_results: Vec<TxSyncResult> = responses.into_iter().map(|response| {
let events_per_tx = vec![IbcEventWithHeight::new(IbcEvent::ChainError(format!(
"check_tx (broadcast_tx_sync) on chain {} for Tx hash {} reports error: code={:?}, log={:?}",
self.config.id, response.hash, response.code, response.log
)), height)];

TxSyncResult {
let response = self.batch_txs(msg_chunk)?;
tx_sync_results.push(response_to_tx_sync_result(
&self.config().id,
msg_chunk.len(),
response,
events: events_per_tx,
status: TxStatus::Pending { message_count: 1 },
}
}).collect();
));
}

self.wait_for_block_commits(&mut tx_sync_results)?;

Expand Down Expand Up @@ -1097,7 +1085,7 @@ impl ChainEndpoint for NamadaChain {
None => Ok(vec![]),
}
}
QueryTxRequest::Transaction(tx) => self.query_tx_events(&tx.0.to_string()),
QueryTxRequest::Transaction(tx) => self.query_tx_events(&tx.0),
}
}

Expand Down
22 changes: 13 additions & 9 deletions crates/relayer/src/chain/namada/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ use namada_ibc::storage::{ibc_trace_key_prefix, is_ibc_trace_key};
use namada_sdk::address::{Address, InternalAddress};
use namada_sdk::borsh::BorshDeserialize;
use namada_sdk::events::extend::Height as HeightAttr;
use namada_sdk::events::Event as NamadaEvent;
use namada_sdk::queries::{Client as SdkClient, RPC};
use namada_sdk::rpc;
use namada_sdk::storage::{BlockHeight, Epoch, Key, PrefixValue};
use namada_sdk::tx::data::ResultCode;
use namada_sdk::tx::event::Code as CodeAttr;
use namada_sdk::Namada;
use tendermint::block::Height as TmHeight;
use tendermint::Hash as TmHash;

use crate::chain::endpoint::ChainEndpoint;
use crate::chain::requests::{
Expand Down Expand Up @@ -111,15 +113,8 @@ impl NamadaChain {
}

/// Get all IBC events when the tx has been applied
pub fn query_tx_events(&self, tx_hash: &str) -> Result<Vec<IbcEventWithHeight>, Error> {
match self
.rt
.block_on(RPC.shell().applied(
self.ctx.client(),
&tx_hash.try_into().expect("Invalid tx hash"),
))
.map_err(NamadaError::query)?
{
pub fn query_tx_events(&self, tx_hash: &TmHash) -> Result<Vec<IbcEventWithHeight>, Error> {
match self.query_applied_event(tx_hash)? {
Some(applied) => {
let h = applied
.read_attribute::<HeightAttr>()
Expand All @@ -145,6 +140,15 @@ impl NamadaChain {
}
}

fn query_applied_event(&self, tx_hash: &TmHash) -> Result<Option<NamadaEvent>, Error> {
self.rt
.block_on(RPC.shell().applied(
self.ctx.client(),
&tx_hash.as_ref().try_into().expect("Invalid tx hash"),
))
.map_err(|e| Error::namada(NamadaError::query(e)))
}

/// Get IBC packet events
pub fn query_packet_events_from_block(
&self,
Expand Down
88 changes: 59 additions & 29 deletions crates/relayer/src/chain/namada/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::thread;
use std::time::Instant;

use ibc_proto::google::protobuf::Any;
use itertools::Itertools;
use namada_sdk::address::{Address, ImplicitAddress};
use namada_sdk::args::{self, TxBuilder};
use namada_sdk::args::{InputAmount, Tx as TxArgs, TxCustom};
Expand All @@ -25,11 +26,11 @@ use namada_sdk::{signing, tx, Namada};
use namada_trans_token::Transfer;
use tendermint_proto::Protobuf;
use tendermint_rpc::endpoint::broadcast::tx_sync::Response;
use tracing::{debug, debug_span, trace};

use crate::chain::cosmos;
use crate::chain::cosmos::types::tx::{TxStatus, TxSyncResult};
use crate::chain::cosmos::wait::all_tx_results_found;
use crate::chain::endpoint::ChainEndpoint;
use crate::chain::requests::{QueryTxHash, QueryTxRequest};
use crate::error::Error;

use super::error::Error as NamadaError;
Expand All @@ -39,6 +40,10 @@ const WAIT_BACKOFF: Duration = Duration::from_millis(300);

impl NamadaChain {
pub fn batch_txs(&mut self, msgs: &[Any]) -> Result<Response, Error> {
if msgs.is_empty() {
return Err(Error::send_tx("No message to be batched".to_string()));
}

let tx_args = self.make_tx_args()?;

let relayer_addr = self.get_key()?.address;
Expand Down Expand Up @@ -283,42 +288,67 @@ impl NamadaChain {
&self,
tx_sync_results: &mut [TxSyncResult],
) -> Result<(), Error> {
let start_time = Instant::now();
loop {
if cosmos::wait::all_tx_results_found(tx_sync_results) {
return Ok(());
if all_tx_results_found(tx_sync_results) {
return Ok(());
}

let chain_id = &self.config().id;
crate::time!(
"wait_for_block_commits",
{
"src_chain": chain_id,
}
);
let _span = debug_span!("wait_for_block_commits", id = %chain_id).entered();

let start_time = Instant::now();

let hashes = tx_sync_results
.iter()
.map(|res| res.response.hash.to_string())
.join(", ");

debug!("waiting for commit of tx hashes(s) {}", hashes);

loop {
let elapsed = start_time.elapsed();
if elapsed > self.config.rpc_timeout {

if all_tx_results_found(tx_sync_results) {
trace!(
"retrieved {} tx results after {} ms",
tx_sync_results.len(),
elapsed.as_millis(),
);

return Ok(());
} else if elapsed > self.config().rpc_timeout {
debug!("timed out after {} ms", elapsed.as_millis());
return Err(Error::tx_no_confirmation());
}
} else {
thread::sleep(WAIT_BACKOFF);

thread::sleep(WAIT_BACKOFF);
for tx_sync_result in tx_sync_results.iter_mut() {
self.update_tx_sync_result(tx_sync_result)?;
}
}
}
}

for TxSyncResult {
response,
events,
status,
} in tx_sync_results.iter_mut()
{
if let TxStatus::Pending { message_count: _ } = status {
// If the transaction failed, query_txs returns the IbcEvent::ChainError,
// so that we don't attempt to resolve the transaction later on.
if let Ok(events_per_tx) =
self.query_txs(QueryTxRequest::Transaction(QueryTxHash(response.hash)))
{
// If we get events back, progress was made, so we replace the events
// with the new ones. in both cases we will check in the next iteration
// whether or not the transaction was fully committed.
if !events_per_tx.is_empty() {
*events = events_per_tx;
*status = TxStatus::ReceivedResponse;
}
}
fn update_tx_sync_result(&self, tx_sync_result: &mut TxSyncResult) -> Result<(), Error> {
if let TxStatus::Pending { .. } = tx_sync_result.status {
// If the transaction failed, query_txs returns the IbcEvent::ChainError,
// so that we don't attempt to resolve the transaction later on.
if let Ok(events) = self.query_tx_events(&tx_sync_result.response.hash) {
// If we get events back, progress was made, so we replace the events
// with the new ones. in both cases we will check in the next iteration
// whether or not the transaction was fully committed.
if !events.is_empty() {
tx_sync_result.events = events;
tx_sync_result.status = TxStatus::ReceivedResponse;
}
}
}
Ok(())
}

async fn submit_reveal_aux(&mut self, args: &TxArgs, address: &Address) -> Result<(), Error> {
Expand Down

0 comments on commit 8be9d5a

Please sign in to comment.