diff --git a/protocols/v2/roles-logic-sv2/src/errors.rs b/protocols/v2/roles-logic-sv2/src/errors.rs index 1d8625124..20c4bcd55 100644 --- a/protocols/v2/roles-logic-sv2/src/errors.rs +++ b/protocols/v2/roles-logic-sv2/src/errors.rs @@ -46,6 +46,7 @@ pub enum Error { VersionTooBig, TxVersionTooBig, TxVersionTooLow, + TxDecodingError(String), NotFoundChannelId, NoValidJob, NoValidTranslatorJob, @@ -59,6 +60,7 @@ pub enum Error { TargetError(InputError), HashrateError(InputError), LogicErrorMessage(std::boxed::Box>), + JDSMissingTransactions, } impl From for Error { @@ -138,6 +140,7 @@ impl Display for Error { VersionTooBig => write!(f, "We are trying to construct a block header with version bigger than i32::MAX"), TxVersionTooBig => write!(f, "Tx version can not be greater than i32::MAX"), TxVersionTooLow => write!(f, "Tx version can not be lower than 1"), + TxDecodingError(e) => write!(f, "Impossible to decode tx: {:?}", e), NotFoundChannelId => write!(f, "No downstream has been registred for this channel id"), NoValidJob => write!(f, "Impossible to create a standard job for channelA cause no valid job has been received from upstream yet"), NoValidTranslatorJob => write!(f, "Impossible to create a extended job for channel cause no valid job has been received from upstream yet"), @@ -149,6 +152,7 @@ impl Display for Error { TargetError(e) => write!(f, "Impossible to get Target: {:?}", e), HashrateError(e) => write!(f, "Impossible to get Hashrate: {:?}", e), LogicErrorMessage(e) => write!(f, "Message is well formatted but can not be handled: {:?}", e), + JDSMissingTransactions => write!(f, "JD server cannot propagate the block: missing transactions"), } } } diff --git a/roles/jd-server/src/lib/error.rs b/roles/jd-server/src/lib/error.rs index e94a2395c..7b58c7593 100644 --- a/roles/jd-server/src/lib/error.rs +++ b/roles/jd-server/src/lib/error.rs @@ -22,6 +22,8 @@ pub enum JdsError { Custom(String), Sv2ProtocolError((u32, Mining<'static>)), MempoolError(JdsMempoolError), + ImpossibleToReconstructBlock(String), + NoLastDeclaredJob, } impl std::fmt::Display for JdsError { @@ -42,6 +44,10 @@ impl std::fmt::Display for JdsError { write!(f, "Received Sv2 Protocol Error from upstream: `{:?}`", e) } MempoolError(ref e) => write!(f, "Mempool error: `{:?}`", e), + ImpossibleToReconstructBlock(e) => { + write!(f, "Error in reconstructing the block: {:?}", e) + } + NoLastDeclaredJob => write!(f, "Last declared job not found"), } } } diff --git a/roles/jd-server/src/lib/job_declarator/message_handler.rs b/roles/jd-server/src/lib/job_declarator/message_handler.rs index 38b68272d..58a40fcba 100644 --- a/roles/jd-server/src/lib/job_declarator/message_handler.rs +++ b/roles/jd-server/src/lib/job_declarator/message_handler.rs @@ -9,12 +9,12 @@ use roles_logic_sv2::{ parsers::JobDeclaration, }; use std::{convert::TryInto, io::Cursor}; -use stratum_common::bitcoin::Transaction; +use stratum_common::bitcoin::{Transaction, Txid}; pub type SendTo = SendTo_, ()>; +use super::{signed_token, TransactionState}; use roles_logic_sv2::{errors::Error, parsers::PoolMessages as AllMessages}; use stratum_common::bitcoin::consensus::Decodable; - -use super::signed_token; +use tracing::info; use super::JobDeclaratorDownstream; @@ -53,7 +53,7 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream { coinbase_output: self.coinbase_output.clone().try_into().unwrap(), }; let message_enum = JobDeclaration::AllocateMiningJobTokenSuccess(message_success); - println!( + info!( "Sending AllocateMiningJobTokenSuccess to proxy {:?}", message_enum ); @@ -61,6 +61,11 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream { } fn handle_declare_mining_job(&mut self, message: DeclareMiningJob) -> Result { + // the transactions that are present in the mempool are stored here, that is sent to the + // mempool which use the rpc client to retrieve the whole data for each transaction. + // The unknown transactions is a vector that contains the transactions that are not in the + // jds mempool, and will be non-empty in the ProvideMissingTransactionsSuccess message + let mut known_transactions: Vec = vec![]; self.tx_hash_list_hash = Some(message.tx_hash_list_hash.clone().into_static()); if self.verify_job(&message) { let short_hash_list: Vec = message @@ -76,22 +81,34 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream { .safe_lock(|x| x.to_short_ids(nonce)) .unwrap() .unwrap(); - let mut txs_in_job = vec![]; - let mut missing_txs = vec![]; + let mut transactions_with_state = + vec![TransactionState::Missing; short_hash_list.len()]; + let mut missing_txs: Vec = Vec::new(); for (i, sid) in short_hash_list.iter().enumerate() { let sid_: [u8; 6] = sid.to_vec().try_into().unwrap(); - if let Some(tx_data) = short_id_mempool.get(&sid_) { - txs_in_job.push(tx_data.clone()); - } else { - missing_txs.push(i as u16); + match short_id_mempool.get(&sid_) { + Some(tx_data) => { + transactions_with_state[i] = TransactionState::PresentInMempool(tx_data.id); + known_transactions.push(tx_data.id); + } + None => { + transactions_with_state[i] = TransactionState::Missing; + missing_txs.push(i as u16); + } } } - self.declared_mining_job = Some(( - message.clone().into_static(), - txs_in_job, + self.declared_mining_job = ( + Some(message.clone().into_static()), + transactions_with_state, missing_txs.clone(), - )); + ); + // here we send the transactions that we want to be stored in jds mempool with full data + + self.add_txs_to_mempool + .add_txs_to_mempool_inner + .known_transactions + .append(&mut known_transactions); if missing_txs.is_empty() { let message_success = DeclareMiningJobSuccess { @@ -113,7 +130,7 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream { JobDeclaration::ProvideMissingTransactions( message_provide_missing_transactions, ); - Ok(SendTo_::Respond(message_enum_provide_missing_transactions)) + Ok(SendTo::Respond(message_enum_provide_missing_transactions)) } } else { let message_error = DeclareMiningJobError { @@ -137,43 +154,46 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream { &mut self, message: ProvideMissingTransactionsSuccess, ) -> Result { - match &mut self.declared_mining_job { - Some((_, ref mut transactions, missing_indexes)) => { - for (i, tx) in message.transaction_list.inner_as_ref().iter().enumerate() { - let mut cursor = Cursor::new(tx); - let tx = Transaction::consensus_decode_from_finite_reader(&mut cursor) - .expect("Invalid tx data from downstream"); - let index = - *missing_indexes - .get(i) - .ok_or(Error::LogicErrorMessage(Box::new( - AllMessages::JobDeclaration( - JobDeclaration::ProvideMissingTransactionsSuccess( - message.clone().into_static(), - ), - ), - )))? as usize; - transactions.insert(index, tx); - } - // TODO check it - let tx_hash_list_hash = self.tx_hash_list_hash.clone().unwrap().into_static(); - let message_success = DeclareMiningJobSuccess { - request_id: message.request_id, - new_mining_job_token: signed_token( - tx_hash_list_hash, - &self.public_key.clone(), - &self.private_key.clone(), - ), - }; - let message_enum_success = JobDeclaration::DeclareMiningJobSuccess(message_success); - Ok(SendTo::Respond(message_enum_success)) + let (_, ref mut transactions_with_state, missing_indexes) = &mut self.declared_mining_job; + let mut unknown_transactions: Vec = vec![]; + for (i, tx) in message.transaction_list.inner_as_ref().iter().enumerate() { + let mut cursor = Cursor::new(tx); + let transaction = Transaction::consensus_decode_from_finite_reader(&mut cursor) + .map_err(|e| Error::TxDecodingError(e.to_string()))?; + Vec::push(&mut unknown_transactions, transaction.clone()); + let index = *missing_indexes + .get(i) + .ok_or(Error::LogicErrorMessage(Box::new( + AllMessages::JobDeclaration(JobDeclaration::ProvideMissingTransactionsSuccess( + message.clone().into_static(), + )), + )))? as usize; + // insert the missing transactions in the mempool + transactions_with_state[index] = TransactionState::PresentInMempool(transaction.txid()); + } + self.add_txs_to_mempool + .add_txs_to_mempool_inner + .unknown_transactions + .append(&mut unknown_transactions); + // if there still a missing transaction return an error + for tx_with_state in transactions_with_state { + match tx_with_state { + TransactionState::PresentInMempool(_) => continue, + TransactionState::Missing => return Err(Error::JDSMissingTransactions), } - None => Err(Error::LogicErrorMessage(Box::new( - AllMessages::JobDeclaration(JobDeclaration::ProvideMissingTransactionsSuccess( - message.clone().into_static(), - )), - ))), } + // TODO check it + let tx_hash_list_hash = self.tx_hash_list_hash.clone().unwrap().into_static(); + let message_success = DeclareMiningJobSuccess { + request_id: message.request_id, + new_mining_job_token: signed_token( + tx_hash_list_hash, + &self.public_key.clone(), + &self.private_key.clone(), + ), + }; + let message_enum_success = JobDeclaration::DeclareMiningJobSuccess(message_success); + Ok(SendTo::Respond(message_enum_success)) } fn handle_submit_solution(&mut self, message: SubmitSolutionJd<'_>) -> Result { diff --git a/roles/jd-server/src/lib/job_declarator/mod.rs b/roles/jd-server/src/lib/job_declarator/mod.rs index 08a9f62d9..b47b7d6a8 100644 --- a/roles/jd-server/src/lib/job_declarator/mod.rs +++ b/roles/jd-server/src/lib/job_declarator/mod.rs @@ -16,14 +16,33 @@ use roles_logic_sv2::{ }; use secp256k1::{Keypair, Message as SecpMessage, Secp256k1}; use std::{collections::HashMap, convert::TryInto, sync::Arc}; -use tokio::net::TcpListener; -use tracing::{error, info}; +use tokio::{net::TcpListener, time::Duration}; +use tracing::{debug, error, info}; use stratum_common::bitcoin::{ consensus::{encode::serialize, Encodable}, - Block, Transaction, + Block, Transaction, Txid, }; +#[derive(Clone, Debug)] +pub enum TransactionState { + PresentInMempool(Txid), + Missing, +} + +#[derive(Clone, Debug)] +pub struct AddTrasactionsToMempoolInner { + pub known_transactions: Vec, + pub unknown_transactions: Vec, +} + +// TODO implement send method that sends the inner via the sender +#[derive(Clone, Debug)] +pub struct AddTrasactionsToMempool { + pub add_txs_to_mempool_inner: AddTrasactionsToMempoolInner, + pub sender_add_txs_to_mempool: Sender, +} + #[derive(Debug)] pub struct JobDeclaratorDownstream { sender: Sender, @@ -37,8 +56,14 @@ pub struct JobDeclaratorDownstream { public_key: Secp256k1PublicKey, private_key: Secp256k1SecretKey, mempool: Arc>, - declared_mining_job: Option<(DeclareMiningJob<'static>, Vec, Vec)>, + // Vec is the vector of missing transactions + declared_mining_job: ( + Option>, + Vec, + Vec, + ), tx_hash_list_hash: Option>, + add_txs_to_mempool: AddTrasactionsToMempool, } impl JobDeclaratorDownstream { @@ -47,11 +72,16 @@ impl JobDeclaratorDownstream { sender: Sender, config: &Configuration, mempool: Arc>, + sender_add_txs_to_mempool: Sender, ) -> Self { let mut coinbase_output = vec![]; // TODO: use next variables let token_to_job_map = HashMap::with_hasher(BuildNoHashHasher::default()); let tokens = Id::new(); + let add_txs_to_mempool_inner = AddTrasactionsToMempoolInner { + known_transactions: vec![], + unknown_transactions: vec![], + }; super::get_coinbase_output(config).expect("Invalid coinbase output in config")[0] .consensus_encode(&mut coinbase_output) .expect("Invalid coinbase output in config"); @@ -65,23 +95,91 @@ impl JobDeclaratorDownstream { public_key: config.authority_public_key, private_key: config.authority_secret_key, mempool, - declared_mining_job: None, + declared_mining_job: (None, Vec::new(), Vec::new()), tx_hash_list_hash: None, + add_txs_to_mempool: AddTrasactionsToMempool { + add_txs_to_mempool_inner, + sender_add_txs_to_mempool, + }, } } - fn get_block_hex(self_mutex: Arc>, message: SubmitSolutionJd) -> Option { - //TODO: implement logic for success or error - let (last_declare, tx_list, _) = match self_mutex - .safe_lock(|x| x.declared_mining_job.take()) - .unwrap() - { - Some((last_declare, tx_list, _x)) => (last_declare, tx_list, _x), - None => return None, - }; + fn get_block_hex( + self_mutex: Arc>, + message: SubmitSolutionJd, + ) -> Result> { + let (last_declare_, _, _) = self_mutex + .clone() + .safe_lock(|x| x.declared_mining_job.clone()) + .map_err(|e| Box::new(JdsError::PoisonLock(e.to_string())))?; + let last_declare = last_declare_.ok_or(Box::new(JdsError::NoLastDeclaredJob))?; + let transactions_list = Self::collect_txs_in_job(self_mutex)?; let block: Block = - roles_logic_sv2::utils::BlockCreator::new(last_declare, tx_list, message).into(); - Some(hex::encode(serialize(&block))) + roles_logic_sv2::utils::BlockCreator::new(last_declare, transactions_list, message) + .into(); + Ok(hex::encode(serialize(&block))) + } + + fn collect_txs_in_job(self_mutex: Arc>) -> Result, Box> { + let (_, transactions_with_state, _) = self_mutex + .clone() + .safe_lock(|x| x.declared_mining_job.clone()) + .map_err(|e| Box::new(JdsError::PoisonLock(e.to_string())))?; + let mempool = self_mutex + .safe_lock(|x| x.mempool.clone()) + .map_err(|e| Box::new(JdsError::PoisonLock(e.to_string())))?; + let mut transactions_list: Vec = Vec::new(); + for tx_with_state in transactions_with_state.iter().enumerate() { + if let TransactionState::PresentInMempool(txid) = tx_with_state.1 { + let tx = mempool + .safe_lock(|x| x.mempool.get(txid).cloned()) + .map_err(|e| JdsError::PoisonLock(e.to_string()))? + .ok_or(Box::new(JdsError::ImpossibleToReconstructBlock( + "Txid not found in jds mempool".to_string(), + )))? + .ok_or(Box::new(JdsError::ImpossibleToReconstructBlock( + "Txid found in jds mempool but transactions not present".to_string(), + )))?; + transactions_list.push(tx); + } else { + return Err(Box::new(JdsError::ImpossibleToReconstructBlock( + "Unknown transaction".to_string(), + ))); + }; + } + Ok(transactions_list) + } + + async fn send_txs_to_mempool(self_mutex: Arc>) { + let add_txs_to_mempool = self_mutex + .safe_lock(|a| a.add_txs_to_mempool.clone()) + .unwrap(); + let sender_add_txs_to_mempool = add_txs_to_mempool.sender_add_txs_to_mempool; + let add_txs_to_mempool_inner = add_txs_to_mempool.add_txs_to_mempool_inner; + let _ = sender_add_txs_to_mempool + .send(add_txs_to_mempool_inner) + .await; + // the trasnactions sent to the mempool can be freed + let _ = self_mutex.safe_lock(|a| { + a.add_txs_to_mempool.add_txs_to_mempool_inner = AddTrasactionsToMempoolInner { + known_transactions: vec![], + unknown_transactions: vec![], + }; + }); + } + + fn get_transactions_in_job(self_mutex: Arc>) -> Vec { + let mut known_transactions: Vec = Vec::new(); + let job_transactions = self_mutex + .safe_lock(|a| a.declared_mining_job.1.clone()) + .unwrap(); + for transaction in job_transactions { + match transaction { + TransactionState::PresentInMempool(txid) => known_transactions.push(txid), + TransactionState::Missing => continue, + }; + } + known_transactions } pub async fn send( @@ -116,9 +214,54 @@ impl JobDeclaratorDownstream { message_type, payload, ); + // How works the txs recognition and txs storing in JDS mempool + // when a DMJ arrives, the JDS compares the received transactions with the + // ids in the the JDS mempool. Then there are two scenarios + // 1. the JDS recognizes all the transactions. Then, just before a DMJS is + // sent, the JDS mempool is triggered to fill in the JDS mempool the id + // of declared job with the full transaction (with send_tx_to_mempool + // method(), that eventually will ask the transactions to a bitcoin node + // via RPC) + // 2. there are some unknown txids. Just before sending PMT, the JDS + // mempool is triggered to fill the known txids with the full + // transactions. When a PMTS arrives, just before sending a DMJS, the + // unknown full transactions provided by the downstream are added to the + // JDS mempool match next_message_to_send { - Ok(SendTo::Respond(message)) => { - Self::send(self_mutex.clone(), message).await.unwrap(); + Ok(SendTo::Respond(m)) => { + match m { + JobDeclaration::AllocateMiningJobToken(_) => { + error!("Send unexpected message: AMJT") + } + JobDeclaration::AllocateMiningJobTokenSuccess(_) => { + debug!("Send message: AMJTS") + } + JobDeclaration::DeclareMiningJob(_) => { + error!("Send unexpected message: DMJ"); + } + JobDeclaration::DeclareMiningJobError(_) => { + debug!("Send nmessage: DMJE") + } + JobDeclaration::DeclareMiningJobSuccess(_) => { + debug!("Send message: DMJS. Updating the JDS mempool."); + Self::send_txs_to_mempool(self_mutex.clone()).await; + } + JobDeclaration::IdentifyTransactions(_) => { + debug!("Send message: IT") + } + JobDeclaration::IdentifyTransactionsSuccess(_) => { + error!("Send unexpected message: ITS") + } + JobDeclaration::ProvideMissingTransactions(_) => { + debug!("Send message: PMT. Updating the JDS mempool."); + Self::send_txs_to_mempool(self_mutex.clone()).await; + } + JobDeclaration::ProvideMissingTransactionsSuccess(_) => { + error!("Send unexpected PMTS"); + } + JobDeclaration::SubmitSolution(_) => todo!(), + } + Self::send(self_mutex.clone(), m).await.unwrap(); } Ok(SendTo::RelayNewMessage(message)) => { error!("JD Server: unexpected relay new message {:?}", message); @@ -132,51 +275,100 @@ impl JobDeclaratorDownstream { Ok(SendTo::Multiple(multiple)) => { error!("JD Server: unexpected multiple messages: {:?}", multiple); } - Ok(SendTo::None(m)) => match m { - Some(JobDeclaration::SubmitSolution(message)) => { - let hexdata = match JobDeclaratorDownstream::get_block_hex( - self_mutex.clone(), - message, - ) { - Some(inner) => inner, - None => { - error!("Received solution but no job available"); - recv.close(); - break; - } - }; - - let _ = new_block_sender.send(hexdata).await; - } - Some(JobDeclaration::DeclareMiningJob(_)) => { - error!("JD Server received an unexpected message {:?}", m); - } - Some(JobDeclaration::DeclareMiningJobSuccess(_)) => { - error!("JD Server received an unexpected message {:?}", m); - } - Some(JobDeclaration::DeclareMiningJobError(_)) => { - error!("JD Server received an unexpected message {:?}", m); + Ok(SendTo::None(m)) => { + match m { + Some(JobDeclaration::SubmitSolution(message)) => { + match Self::collect_txs_in_job(self_mutex.clone()) { + Ok(_) => { + info!("All transactions in downstream job are recognized correctly by the JD Server"); + let hexdata = + match JobDeclaratorDownstream::get_block_hex( + self_mutex.clone(), + message, + ) { + Ok(inner) => inner, + Err(e) => { + error!( + "Received solution but encountered error: {:?}", + e + ); + recv.close(); + //TODO should we brake it? + break; + } + }; + let _ = new_block_sender.send(hexdata).await; + } + Err(error) => { + error!("Missing transactions: {:?}", error); + // TODO print here the ip of the downstream + let known_transactions = + JobDeclaratorDownstream::get_transactions_in_job( + self_mutex.clone(), + ); + let retrieve_transactions = + AddTrasactionsToMempoolInner { + known_transactions, + unknown_transactions: Vec::new(), + }; + let mempool = self_mutex + .clone() + .safe_lock(|a| a.mempool.clone()) + .unwrap(); + tokio::select! { + _ = JDsMempool::add_tx_data_to_mempool(mempool, retrieve_transactions) => { + let hexdata = match JobDeclaratorDownstream::get_block_hex( + self_mutex.clone(), + message.clone(), + ) { + Ok(inner) => inner, + Err(e) => { + error!( + "Error retrieving transactions: {:?}", + e + ); + recv.close(); + //TODO should we brake it? + break; + } + }; + let _ = new_block_sender.send(hexdata).await; + } + _ = tokio::time::sleep(Duration::from_secs(60)) => {} + }; + } + }; + } + Some(JobDeclaration::DeclareMiningJob(_)) => { + error!("JD Server received an unexpected message {:?}", m); + } + Some(JobDeclaration::DeclareMiningJobSuccess(_)) => { + error!("JD Server received an unexpected message {:?}", m); + } + Some(JobDeclaration::DeclareMiningJobError(_)) => { + error!("JD Server received an unexpected message {:?}", m); + } + Some(JobDeclaration::IdentifyTransactions(_)) => { + error!("JD Server received an unexpected message {:?}", m); + } + Some(JobDeclaration::IdentifyTransactionsSuccess(_)) => { + error!("JD Server received an unexpected message {:?}", m); + } + Some(JobDeclaration::AllocateMiningJobToken(_)) => { + error!("JD Server received an unexpected message {:?}", m); + } + Some(JobDeclaration::AllocateMiningJobTokenSuccess(_)) => { + error!("JD Server received an unexpected message {:?}", m); + } + Some(JobDeclaration::ProvideMissingTransactions(_)) => { + error!("JD Server received an unexpected message {:?}", m); + } + Some(JobDeclaration::ProvideMissingTransactionsSuccess(_)) => { + error!("JD Server received an unexpected message {:?}", m); + } + None => (), } - Some(JobDeclaration::IdentifyTransactions(_)) => { - error!("JD Server received an unexpected message {:?}", m); - } - Some(JobDeclaration::IdentifyTransactionsSuccess(_)) => { - error!("JD Server received an unexpected message {:?}", m); - } - Some(JobDeclaration::AllocateMiningJobToken(_)) => { - error!("JD Server received an unexpected message {:?}", m); - } - Some(JobDeclaration::AllocateMiningJobTokenSuccess(_)) => { - error!("JD Server received an unexpected message {:?}", m); - } - Some(JobDeclaration::ProvideMissingTransactions(_)) => { - error!("JD Server received an unexpected message {:?}", m); - } - Some(JobDeclaration::ProvideMissingTransactionsSuccess(_)) => { - error!("JD Server received an unexpected message {:?}", m); - } - None => (), - }, + } Err(e) => { error!("{:?}", e); handle_result!( @@ -230,10 +422,19 @@ impl JobDeclarator { status_tx: crate::status::Sender, mempool: Arc>, new_block_sender: Sender, + sender_add_txs_to_mempool: Sender, ) { let self_ = Arc::new(Mutex::new(Self {})); info!("JD INITIALIZED"); - Self::accept_incoming_connection(self_, config, status_tx, mempool, new_block_sender).await; + Self::accept_incoming_connection( + self_, + config, + status_tx, + mempool, + new_block_sender, + sender_add_txs_to_mempool, + ) + .await; } async fn accept_incoming_connection( _self_: Arc>, @@ -241,6 +442,7 @@ impl JobDeclarator { status_tx: crate::status::Sender, mempool: Arc>, new_block_sender: Sender, + sender_add_txs_to_mempool: Sender, ) { let listner = TcpListener::bind(&config.listen_jd_address).await.unwrap(); while let Ok((stream, _)) = listner.accept().await { @@ -279,6 +481,8 @@ impl JobDeclarator { sender.clone(), &config, mempool.clone(), + // each downstream has its own sender (multi producer single consumer) + sender_add_txs_to_mempool.clone(), ))); JobDeclaratorDownstream::start( diff --git a/roles/jd-server/src/lib/mempool/mod.rs b/roles/jd-server/src/lib/mempool/mod.rs index aa990f095..43ffc7055 100644 --- a/roles/jd-server/src/lib/mempool/mod.rs +++ b/roles/jd-server/src/lib/mempool/mod.rs @@ -1,22 +1,23 @@ pub mod error; +use super::job_declarator::AddTrasactionsToMempoolInner; use crate::mempool::error::JdsMempoolError; use async_channel::Receiver; use bitcoin::blockdata::transaction::Transaction; use hashbrown::HashMap; use roles_logic_sv2::utils::Mutex; use rpc::mini_rpc_client; -use std::{convert::TryInto, sync::Arc}; +use std::{convert::TryInto, str::FromStr, sync::Arc}; use stratum_common::{bitcoin, bitcoin::hash_types::Txid}; #[derive(Clone, Debug)] -pub struct TransacrtionWithHash { - id: Txid, - tx: Transaction, +pub struct TransactionWithHash { + pub id: Txid, + pub tx: Option, } #[derive(Clone, Debug)] pub struct JDsMempool { - pub mempool: Vec, + pub mempool: HashMap>, auth: mini_rpc_client::Auth, url: String, new_block_receiver: Receiver, @@ -38,9 +39,10 @@ impl JDsMempool { #[cfg(debug_assertions)] pub fn _get_transaction_list(self_: Arc>) -> Vec { let tx_list = self_.safe_lock(|x| x.mempool.clone()).unwrap(); - let tx_list_: Vec = tx_list.iter().map(|n| n.id).collect(); + let tx_list_: Vec = tx_list.iter().map(|n| *n.0).collect(); tx_list_ } + pub fn new( url: String, username: String, @@ -48,7 +50,7 @@ impl JDsMempool { new_block_receiver: Receiver, ) -> Self { let auth = mini_rpc_client::Auth::new(username, password); - let empty_mempool: Vec = Vec::new(); + let empty_mempool: HashMap> = HashMap::new(); JDsMempool { mempool: empty_mempool, auth, @@ -57,24 +59,63 @@ impl JDsMempool { } } + // this functions fill in the mempool the transactions with the given txid and insert the given + // transactions. The ids are for the transactions that are already known to the node, the + // unknown transactions are provided directly as a vector + pub async fn add_tx_data_to_mempool( + self_: Arc>, + add_txs_to_mempool_inner: AddTrasactionsToMempoolInner, + ) -> Result<(), JdsMempoolError> { + let txids = add_txs_to_mempool_inner.known_transactions; + let transactions = add_txs_to_mempool_inner.unknown_transactions; + let client = self_ + .safe_lock(|a| a.get_client()) + .map_err(|e| JdsMempoolError::PoisonLock(e.to_string()))? + .ok_or(JdsMempoolError::NoClient)?; + // fill in the mempool the transactions id in the mempool with the full transactions + // retrieved from the jd client + for txid in txids { + if let Some(None) = self_ + .safe_lock(|a| a.mempool.get(&txid).cloned()) + .map_err(|e| JdsMempoolError::PoisonLock(e.to_string()))? + { + let transaction = client + .get_raw_transaction(&txid.to_string(), None) + .await + .map_err(JdsMempoolError::Rpc)?; + let _ = + self_.safe_lock(|a| a.mempool.insert(transaction.txid(), Some(transaction))); + } + } + + // fill in the mempool the transactions given in input + for transaction in transactions { + let _ = self_.safe_lock(|a| a.mempool.insert(transaction.txid(), Some(transaction))); + } + Ok(()) + } + pub async fn update_mempool(self_: Arc>) -> Result<(), JdsMempoolError> { - let mut mempool_ordered: Vec = Vec::new(); + let mut mempool_ordered: HashMap> = HashMap::new(); let client = self_ .safe_lock(|x| x.get_client()) .map_err(|e| JdsMempoolError::PoisonLock(e.to_string()))? .ok_or(JdsMempoolError::NoClient)?; - let new_mempool: Result, JdsMempoolError> = + let new_mempool: Result>, JdsMempoolError> = { + let self_ = self_.clone(); tokio::task::spawn(async move { let mempool: Vec = client - .get_raw_mempool_verbose() + .get_raw_mempool() .await .map_err(JdsMempoolError::Rpc)?; for id in &mempool { - let tx: Result = client.get_raw_transaction(id, None).await; - if let Ok(tx) = tx { - let id = tx.txid(); - mempool_ordered.push(TransacrtionWithHash { id, tx }); - } + let key_id = Txid::from_str(id).unwrap(); + let tx = self_.safe_lock(|x| match x.mempool.get(&key_id) { + Some(entry) => entry.clone(), + None => None, + }); + let id = Txid::from_str(id).unwrap(); + mempool_ordered.insert(id, tx.unwrap()); } if mempool_ordered.is_empty() { Err(JdsMempoolError::EmptyMempool) @@ -83,8 +124,8 @@ impl JDsMempool { } }) .await - .map_err(JdsMempoolError::TokioJoin)?; - + .map_err(JdsMempoolError::TokioJoin)? + }; match new_mempool { Ok(new_mempool_) => { let _ = self_.safe_lock(|x| { @@ -114,14 +155,18 @@ impl JDsMempool { Ok(()) } - pub fn to_short_ids(&self, nonce: u64) -> Option> { + pub fn to_short_ids(&self, nonce: u64) -> Option> { let mut ret = HashMap::new(); for tx in &self.mempool { - let s_id = roles_logic_sv2::utils::get_short_hash(tx.id, nonce) + let s_id = roles_logic_sv2::utils::get_short_hash(*tx.0, nonce) .to_vec() .try_into() .unwrap(); - if ret.insert(s_id, tx.tx.clone()).is_none() { + let tx_data = TransactionWithHash { + id: *tx.0, + tx: tx.1.clone(), + }; + if ret.insert(s_id, tx_data.clone()).is_none() { continue; } else { return None; diff --git a/roles/jd-server/src/lib/status.rs b/roles/jd-server/src/lib/status.rs index b05294e47..83a50026f 100644 --- a/roles/jd-server/src/lib/status.rs +++ b/roles/jd-server/src/lib/status.rs @@ -121,5 +121,11 @@ pub async fn handle_error(sender: &Sender, e: JdsError) -> error_handling::Error JdsError::MempoolError(_) => { send_status(sender, e, error_handling::ErrorBranch::Break).await } + JdsError::ImpossibleToReconstructBlock(_) => { + send_status(sender, e, error_handling::ErrorBranch::Continue).await + } + JdsError::NoLastDeclaredJob => { + send_status(sender, e, error_handling::ErrorBranch::Continue).await + } } } diff --git a/roles/jd-server/src/main.rs b/roles/jd-server/src/main.rs index abf8cc556..3391a93be 100644 --- a/roles/jd-server/src/main.rs +++ b/roles/jd-server/src/main.rs @@ -177,6 +177,7 @@ async fn main() { } } _ => { + // TODO here there should be a better error managmenet mempool::error::handle_error(&err); handle_result!(sender_submit_solution, Err(err)); } @@ -190,8 +191,38 @@ async fn main() { let cloned = config.clone(); let mempool_cloned = mempool.clone(); + let (sender_add_txs_to_mempool, receiver_add_txs_to_mempool) = unbounded(); task::spawn(async move { - JobDeclarator::start(cloned, sender, mempool_cloned, new_block_sender).await + JobDeclarator::start( + cloned, + sender, + mempool_cloned, + new_block_sender, + sender_add_txs_to_mempool, + ) + .await + }); + task::spawn(async move { + loop { + if let Ok(add_transactions_to_mempool) = receiver_add_txs_to_mempool.recv().await { + let mempool_cloned = mempool.clone(); + task::spawn(async move { + match lib::mempool::JDsMempool::add_tx_data_to_mempool( + mempool_cloned, + add_transactions_to_mempool, + ) + .await + { + Ok(_) => (), + Err(err) => { + // TODO + // here there should be a better error management + mempool::error::handle_error(&err); + } + } + }); + } + } }); // Start the error handling loop diff --git a/utils/rpc/src/mini_rpc_client.rs b/utils/rpc/src/mini_rpc_client.rs index 12769ac19..d0ce08435 100644 --- a/utils/rpc/src/mini_rpc_client.rs +++ b/utils/rpc/src/mini_rpc_client.rs @@ -59,7 +59,7 @@ impl MiniRpcClient { } } - pub async fn get_raw_mempool_verbose(&self) -> Result, RpcError> { + pub async fn get_raw_mempool(&self) -> Result, RpcError> { let response = self.send_json_rpc_request("getrawmempool", json!([])).await; match response { Ok(result_hex) => {