Skip to content

Commit

Permalink
Merge pull request #772 from GitGab19/jds-mempool-enhancement
Browse files Browse the repository at this point in the history
JDS mempool management enhancement
  • Loading branch information
GitGab19 authored Mar 21, 2024
2 parents 84bc8e0 + 909f865 commit db994da
Show file tree
Hide file tree
Showing 8 changed files with 451 additions and 135 deletions.
4 changes: 4 additions & 0 deletions protocols/v2/roles-logic-sv2/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub enum Error {
VersionTooBig,
TxVersionTooBig,
TxVersionTooLow,
TxDecodingError(String),
NotFoundChannelId,
NoValidJob,
NoValidTranslatorJob,
Expand All @@ -59,6 +60,7 @@ pub enum Error {
TargetError(InputError),
HashrateError(InputError),
LogicErrorMessage(std::boxed::Box<AllMessages<'static>>),
JDSMissingTransactions,
}

impl From<BinarySv2Error> for Error {
Expand Down Expand Up @@ -138,6 +140,7 @@ impl Display for Error {
VersionTooBig => write!(f, "We are trying to construct a block header with version bigger than i32::MAX"),
TxVersionTooBig => write!(f, "Tx version can not be greater than i32::MAX"),
TxVersionTooLow => write!(f, "Tx version can not be lower than 1"),
TxDecodingError(e) => write!(f, "Impossible to decode tx: {:?}", e),
NotFoundChannelId => write!(f, "No downstream has been registred for this channel id"),
NoValidJob => write!(f, "Impossible to create a standard job for channelA cause no valid job has been received from upstream yet"),
NoValidTranslatorJob => write!(f, "Impossible to create a extended job for channel cause no valid job has been received from upstream yet"),
Expand All @@ -149,6 +152,7 @@ impl Display for Error {
TargetError(e) => write!(f, "Impossible to get Target: {:?}", e),
HashrateError(e) => write!(f, "Impossible to get Hashrate: {:?}", e),
LogicErrorMessage(e) => write!(f, "Message is well formatted but can not be handled: {:?}", e),
JDSMissingTransactions => write!(f, "JD server cannot propagate the block: missing transactions"),
}
}
}
6 changes: 6 additions & 0 deletions roles/jd-server/src/lib/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ pub enum JdsError {
Custom(String),
Sv2ProtocolError((u32, Mining<'static>)),
MempoolError(JdsMempoolError),
ImpossibleToReconstructBlock(String),
NoLastDeclaredJob,
}

impl std::fmt::Display for JdsError {
Expand All @@ -42,6 +44,10 @@ impl std::fmt::Display for JdsError {
write!(f, "Received Sv2 Protocol Error from upstream: `{:?}`", e)
}
MempoolError(ref e) => write!(f, "Mempool error: `{:?}`", e),
ImpossibleToReconstructBlock(e) => {
write!(f, "Error in reconstructing the block: {:?}", e)
}
NoLastDeclaredJob => write!(f, "Last declared job not found"),
}
}
}
Expand Down
120 changes: 70 additions & 50 deletions roles/jd-server/src/lib/job_declarator/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ use roles_logic_sv2::{
parsers::JobDeclaration,
};
use std::{convert::TryInto, io::Cursor};
use stratum_common::bitcoin::Transaction;
use stratum_common::bitcoin::{Transaction, Txid};
pub type SendTo = SendTo_<JobDeclaration<'static>, ()>;
use super::{signed_token, TransactionState};
use roles_logic_sv2::{errors::Error, parsers::PoolMessages as AllMessages};
use stratum_common::bitcoin::consensus::Decodable;

use super::signed_token;
use tracing::info;

use super::JobDeclaratorDownstream;

Expand Down Expand Up @@ -53,14 +53,19 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream {
coinbase_output: self.coinbase_output.clone().try_into().unwrap(),
};
let message_enum = JobDeclaration::AllocateMiningJobTokenSuccess(message_success);
println!(
info!(
"Sending AllocateMiningJobTokenSuccess to proxy {:?}",
message_enum
);
Ok(SendTo::Respond(message_enum))
}

fn handle_declare_mining_job(&mut self, message: DeclareMiningJob) -> Result<SendTo, Error> {
// the transactions that are present in the mempool are stored here, that is sent to the
// mempool which use the rpc client to retrieve the whole data for each transaction.
// The unknown transactions is a vector that contains the transactions that are not in the
// jds mempool, and will be non-empty in the ProvideMissingTransactionsSuccess message
let mut known_transactions: Vec<Txid> = vec![];
self.tx_hash_list_hash = Some(message.tx_hash_list_hash.clone().into_static());
if self.verify_job(&message) {
let short_hash_list: Vec<ShortTxId> = message
Expand All @@ -76,22 +81,34 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream {
.safe_lock(|x| x.to_short_ids(nonce))
.unwrap()
.unwrap();
let mut txs_in_job = vec![];
let mut missing_txs = vec![];
let mut transactions_with_state =
vec![TransactionState::Missing; short_hash_list.len()];
let mut missing_txs: Vec<u16> = Vec::new();

for (i, sid) in short_hash_list.iter().enumerate() {
let sid_: [u8; 6] = sid.to_vec().try_into().unwrap();
if let Some(tx_data) = short_id_mempool.get(&sid_) {
txs_in_job.push(tx_data.clone());
} else {
missing_txs.push(i as u16);
match short_id_mempool.get(&sid_) {
Some(tx_data) => {
transactions_with_state[i] = TransactionState::PresentInMempool(tx_data.id);
known_transactions.push(tx_data.id);
}
None => {
transactions_with_state[i] = TransactionState::Missing;
missing_txs.push(i as u16);
}
}
}
self.declared_mining_job = Some((
message.clone().into_static(),
txs_in_job,
self.declared_mining_job = (
Some(message.clone().into_static()),
transactions_with_state,
missing_txs.clone(),
));
);
// here we send the transactions that we want to be stored in jds mempool with full data

self.add_txs_to_mempool
.add_txs_to_mempool_inner
.known_transactions
.append(&mut known_transactions);

if missing_txs.is_empty() {
let message_success = DeclareMiningJobSuccess {
Expand All @@ -113,7 +130,7 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream {
JobDeclaration::ProvideMissingTransactions(
message_provide_missing_transactions,
);
Ok(SendTo_::Respond(message_enum_provide_missing_transactions))
Ok(SendTo::Respond(message_enum_provide_missing_transactions))
}
} else {
let message_error = DeclareMiningJobError {
Expand All @@ -137,43 +154,46 @@ impl ParseClientJobDeclarationMessages for JobDeclaratorDownstream {
&mut self,
message: ProvideMissingTransactionsSuccess,
) -> Result<SendTo, Error> {
match &mut self.declared_mining_job {
Some((_, ref mut transactions, missing_indexes)) => {
for (i, tx) in message.transaction_list.inner_as_ref().iter().enumerate() {
let mut cursor = Cursor::new(tx);
let tx = Transaction::consensus_decode_from_finite_reader(&mut cursor)
.expect("Invalid tx data from downstream");
let index =
*missing_indexes
.get(i)
.ok_or(Error::LogicErrorMessage(Box::new(
AllMessages::JobDeclaration(
JobDeclaration::ProvideMissingTransactionsSuccess(
message.clone().into_static(),
),
),
)))? as usize;
transactions.insert(index, tx);
}
// TODO check it
let tx_hash_list_hash = self.tx_hash_list_hash.clone().unwrap().into_static();
let message_success = DeclareMiningJobSuccess {
request_id: message.request_id,
new_mining_job_token: signed_token(
tx_hash_list_hash,
&self.public_key.clone(),
&self.private_key.clone(),
),
};
let message_enum_success = JobDeclaration::DeclareMiningJobSuccess(message_success);
Ok(SendTo::Respond(message_enum_success))
let (_, ref mut transactions_with_state, missing_indexes) = &mut self.declared_mining_job;
let mut unknown_transactions: Vec<Transaction> = vec![];
for (i, tx) in message.transaction_list.inner_as_ref().iter().enumerate() {
let mut cursor = Cursor::new(tx);
let transaction = Transaction::consensus_decode_from_finite_reader(&mut cursor)
.map_err(|e| Error::TxDecodingError(e.to_string()))?;
Vec::push(&mut unknown_transactions, transaction.clone());
let index = *missing_indexes
.get(i)
.ok_or(Error::LogicErrorMessage(Box::new(
AllMessages::JobDeclaration(JobDeclaration::ProvideMissingTransactionsSuccess(
message.clone().into_static(),
)),
)))? as usize;
// insert the missing transactions in the mempool
transactions_with_state[index] = TransactionState::PresentInMempool(transaction.txid());
}
self.add_txs_to_mempool
.add_txs_to_mempool_inner
.unknown_transactions
.append(&mut unknown_transactions);
// if there still a missing transaction return an error
for tx_with_state in transactions_with_state {
match tx_with_state {
TransactionState::PresentInMempool(_) => continue,
TransactionState::Missing => return Err(Error::JDSMissingTransactions),
}
None => Err(Error::LogicErrorMessage(Box::new(
AllMessages::JobDeclaration(JobDeclaration::ProvideMissingTransactionsSuccess(
message.clone().into_static(),
)),
))),
}
// TODO check it
let tx_hash_list_hash = self.tx_hash_list_hash.clone().unwrap().into_static();
let message_success = DeclareMiningJobSuccess {
request_id: message.request_id,
new_mining_job_token: signed_token(
tx_hash_list_hash,
&self.public_key.clone(),
&self.private_key.clone(),
),
};
let message_enum_success = JobDeclaration::DeclareMiningJobSuccess(message_success);
Ok(SendTo::Respond(message_enum_success))
}

fn handle_submit_solution(&mut self, message: SubmitSolutionJd<'_>) -> Result<SendTo, Error> {
Expand Down
Loading

0 comments on commit db994da

Please sign in to comment.