diff --git a/Cargo.lock b/Cargo.lock index 3091d59df82c7..700a6550b14ef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13296,6 +13296,55 @@ dependencies = [ "tracing-gum", ] +[[package]] +name = "polkadot-node-core-approval-voting-parallel" +version = "7.0.0" +dependencies = [ + "assert_matches", + "async-trait", + "bitvec", + "derive_more", + "env_logger 0.11.3", + "futures", + "futures-timer", + "itertools 0.11.0", + "kvdb", + "kvdb-memorydb", + "log", + "merlin", + "parity-scale-codec", + "parking_lot 0.12.3", + "polkadot-approval-distribution", + "polkadot-node-core-approval-voting", + "polkadot-node-jaeger", + "polkadot-node-metrics", + "polkadot-node-network-protocol", + "polkadot-node-primitives", + "polkadot-node-subsystem", + "polkadot-node-subsystem-test-helpers", + "polkadot-node-subsystem-util", + "polkadot-overseer", + "polkadot-primitives", + "polkadot-primitives-test-helpers", + "polkadot-subsystem-bench", + "rand", + "rand_chacha", + "rand_core", + "sc-keystore", + "schnellru", + "schnorrkel 0.11.4", + "sp-application-crypto", + "sp-consensus", + "sp-consensus-babe", + "sp-consensus-slots", + "sp-core", + "sp-keyring", + "sp-keystore", + "sp-runtime", + "thiserror", + "tracing-gum", +] + [[package]] name = "polkadot-node-core-av-store" version = "7.0.0" @@ -14759,6 +14808,7 @@ dependencies = [ "polkadot-network-bridge", "polkadot-node-collation-generation", "polkadot-node-core-approval-voting", + "polkadot-node-core-approval-voting-parallel", "polkadot-node-core-av-store", "polkadot-node-core-backing", "polkadot-node-core-bitfield-signing", @@ -14921,6 +14971,7 @@ dependencies = [ "polkadot-availability-recovery", "polkadot-erasure-coding", "polkadot-node-core-approval-voting", + "polkadot-node-core-approval-voting-parallel", "polkadot-node-core-av-store", "polkadot-node-core-chain-api", "polkadot-node-metrics", diff --git a/Cargo.toml b/Cargo.toml index db9a2bd722735..9baeda27ea921 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -152,6 +152,7 @@ members = [ "polkadot/erasure-coding/fuzzer", "polkadot/node/collation-generation", "polkadot/node/core/approval-voting", + "polkadot/node/core/approval-voting-parallel", "polkadot/node/core/av-store", "polkadot/node/core/backing", "polkadot/node/core/bitfield-signing", @@ -1009,6 +1010,7 @@ polkadot-gossip-support = { path = "polkadot/node/network/gossip-support", defau polkadot-network-bridge = { path = "polkadot/node/network/bridge", default-features = false } polkadot-node-collation-generation = { path = "polkadot/node/collation-generation", default-features = false } polkadot-node-core-approval-voting = { path = "polkadot/node/core/approval-voting", default-features = false } +polkadot-node-core-approval-voting-parallel = { path = "polkadot/node/core/approval-voting-parallel", default-features = false } polkadot-node-core-av-store = { path = "polkadot/node/core/av-store", default-features = false } polkadot-node-core-backing = { path = "polkadot/node/core/backing", default-features = false } polkadot-node-core-bitfield-signing = { path = "polkadot/node/core/bitfield-signing", default-features = false } diff --git a/cumulus/client/relay-chain-inprocess-interface/src/lib.rs b/cumulus/client/relay-chain-inprocess-interface/src/lib.rs index 7871623e8447a..576b9d4d34850 100644 --- a/cumulus/client/relay-chain-inprocess-interface/src/lib.rs +++ b/cumulus/client/relay-chain-inprocess-interface/src/lib.rs @@ -327,6 +327,7 @@ fn build_polkadot_full_node( execute_workers_max_num: None, prepare_workers_hard_max_num: None, 
prepare_workers_soft_max_num: None, + enable_approval_voting_parallel: false, }, )?; diff --git a/polkadot/cli/src/cli.rs b/polkadot/cli/src/cli.rs index 3e5a6ccdd3c25..722521f11efa0 100644 --- a/polkadot/cli/src/cli.rs +++ b/polkadot/cli/src/cli.rs @@ -151,6 +151,12 @@ pub struct RunCmd { /// TESTING ONLY: disable the version check between nodes and workers. #[arg(long, hide = true)] pub disable_worker_version_check: bool, + + /// Enable approval-voting message processing in parallel. + /// + ///**Dangerous!** This is an experimental feature and should not be used in production. + #[arg(long)] + pub enable_approval_voting_parallel: bool, } #[allow(missing_docs)] diff --git a/polkadot/cli/src/command.rs b/polkadot/cli/src/command.rs index 276ee87516858..814ab759d6d7f 100644 --- a/polkadot/cli/src/command.rs +++ b/polkadot/cli/src/command.rs @@ -256,6 +256,7 @@ where execute_workers_max_num: cli.run.execute_workers_max_num, prepare_workers_hard_max_num: cli.run.prepare_workers_hard_max_num, prepare_workers_soft_max_num: cli.run.prepare_workers_soft_max_num, + enable_approval_voting_parallel: cli.run.enable_approval_voting_parallel, }, ) .map(|full| full.task_manager)?; diff --git a/polkadot/node/core/approval-voting-parallel/Cargo.toml b/polkadot/node/core/approval-voting-parallel/Cargo.toml new file mode 100644 index 0000000000000..10653cd218915 --- /dev/null +++ b/polkadot/node/core/approval-voting-parallel/Cargo.toml @@ -0,0 +1,67 @@ +[package] +name = "polkadot-node-core-approval-voting-parallel" +version = "7.0.0" +authors.workspace = true +edition.workspace = true +license.workspace = true +description = "Approval Voting Subsystem running approval work in parallel" + +[lints] +workspace = true + +[dependencies] +futures = "0.3.30" +futures-timer = "3.0.2" +codec = { package = "parity-scale-codec", version = "3.6.12", default-features = false, features = ["bit-vec", "derive"] } +gum = { package = "tracing-gum", path = "../../gum" } +bitvec = { version = "1.0.0", default-features = false, features = ["alloc"] } +schnellru = "0.2.1" +merlin = "3.0" +schnorrkel = "0.11.4" +kvdb = "0.13.0" +derive_more = "0.99.17" +thiserror = { workspace = true } +itertools = "0.11" +async-trait = { workspace = true } + +polkadot-node-core-approval-voting = { workspace = true, default-features = true } +polkadot-approval-distribution = { workspace = true, default-features = true } + + +polkadot-node-subsystem = { workspace = true, default-features = true } +polkadot-node-subsystem-util = { workspace = true, default-features = true } +polkadot-overseer = { workspace = true, default-features = true } +polkadot-primitives = { workspace = true, default-features = true } +polkadot-node-primitives = { workspace = true, default-features = true } +polkadot-node-jaeger = { workspace = true, default-features = true } + +sc-keystore = { workspace = true, default-features = false } +sp-consensus = { workspace = true, default-features = false } +sp-consensus-slots = { workspace = true, default-features = false } +sp-application-crypto = { workspace = true, default-features = false, features = ["full_crypto"] } +sp-runtime = { workspace = true, default-features = false } +polkadot-node-network-protocol = { workspace = true, default-features = true } +polkadot-node-metrics = { workspace = true, default-features = true} + +rand = "0.8.5" + +# rand_core should match schnorrkel +rand_core = "0.6.2" +rand_chacha = { version = "0.3.1" } + +[dev-dependencies] +async-trait = "0.1.79" +parking_lot = "0.12.1" +sp-keyring = { 
workspace = true, default-features = true } +sp-keystore = {workspace = true, default-features = true} +sp-core = { workspace = true, default-features = true} +sp-consensus-babe = { workspace = true, default-features = true } +polkadot-node-subsystem-test-helpers = { workspace = true, default-features = true} +assert_matches = "1.4.0" +kvdb-memorydb = "0.13.0" +polkadot-primitives-test-helpers = { workspace = true, default-features = true } +log = { workspace = true, default-features = true } +env_logger = "0.11" + +polkadot-subsystem-bench = { workspace = true, default-features = true} + diff --git a/polkadot/node/core/approval-voting-parallel/src/lib.rs b/polkadot/node/core/approval-voting-parallel/src/lib.rs new file mode 100644 index 0000000000000..3142694e1af64 --- /dev/null +++ b/polkadot/node/core/approval-voting-parallel/src/lib.rs @@ -0,0 +1,747 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! The Approval Voting Parallel Subsystem. +//! +//! This subsystem is responsible for orchestrating the work done by +//! approval-voting and approval-distribution subsystem, so they can +//! do their work in parallel, rather than serially, when they are run +//! as independent subsystems. +use itertools::Itertools; +use polkadot_node_core_approval_voting::{ + time::{Clock, SystemClock}, + Config, RealAssignmentCriteria, +}; +use polkadot_node_metrics::metered::{ + self, channel, unbounded, MeteredSender, UnboundedMeteredSender, +}; + +use polkadot_node_primitives::DISPUTE_WINDOW; +use polkadot_node_subsystem::{ + messages::{ApprovalDistributionMessage, ApprovalVotingMessage, ApprovalVotingParallelMessage}, + overseer, FromOrchestra, SpawnedSubsystem, SubsystemError, SubsystemResult, +}; +use polkadot_node_subsystem_util::{ + self, + database::Database, + metrics::{self, prometheus}, + runtime::{Config as RuntimeInfoConfig, RuntimeInfo}, +}; +use polkadot_overseer::{OverseerSignal, SubsystemSender}; +use polkadot_primitives::ValidatorIndex; +use rand::SeedableRng; + +use sc_keystore::LocalKeystore; +use sp_consensus::SyncOracle; + +use futures::{channel::oneshot, prelude::*, StreamExt}; +use polkadot_node_core_approval_voting::{ + approval_db::common::Config as DatabaseConfig, ApprovalVotingWorkProvider, +}; +use std::{collections::HashMap, fmt::Debug, sync::Arc}; +use stream::{select_with_strategy, PollNext}; + +pub(crate) const LOG_TARGET: &str = "parachain::approval-voting-parallel"; + +/// The approval voting subsystem. +pub struct ApprovalVotingParallelSubsystem { + /// `LocalKeystore` is needed for assignment keys, but not necessarily approval keys. + /// + /// We do a lot of VRF signing and need the keys to have low latency. + keystore: Arc, + db_config: DatabaseConfig, + slot_duration_millis: u64, + db: Arc, + sync_oracle: Box, + metrics: Metrics, + spawner: Arc, + clock: Arc, + subsystem_enabled: bool, +} + +/// Approval Voting metrics. 
+#[derive(Default, Clone)] +pub struct Metrics( + pub polkadot_approval_distribution::metrics::Metrics, + pub polkadot_node_core_approval_voting::Metrics, +); + +impl metrics::Metrics for Metrics { + fn try_register( + registry: &prometheus::Registry, + ) -> std::result::Result { + Ok(Metrics( + polkadot_approval_distribution::metrics::Metrics::try_register(registry)?, + polkadot_node_core_approval_voting::Metrics::try_register(registry)?, + )) + } +} + +impl ApprovalVotingParallelSubsystem { + /// Create a new approval voting subsystem with the given keystore, config, and database. + pub fn with_config( + config: Config, + db: Arc, + keystore: Arc, + sync_oracle: Box, + metrics: Metrics, + spawner: impl overseer::gen::Spawner + 'static + Clone, + subsystem_enabled: bool, + ) -> Self { + ApprovalVotingParallelSubsystem::with_config_and_clock( + config, + db, + keystore, + sync_oracle, + metrics, + Arc::new(SystemClock {}), + spawner, + subsystem_enabled, + ) + } + + /// Create a new approval voting subsystem with the given keystore, config, and database. + pub fn with_config_and_clock( + config: Config, + db: Arc, + keystore: Arc, + sync_oracle: Box, + metrics: Metrics, + clock: Arc, + spawner: impl overseer::gen::Spawner + 'static, + subsystem_enabled: bool, + ) -> Self { + ApprovalVotingParallelSubsystem { + keystore, + slot_duration_millis: config.slot_duration_millis, + db, + db_config: DatabaseConfig { col_approval_data: config.col_approval_data }, + sync_oracle, + metrics, + spawner: Arc::new(spawner), + clock, + subsystem_enabled, + } + } +} + +#[overseer::subsystem(ApprovalVotingParallel, error = SubsystemError, prefix = self::overseer)] +impl ApprovalVotingParallelSubsystem { + fn start(self, ctx: Context) -> SpawnedSubsystem { + let future = run::(ctx, self) + .map_err(|e| SubsystemError::with_origin("approval-voting-parallel", e)) + .boxed(); + + SpawnedSubsystem { name: "approval-voting-parallel-subsystem", future } + } +} + +/// The number of workers used for running the approval-distribution logic. +pub const APPROVAL_DISTRIBUTION_WORKER_COUNT: usize = 2; + +/// The channel size for the workers. +pub const WORKERS_CHANNEL_SIZE: usize = 64000 / APPROVAL_DISTRIBUTION_WORKER_COUNT; +fn prio_right<'a>(_val: &'a mut ()) -> PollNext { + PollNext::Right +} + +#[overseer::contextbounds(ApprovalVotingParallel, prefix = self::overseer)] +async fn run( + mut ctx: Context, + subsystem: ApprovalVotingParallelSubsystem, +) -> SubsystemResult<()> +where +{ + // Build approval voting handles. 
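Editorial sketch (not part of the patch): the worker handles built just below pair a bounded channel for regular messages with an unbounded one for overseer signals, merged via `select_with_strategy` and the `prio_right` strategy defined above, so the signal side is always polled first. The following standalone example shows that merging behaviour using only the `futures` crate APIs the patch itself relies on; names like `bounded_tx` are illustration only.

use futures::{
    channel::mpsc,
    executor::block_on,
    stream::{select_with_strategy, PollNext},
    SinkExt, StreamExt,
};

// Same strategy as in the subsystem: always try the right-hand (unbounded) stream first.
fn prio_right(_: &mut ()) -> PollNext {
    PollNext::Right
}

fn main() {
    block_on(async {
        let (mut bounded_tx, bounded_rx) = mpsc::channel::<&str>(8);
        let (unbounded_tx, unbounded_rx) = mpsc::unbounded::<&str>();

        // Queue a regular message first, then a signal on the unbounded side.
        bounded_tx.send("message").await.unwrap();
        unbounded_tx.unbounded_send("signal").unwrap();

        let mut merged = select_with_strategy(bounded_rx, unbounded_rx, prio_right);
        // The signal is delivered first even though it was queued second.
        assert_eq!(merged.next().await, Some("signal"));
        assert_eq!(merged.next().await, Some("message"));
    });
}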
+ let (tx_approval_voting_work, rx_approval_voting_work) = + channel::>(WORKERS_CHANNEL_SIZE); + let (tx_approval_voting_work_unbounded, rx_approval_voting_work_unbounded) = + unbounded::>(); + + let mut to_approval_voting_worker = + ToWorker(tx_approval_voting_work, tx_approval_voting_work_unbounded); + + let prioritised = select_with_strategy( + rx_approval_voting_work, + rx_approval_voting_work_unbounded, + prio_right, + ); + let approval_voting_work_provider = ApprovalVotingWorkProviderImpl(prioritised); + + gum::info!(target: LOG_TARGET, "Starting approval distribution workers"); + + let mut approval_distribution_channels = Vec::new(); + let slot_duration_millis = subsystem.slot_duration_millis; + + for i in 0..APPROVAL_DISTRIBUTION_WORKER_COUNT { + let approval_distro_orig = + polkadot_approval_distribution::ApprovalDistribution::new_with_clock( + subsystem.metrics.0.clone(), + subsystem.slot_duration_millis, + subsystem.clock.clone(), + false, + ); + + let (tx_approval_distribution_work, rx_approval_distribution_work) = + channel::>(WORKERS_CHANNEL_SIZE); + let (tx_approval_distribution_work_unbounded, rx_approval_distribution_unbounded) = + unbounded::>(); + + let to_approval_distribution_worker = + ToWorker(tx_approval_distribution_work, tx_approval_distribution_work_unbounded); + + let task_name = format!("approval-voting-parallel-{}", i); + + let mut network_sender = ctx.sender().clone(); + let mut runtime_api_sender = ctx.sender().clone(); + let mut approval_distribution_to_approval_voting = to_approval_voting_worker.clone(); + + subsystem.spawner.spawn_blocking( + task_name.leak(), + Some("approval-voting-parallel-subsystem"), + Box::pin(async move { + let mut state = + polkadot_approval_distribution::State::with_config(slot_duration_millis); + let mut rng = rand::rngs::StdRng::from_entropy(); + let assignment_criteria = RealAssignmentCriteria {}; + let mut session_info_provider = RuntimeInfo::new_with_config(RuntimeInfoConfig { + keystore: None, + session_cache_lru_size: DISPUTE_WINDOW.get(), + }); + + let mut work_channels = select_with_strategy( + rx_approval_distribution_work, + rx_approval_distribution_unbounded, + prio_right, + ); + + loop { + let message = match work_channels.next().await { + Some(message) => message, + None => { + gum::info!( + target: LOG_TARGET, + "Approval distribution stream finished, most likely shutting down", + ); + break; + }, + }; + approval_distro_orig + .handle_from_orchestra( + message, + &mut approval_distribution_to_approval_voting, + &mut network_sender, + &mut runtime_api_sender, + &mut state, + &mut rng, + &assignment_criteria, + &mut session_info_provider, + ) + .await; + } + }), + ); + approval_distribution_channels.push(to_approval_distribution_worker); + } + + gum::info!(target: LOG_TARGET, "Starting approval voting workers"); + let sender = ctx.sender().clone(); + let to_approval_distribution = ApprovalVotingToApprovalDistribution(sender.clone()); + + polkadot_node_core_approval_voting::start_approval_worker( + approval_voting_work_provider, + sender.clone(), + to_approval_distribution, + polkadot_node_core_approval_voting::Config { + slot_duration_millis: subsystem.slot_duration_millis, + col_approval_data: subsystem.db_config.col_approval_data, + }, + subsystem.db.clone(), + subsystem.keystore.clone(), + subsystem.sync_oracle, + subsystem.metrics.1.clone(), + subsystem.spawner.clone(), + subsystem.clock.clone(), + ) + .await?; + + gum::info!( + target: LOG_TARGET, + subsystem_disabled = ?subsystem.subsystem_enabled, + "Starting 
main subsystem loop" + ); + + // Main loop of the subsystem, it shouldn't include any logic just dispatching of messages to + // the workers. + loop { + futures::select! { + next_msg = ctx.recv().fuse() => { + let next_msg = match next_msg { + Ok(msg) => msg, + Err(err) => { + gum::info!(target: LOG_TARGET, ?err, "Approval voting parallel subsystem received an error"); + break; + } + }; + if !subsystem.subsystem_enabled { + gum::trace!(target: LOG_TARGET, ?next_msg, "Parallel processing is not enabled skipping message"); + continue; + } + gum::trace!(target: LOG_TARGET, ?next_msg, "Parallel processing is not enabled skipping message"); + + match next_msg { + FromOrchestra::Signal(msg) => { + for worker in approval_distribution_channels.iter_mut() { + worker + .send_signal(msg.clone()).await?; + } + + to_approval_voting_worker.send_signal(msg).await?; + }, + FromOrchestra::Communication { msg } => match msg { + // The message the approval voting subsystem would've handled. + ApprovalVotingParallelMessage::CheckAndImportAssignment(_,_, _) | + ApprovalVotingParallelMessage::CheckAndImportApproval(_)| + ApprovalVotingParallelMessage::ApprovedAncestor(_, _,_) | + ApprovalVotingParallelMessage::GetApprovalSignaturesForCandidate(_, _) => { + // Safe to unwrap because we know the message is of the right type. + to_approval_voting_worker.send_message(msg.try_into().unwrap()).await; + }, + // Now the message the approval distribution subsystem would've handled and need to + // be forwarded to the workers. + ApprovalVotingParallelMessage::NewBlocks(msg) => { + for worker in approval_distribution_channels.iter_mut() { + worker + .send_message( + ApprovalDistributionMessage::NewBlocks(msg.clone()), + ) + .await; + } + }, + ApprovalVotingParallelMessage::DistributeAssignment(assignment, claimed) => { + let worker_index = assignment.validator.0 as usize % approval_distribution_channels.len(); + let worker = approval_distribution_channels.get_mut(worker_index).expect("Worker index is obtained modulo len; qed"); + worker + .send_message( + ApprovalDistributionMessage::DistributeAssignment(assignment, claimed) + ) + .await; + + }, + ApprovalVotingParallelMessage::DistributeApproval(vote) => { + let worker_index = vote.validator.0 as usize % approval_distribution_channels.len(); + let worker = approval_distribution_channels.get_mut(worker_index).expect("Worker index is obtained modulo len; qed"); + worker + .send_message( + ApprovalDistributionMessage::DistributeApproval(vote) + ).await; + + }, + ApprovalVotingParallelMessage::NetworkBridgeUpdate(msg) => { + if let polkadot_node_subsystem::messages::NetworkBridgeEvent::PeerMessage( + peer_id, + msg, + ) = msg + { + let (all_msgs_from_same_validator, messages_split_by_validator) = validator_index_for_msg(msg); + + for (validator_index, msg) in all_msgs_from_same_validator.into_iter().chain(messages_split_by_validator.into_iter().flatten()) { + let worker_index = validator_index.0 as usize % approval_distribution_channels.len(); + let worker = approval_distribution_channels.get_mut(worker_index).expect("Worker index is obtained modulo len; qed"); + + worker + .send_message( + ApprovalDistributionMessage::NetworkBridgeUpdate( + polkadot_node_subsystem::messages::NetworkBridgeEvent::PeerMessage( + peer_id, msg, + ), + ), + ).await; + } + } else { + for worker in approval_distribution_channels.iter_mut() { + worker + .send_message( + ApprovalDistributionMessage::NetworkBridgeUpdate(msg.clone()), + ).await; + } + } + }, + 
ApprovalVotingParallelMessage::GetApprovalSignatures(indices, tx) => { + let mut sigs = HashMap::new(); + let mut signatures_channels = Vec::new(); + for worker in approval_distribution_channels.iter_mut() { + let (tx, rx) = oneshot::channel(); + worker + .send_message( + ApprovalDistributionMessage::GetApprovalSignatures(indices.clone(), tx) + ).await; + signatures_channels.push(rx); + } + let results = futures::future::join_all(signatures_channels).await; + + for result in results { + let worker_sigs = match result { + Ok(sigs) => sigs, + Err(_) => { + gum::error!( + target: LOG_TARGET, + "Getting approval signatures failed, oneshot got closed" + ); + continue; + }, + }; + sigs.extend(worker_sigs); + } + + if let Err(_) = tx.send(sigs) { + gum::debug!( + target: LOG_TARGET, + "Sending back approval signatures failed, oneshot got closed" + ); + } + }, + ApprovalVotingParallelMessage::ApprovalCheckingLagUpdate(lag) => { + for worker in approval_distribution_channels.iter_mut() { + worker + .send_message( + ApprovalDistributionMessage::ApprovalCheckingLagUpdate(lag) + ).await; + } + }, + }, + }; + + }, + }; + } + Ok(()) +} + +// Returns the validators that initially created this assignments/votes, the validator index +// is later used to decide which approval-distribution worker should receive the message. +// +// Because this is on the hot path and we don't want to be unnecessarily slow, it contains two logic +// paths. The ultra fast path where all messages have the same validator index and we don't don't do +// any cloning or allocation and the path where we need to split the messages into multiple +// messages, because they have different validator indices, where we do need to clone and allocate. +// In practice most of the message will fall on the ultra fast path. 
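Before the function itself (which follows), here is a standalone sketch of the routing rule the comment above describes, using simplified stand-in types instead of the real network-protocol messages and assuming the `itertools` crate that is already a dependency of this patch: a batch whose entries all come from one validator is forwarded whole, a mixed batch is split per originator, and the originator's index modulo the worker count picks the approval-distribution worker. Illustration only, not part of the patch.

use itertools::Itertools;

const WORKER_COUNT: usize = 2;

struct Vote {
    validator: u32,
}

fn route(batch: Vec<Vote>) -> Vec<(usize, Vec<Vote>)> {
    // The originator check produces an owned result, so `batch` is free to move below.
    let single_originator = batch.iter().map(|v| v.validator).all_equal_value();
    match single_originator {
        // Fast path: one originator, the whole batch goes to one worker untouched.
        Ok(validator) => vec![(validator as usize % WORKER_COUNT, batch)],
        // Slow path: mixed originators, split into per-validator singleton batches.
        Err(_) => batch
            .into_iter()
            .map(|v| (v.validator as usize % WORKER_COUNT, vec![v]))
            .collect(),
    }
}

fn main() {
    // One originator: a single worker receives the untouched batch.
    assert_eq!(route(vec![Vote { validator: 5 }, Vote { validator: 5 }]).len(), 1);
    // Two originators: the batch is split, one piece per validator.
    assert_eq!(route(vec![Vote { validator: 0 }, Vote { validator: 1 }]).len(), 2);
}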
+fn validator_index_for_msg( + msg: polkadot_node_network_protocol::ApprovalDistributionMessage, +) -> ( + Option<(ValidatorIndex, polkadot_node_network_protocol::ApprovalDistributionMessage)>, + Option>, +) { + match msg { + polkadot_node_network_protocol::Versioned::V1(ref message) => match message { + polkadot_node_network_protocol::v1::ApprovalDistributionMessage::Assignments(msgs) => + if let Ok(validator) = msgs.iter().map(|(msg, _)| msg.validator).all_equal_value() { + (Some((validator, msg)), None) + } else { + let split = msgs + .iter() + .map(|(msg, claimed_candidates)| { + ( + msg.validator, + polkadot_node_network_protocol::Versioned::V1( + polkadot_node_network_protocol::v1::ApprovalDistributionMessage::Assignments( + vec![(msg.clone(), *claimed_candidates)] + ), + ), + ) + }) + .collect_vec(); + (None, Some(split)) + }, + polkadot_node_network_protocol::v1::ApprovalDistributionMessage::Approvals(msgs) => + if let Ok(validator) = msgs.iter().map(|msg| msg.validator).all_equal_value() { + (Some((validator, msg)), None) + } else { + let split = msgs + .iter() + .map(|vote| { + ( + vote.validator, + polkadot_node_network_protocol::Versioned::V1( + polkadot_node_network_protocol::v1::ApprovalDistributionMessage::Approvals( + vec![vote.clone()] + ), + ), + ) + }) + .collect_vec(); + (None, Some(split)) + }, + }, + polkadot_node_network_protocol::Versioned::V2(ref message) => match message { + polkadot_node_network_protocol::v2::ApprovalDistributionMessage::Assignments(msgs) => + if let Ok(validator) = msgs.iter().map(|(msg, _)| msg.validator).all_equal_value() { + (Some((validator, msg)), None) + } else { + let split = msgs + .iter() + .map(|(msg, claimed_candidates)| { + ( + msg.validator, + polkadot_node_network_protocol::Versioned::V2( + polkadot_node_network_protocol::v2::ApprovalDistributionMessage::Assignments( + vec![(msg.clone(), *claimed_candidates)] + ), + ), + ) + }) + .collect_vec(); + (None, Some(split)) + }, + + polkadot_node_network_protocol::v2::ApprovalDistributionMessage::Approvals(msgs) => + if let Ok(validator) = msgs.iter().map(|msg| msg.validator).all_equal_value() { + (Some((validator, msg)), None) + } else { + let split = msgs + .iter() + .map(|vote| { + ( + vote.validator, + polkadot_node_network_protocol::Versioned::V2( + polkadot_node_network_protocol::v2::ApprovalDistributionMessage::Approvals( + vec![vote.clone()] + ), + ), + ) + }) + .collect_vec(); + (None, Some(split)) + }, + }, + polkadot_node_network_protocol::Versioned::V3(ref message) => match message { + polkadot_node_network_protocol::v3::ApprovalDistributionMessage::Assignments(msgs) => + if let Ok(validator) = msgs.iter().map(|(msg, _)| msg.validator).all_equal_value() { + (Some((validator, msg)), None) + } else { + let split = msgs + .iter() + .map(|(msg, claimed_candidates)| { + ( + msg.validator, + polkadot_node_network_protocol::Versioned::V3( + polkadot_node_network_protocol::v3::ApprovalDistributionMessage::Assignments( + vec![(msg.clone(), claimed_candidates.clone())] + ), + ), + ) + }) + .collect_vec(); + (None, Some(split)) + }, + polkadot_node_network_protocol::v3::ApprovalDistributionMessage::Approvals(msgs) => + if let Ok(validator) = msgs.iter().map(|msg| msg.validator).all_equal_value() { + (Some((validator, msg)), None) + } else { + let split = msgs + .iter() + .map(|vote| { + ( + vote.validator, + polkadot_node_network_protocol::Versioned::V3( + polkadot_node_network_protocol::v3::ApprovalDistributionMessage::Approvals( + vec![vote.clone()] + ), + ), + ) + }) + .collect_vec(); 
+ (None, Some(split)) + }, + }, + } +} + +/// Just a wrapper over a channel Receiver, that is injected into approval-voting worker for +/// providing the messages to be processed. +pub struct ApprovalVotingWorkProviderImpl(T); + +#[async_trait::async_trait] +impl ApprovalVotingWorkProvider for ApprovalVotingWorkProviderImpl +where + T: Stream> + Unpin + Send, +{ + async fn recv(&mut self) -> SubsystemResult> { + self.0.next().await.ok_or(SubsystemError::Context( + "ApprovalVotingWorkProviderImpl: Channel closed".to_string(), + )) + } +} + +/// Just a wrapper for implementing overseer::SubsystemSender and +/// overseer::SubsystemSender, so that we can inject into the +/// workers, so they can talke directly with each other without intermediating in this subsystem +/// loop. +pub struct ToWorker( + MeteredSender>, + UnboundedMeteredSender>, +); + +impl Clone for ToWorker { + fn clone(&self) -> Self { + Self(self.0.clone(), self.1.clone()) + } +} + +impl ToWorker { + async fn send_signal(&mut self, signal: OverseerSignal) -> Result<(), SubsystemError> { + self.1 + .unbounded_send(FromOrchestra::Signal(signal)) + .map_err(|err| SubsystemError::QueueError(err.into_send_error())) + } +} + +impl overseer::SubsystemSender for ToWorker { + fn send_message<'life0, 'async_trait>( + &'life0 mut self, + msg: T, + ) -> ::core::pin::Pin< + Box + ::core::marker::Send + 'async_trait>, + > + where + 'life0: 'async_trait, + Self: 'async_trait, + { + async { + if let Err(err) = + self.0.send(polkadot_overseer::FromOrchestra::Communication { msg }).await + { + gum::error!( + target: LOG_TARGET, + "Failed to send message to approval voting worker: {:?}, subsystem is probably shutting down.", + err + ); + } + } + .boxed() + } + + fn try_send_message(&mut self, msg: T) -> Result<(), metered::TrySendError> { + self.0 + .try_send(polkadot_overseer::FromOrchestra::Communication { msg }) + .map_err(|result| { + let is_full = result.is_full(); + let msg = match result.into_inner() { + polkadot_overseer::FromOrchestra::Signal(_) => + panic!("Cannot happen variant is never built"), + polkadot_overseer::FromOrchestra::Communication { msg } => msg, + }; + if is_full { + metered::TrySendError::Full(msg) + } else { + metered::TrySendError::Closed(msg) + } + }) + } + + fn send_messages<'life0, 'async_trait, I>( + &'life0 mut self, + msgs: I, + ) -> ::core::pin::Pin< + Box + ::core::marker::Send + 'async_trait>, + > + where + I: IntoIterator + Send, + I::IntoIter: Send, + I: 'async_trait, + 'life0: 'async_trait, + Self: 'async_trait, + { + async { + for msg in msgs { + self.send_message(msg).await; + } + } + .boxed() + } + + fn send_unbounded_message(&mut self, msg: T) { + if let Err(err) = + self.1.unbounded_send(polkadot_overseer::FromOrchestra::Communication { msg }) + { + gum::error!( + target: LOG_TARGET, + "Failed to send unbounded message to approval voting worker: {:?}, subsystem is probably shutting down.", + err + ); + } + } +} + +/// Just a wrapper for implementing overseer::SubsystemSender, so that +/// we can inject into the approval voting subsystem. 
+#[derive(Clone)] +pub struct ApprovalVotingToApprovalDistribution>( + S, +); + +impl> + overseer::SubsystemSender + for ApprovalVotingToApprovalDistribution +{ + #[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)] + fn send_message<'life0, 'async_trait>( + &'life0 mut self, + msg: ApprovalDistributionMessage, + ) -> ::core::pin::Pin< + Box + ::core::marker::Send + 'async_trait>, + > + where + 'life0: 'async_trait, + Self: 'async_trait, + { + self.0.send_message(msg.into()) + } + + fn try_send_message( + &mut self, + msg: ApprovalDistributionMessage, + ) -> Result<(), metered::TrySendError> { + self.0.try_send_message(msg.into()).map_err(|err| match err { + // Safe to unwrap because it was built from the same type. + metered::TrySendError::Closed(msg) => + metered::TrySendError::Closed(msg.try_into().unwrap()), + metered::TrySendError::Full(msg) => + metered::TrySendError::Full(msg.try_into().unwrap()), + }) + } + + #[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)] + fn send_messages<'life0, 'async_trait, I>( + &'life0 mut self, + msgs: I, + ) -> ::core::pin::Pin< + Box + ::core::marker::Send + 'async_trait>, + > + where + I: IntoIterator + Send, + I::IntoIter: Send, + I: 'async_trait, + 'life0: 'async_trait, + Self: 'async_trait, + { + self.0.send_messages(msgs.into_iter().map(|msg| msg.into())) + } + + fn send_unbounded_message(&mut self, msg: ApprovalDistributionMessage) { + self.0.send_unbounded_message(msg.into()) + } +} diff --git a/polkadot/node/core/approval-voting/src/import.rs b/polkadot/node/core/approval-voting/src/import.rs index 9ab16ba1cd6b1..6c82946728262 100644 --- a/polkadot/node/core/approval-voting/src/import.rs +++ b/polkadot/node/core/approval-voting/src/import.rs @@ -41,7 +41,7 @@ use polkadot_node_subsystem::{ ApprovalDistributionMessage, ChainApiMessage, ChainSelectionMessage, RuntimeApiMessage, RuntimeApiRequest, }, - overseer, RuntimeApiError, SubsystemError, SubsystemResult, + overseer, RuntimeApiError, SubsystemContext, SubsystemError, SubsystemResult, }; use polkadot_node_subsystem_util::{determine_new_blocks, runtime::RuntimeInfo}; use polkadot_overseer::SubsystemSender; diff --git a/polkadot/node/core/approval-voting/src/lib.rs b/polkadot/node/core/approval-voting/src/lib.rs index 4195a1fe490d5..45ac823fd1a26 100644 --- a/polkadot/node/core/approval-voting/src/lib.rs +++ b/polkadot/node/core/approval-voting/src/lib.rs @@ -166,6 +166,7 @@ pub struct ApprovalVotingSubsystem { metrics: Metrics, clock: Arc, spawner: Arc, + subsystem_disabled: bool, } #[derive(Clone)] @@ -485,6 +486,7 @@ impl ApprovalVotingSubsystem { sync_oracle: Box, metrics: Metrics, spawner: Arc, + subsystem_disabled: bool, ) -> Self { ApprovalVotingSubsystem::with_config_and_clock( config, @@ -494,6 +496,7 @@ impl ApprovalVotingSubsystem { metrics, Arc::new(SystemClock {}), spawner, + subsystem_disabled, ) } @@ -506,6 +509,7 @@ impl ApprovalVotingSubsystem { metrics: Metrics, clock: Arc, spawner: Arc, + subsystem_disabled: bool, ) -> Self { ApprovalVotingSubsystem { keystore, @@ -516,6 +520,7 @@ impl ApprovalVotingSubsystem { metrics, clock, spawner, + subsystem_disabled, } } @@ -1251,6 +1256,12 @@ where ).await? } next_msg = work_provider.recv().fuse() => { + if subsystem.subsystem_disabled { + // If we are running in parallel mode, we need to ensure that we are not + // processing messages, but also consume the messages, so that the system + // is not marked as stalled because of the signals it receives. 
+ continue; + } let mut actions = handle_from_overseer( &mut to_other_subsystems, &mut to_approval_distr, @@ -1397,12 +1408,13 @@ pub async fn start_approval_worker< metrics, clock, spawner, + false, ); let backend = DbBackend::new(db.clone(), approval_voting.db_config); let spawner = approval_voting.spawner.clone(); spawner.spawn_blocking( - "approval-voting-rewrite-db", - Some("approval-voting-rewrite-subsystem"), + "approval-voting-parallel-db", + Some("approval-voting-parallel-subsystem"), Box::pin(async move { if let Err(err) = run( work_provider, diff --git a/polkadot/node/core/approval-voting/src/tests.rs b/polkadot/node/core/approval-voting/src/tests.rs index 8565e5d14a1ae..3a15da7c3989b 100644 --- a/polkadot/node/core/approval-voting/src/tests.rs +++ b/polkadot/node/core/approval-voting/src/tests.rs @@ -35,7 +35,7 @@ use polkadot_node_subsystem::{ messages::{ AllMessages, ApprovalVotingMessage, AssignmentCheckResult, AvailabilityRecoveryMessage, }, - ActiveLeavesUpdate, + ActiveLeavesUpdate, SubsystemContext, }; use polkadot_node_subsystem_test_helpers as test_helpers; use polkadot_node_subsystem_util::TimeoutExt; @@ -555,7 +555,7 @@ fn test_harness>( config; let pool = sp_core::testing::TaskExecutor::new(); - let (context, virtual_overseer) = + let (mut context, virtual_overseer) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool.clone()); let keystore = LocalKeystore::in_memory(); @@ -567,9 +567,12 @@ fn test_harness>( let clock = Arc::new(clock); let db = kvdb_memorydb::create(test_constants::NUM_COLUMNS); let db = polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, &[]); - + let sender = context.sender().clone(); + let to_approval_distr_sender = context.sender().clone(); let subsystem = run( context, + sender, + to_approval_distr_sender, ApprovalVotingSubsystem::with_config_and_clock( Config { col_approval_data: test_constants::TEST_CONFIG.col_approval_data, @@ -581,6 +584,7 @@ fn test_harness>( Metrics::default(), clock.clone(), Arc::new(SpawnGlue(pool)), + false, ), assignment_criteria, backend, diff --git a/polkadot/node/core/dispute-coordinator/src/initialized.rs b/polkadot/node/core/dispute-coordinator/src/initialized.rs index 5f86da87f21ca..0d9da012712dd 100644 --- a/polkadot/node/core/dispute-coordinator/src/initialized.rs +++ b/polkadot/node/core/dispute-coordinator/src/initialized.rs @@ -34,8 +34,9 @@ use polkadot_node_primitives::{ }; use polkadot_node_subsystem::{ messages::{ - ApprovalVotingMessage, BlockDescription, ChainSelectionMessage, DisputeCoordinatorMessage, - DisputeDistributionMessage, ImportStatementsResult, + ApprovalVotingMessage, ApprovalVotingParallelMessage, BlockDescription, + ChainSelectionMessage, DisputeCoordinatorMessage, DisputeDistributionMessage, + ImportStatementsResult, }, overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, RuntimeApiError, }; @@ -117,6 +118,7 @@ pub(crate) struct Initialized { /// `CHAIN_IMPORT_MAX_BATCH_SIZE` and put the rest here for later processing. 
chain_import_backlog: VecDeque, metrics: Metrics, + approval_voting_parallel_enabled: bool, } #[overseer::contextbounds(DisputeCoordinator, prefix = self::overseer)] @@ -130,7 +132,13 @@ impl Initialized { highest_session_seen: SessionIndex, gaps_in_cache: bool, ) -> Self { - let DisputeCoordinatorSubsystem { config: _, store: _, keystore, metrics } = subsystem; + let DisputeCoordinatorSubsystem { + config: _, + store: _, + keystore, + metrics, + approval_voting_parallel_enabled, + } = subsystem; let (participation_sender, participation_receiver) = mpsc::channel(1); let participation = Participation::new(participation_sender, metrics.clone()); @@ -148,6 +156,7 @@ impl Initialized { participation_receiver, chain_import_backlog: VecDeque::new(), metrics, + approval_voting_parallel_enabled, } } @@ -1059,9 +1068,21 @@ impl Initialized { // 4. We are waiting (and blocking the whole subsystem) on a response right after - // therefore even with all else failing we will never have more than // one message in flight at any given time. - ctx.send_unbounded_message( - ApprovalVotingMessage::GetApprovalSignaturesForCandidate(candidate_hash, tx), - ); + if self.approval_voting_parallel_enabled { + ctx.send_unbounded_message( + ApprovalVotingParallelMessage::GetApprovalSignaturesForCandidate( + candidate_hash, + tx, + ), + ); + } else { + ctx.send_unbounded_message( + ApprovalVotingMessage::GetApprovalSignaturesForCandidate( + candidate_hash, + tx, + ), + ); + } match rx.await { Err(_) => { gum::warn!( diff --git a/polkadot/node/core/dispute-coordinator/src/lib.rs b/polkadot/node/core/dispute-coordinator/src/lib.rs index daa384b36ffba..fcac596f4b8fc 100644 --- a/polkadot/node/core/dispute-coordinator/src/lib.rs +++ b/polkadot/node/core/dispute-coordinator/src/lib.rs @@ -34,7 +34,7 @@ use gum::CandidateHash; use sc_keystore::LocalKeystore; use polkadot_node_primitives::{ - CandidateVotes, DisputeMessage, DisputeMessageCheckError, SignedDisputeStatement, + approval, CandidateVotes, DisputeMessage, DisputeMessageCheckError, SignedDisputeStatement, DISPUTE_WINDOW, }; use polkadot_node_subsystem::{ @@ -122,6 +122,7 @@ pub struct DisputeCoordinatorSubsystem { store: Arc, keystore: Arc, metrics: Metrics, + approval_voting_parallel_enabled: bool, } /// Configuration for the dispute coordinator subsystem. @@ -164,8 +165,9 @@ impl DisputeCoordinatorSubsystem { config: Config, keystore: Arc, metrics: Metrics, + approval_voting_parallel_enabled: bool, ) -> Self { - Self { store, config, keystore, metrics } + Self { store, config, keystore, metrics, approval_voting_parallel_enabled } } /// Initialize and afterwards run `Initialized::run`. 
diff --git a/polkadot/node/core/dispute-coordinator/src/tests.rs b/polkadot/node/core/dispute-coordinator/src/tests.rs index f97a625a9528e..b41cdb94b4d29 100644 --- a/polkadot/node/core/dispute-coordinator/src/tests.rs +++ b/polkadot/node/core/dispute-coordinator/src/tests.rs @@ -580,6 +580,7 @@ impl TestState { self.config, self.subsystem_keystore.clone(), Metrics::default(), + false, ); let backend = DbBackend::new(self.db.clone(), self.config.column_config(), Metrics::default()); diff --git a/polkadot/node/network/approval-distribution/src/lib.rs b/polkadot/node/network/approval-distribution/src/lib.rs index 33136c1b864d4..a7bd016887f2b 100644 --- a/polkadot/node/network/approval-distribution/src/lib.rs +++ b/polkadot/node/network/approval-distribution/src/lib.rs @@ -71,10 +71,12 @@ use polkadot_primitives::{ use rand::{CryptoRng, Rng, SeedableRng}; use std::{ collections::{hash_map, BTreeMap, HashMap, HashSet, VecDeque}, + sync::Arc, time::Duration, }; -mod metrics; +/// Approval distribution metrics. +pub mod metrics; #[cfg(test)] mod tests; @@ -100,7 +102,8 @@ const MAX_BITFIELD_SIZE: usize = 500; pub struct ApprovalDistribution { metrics: Metrics, slot_duration_millis: u64, - clock: Box, + clock: Arc, + subsystem_disabled: bool, } /// Contains recently finalized @@ -2643,17 +2646,23 @@ async fn modify_reputation( #[overseer::contextbounds(ApprovalDistribution, prefix = self::overseer)] impl ApprovalDistribution { /// Create a new instance of the [`ApprovalDistribution`] subsystem. - pub fn new(metrics: Metrics, slot_duration_millis: u64) -> Self { - Self::new_with_clock(metrics, slot_duration_millis, Box::new(SystemClock)) + pub fn new(metrics: Metrics, slot_duration_millis: u64, subsystem_disabled: bool) -> Self { + Self::new_with_clock( + metrics, + slot_duration_millis, + Arc::new(SystemClock), + subsystem_disabled, + ) } /// Create a new instance of the [`ApprovalDistribution`] subsystem, with a custom clock. 
pub fn new_with_clock( metrics: Metrics, slot_duration_millis: u64, - clock: Box, + clock: Arc, + subsystem_disabled: bool, ) -> Self { - Self { metrics, slot_duration_millis, clock } + Self { metrics, slot_duration_millis, clock, subsystem_disabled } } async fn run(self, ctx: Context) { @@ -2702,6 +2711,10 @@ impl ApprovalDistribution { reputation_delay = new_reputation_delay(); }, message = ctx.recv().fuse() => { + if self.subsystem_disabled { + gum::trace!(target: LOG_TARGET, "Approval voting parallel is enabled skipping messages"); + continue; + } let message = match message { Ok(message) => message, Err(e) => { diff --git a/polkadot/node/network/approval-distribution/src/tests.rs b/polkadot/node/network/approval-distribution/src/tests.rs index 3f926746449dc..5c32e67aa80f3 100644 --- a/polkadot/node/network/approval-distribution/src/tests.rs +++ b/polkadot/node/network/approval-distribution/src/tests.rs @@ -54,7 +54,7 @@ type VirtualOverseer = fn test_harness>( assignment_criteria: &impl AssignmentCriteria, - clock: Box, + clock: Arc, mut state: State, test_fn: impl FnOnce(VirtualOverseer) -> T, ) -> State { @@ -68,7 +68,7 @@ fn test_harness>( polkadot_node_subsystem_test_helpers::make_subsystem_context(pool.clone()); let subsystem = - ApprovalDistribution::new_with_clock(Metrics::default(), Default::default(), clock); + ApprovalDistribution::new_with_clock(Metrics::default(), Default::default(), clock, false); { let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(12345); let mut session_info_provider = RuntimeInfo::new_with_config(RuntimeInfoConfig { @@ -523,7 +523,7 @@ fn try_import_the_same_assignment() { let _ = test_harness( &MockAssignmentCriteria { tranche: Ok(0) }, - Box::new(SystemClock {}), + Arc::new(SystemClock {}), state_without_reputation_delay(), |mut virtual_overseer| async move { let overseer = &mut virtual_overseer; @@ -631,7 +631,7 @@ fn try_import_the_same_assignment_v2() { let _ = test_harness( &MockAssignmentCriteria { tranche: Ok(0) }, - Box::new(SystemClock {}), + Arc::new(SystemClock {}), state_without_reputation_delay(), |mut virtual_overseer| async move { let overseer = &mut virtual_overseer; @@ -744,7 +744,7 @@ fn delay_reputation_change() { let _ = test_harness( &MockAssignmentCriteria { tranche: Ok(0) }, - Box::new(SystemClock {}), + Arc::new(SystemClock {}), state_with_reputation_delay(), |mut virtual_overseer| async move { let overseer = &mut virtual_overseer; @@ -819,7 +819,7 @@ fn spam_attack_results_in_negative_reputation_change() { let _ = test_harness( &MockAssignmentCriteria { tranche: Ok(0) }, - Box::new(SystemClock {}), + Arc::new(SystemClock {}), state_without_reputation_delay(), |mut virtual_overseer| async move { let overseer = &mut virtual_overseer; @@ -918,7 +918,7 @@ fn peer_sending_us_the_same_we_just_sent_them_is_ok() { let _ = test_harness( &MockAssignmentCriteria { tranche: Ok(0) }, - Box::new(SystemClock {}), + Arc::new(SystemClock {}), state_without_reputation_delay(), |mut virtual_overseer| async move { let overseer = &mut virtual_overseer; @@ -1019,7 +1019,7 @@ fn import_approval_happy_path_v1_v2_peers() { let _ = test_harness( &MockAssignmentCriteria { tranche: Ok(0) }, - Box::new(SystemClock {}), + Arc::new(SystemClock {}), state_without_reputation_delay(), |mut virtual_overseer| async move { let overseer = &mut virtual_overseer; @@ -1159,7 +1159,7 @@ fn import_approval_happy_path_v2() { let candidate_hash_second = polkadot_primitives::CandidateHash(Hash::repeat_byte(0xCC)); let _ = test_harness( &MockAssignmentCriteria { 
tranche: Ok(0) }, - Box::new(SystemClock {}), + Arc::new(SystemClock {}), state_without_reputation_delay(), |mut virtual_overseer| async move { let overseer = &mut virtual_overseer; @@ -1290,7 +1290,7 @@ fn multiple_assignments_covered_with_one_approval_vote() { let _ = test_harness( &MockAssignmentCriteria { tranche: Ok(0) }, - Box::new(SystemClock {}), + Arc::new(SystemClock {}), state_without_reputation_delay(), |mut virtual_overseer| async move { let overseer = &mut virtual_overseer; @@ -1502,7 +1502,7 @@ fn unify_with_peer_multiple_assignments_covered_with_one_approval_vote() { let _ = test_harness( &MockAssignmentCriteria { tranche: Ok(0) }, - Box::new(SystemClock {}), + Arc::new(SystemClock {}), state_without_reputation_delay(), |mut virtual_overseer| async move { let overseer = &mut virtual_overseer; @@ -1703,7 +1703,7 @@ fn import_approval_bad() { let _ = test_harness( &MockAssignmentCriteria { tranche: Ok(0) }, - Box::new(SystemClock {}), + Arc::new(SystemClock {}), state_without_reputation_delay(), |mut virtual_overseer| async move { let overseer = &mut virtual_overseer; @@ -1791,7 +1791,7 @@ fn update_our_view() { let state = test_harness( &MockAssignmentCriteria { tranche: Ok(0) }, - Box::new(SystemClock {}), + Arc::new(SystemClock {}), State::default(), |mut virtual_overseer| async move { let overseer = &mut virtual_overseer; @@ -1839,7 +1839,7 @@ fn update_our_view() { let state = test_harness( &MockAssignmentCriteria { tranche: Ok(0) }, - Box::new(SystemClock {}), + Arc::new(SystemClock {}), state, |mut virtual_overseer| async move { let overseer = &mut virtual_overseer; @@ -1858,7 +1858,7 @@ fn update_our_view() { let state = test_harness( &MockAssignmentCriteria { tranche: Ok(0) }, - Box::new(SystemClock {}), + Arc::new(SystemClock {}), state, |mut virtual_overseer| async move { let overseer = &mut virtual_overseer; @@ -1886,7 +1886,7 @@ fn update_peer_view() { let state = test_harness( &MockAssignmentCriteria { tranche: Ok(0) }, - Box::new(SystemClock {}), + Arc::new(SystemClock {}), State::default(), |mut virtual_overseer| async move { let overseer = &mut virtual_overseer; @@ -1985,7 +1985,7 @@ fn update_peer_view() { let state = test_harness( &MockAssignmentCriteria { tranche: Ok(0) }, - Box::new(SystemClock {}), + Arc::new(SystemClock {}), state, |mut virtual_overseer| async move { let overseer = &mut virtual_overseer; @@ -2045,7 +2045,7 @@ fn update_peer_view() { let finalized_number = 4_000_000_000; let state = test_harness( &MockAssignmentCriteria { tranche: Ok(0) }, - Box::new(SystemClock {}), + Arc::new(SystemClock {}), state, |mut virtual_overseer| async move { let overseer = &mut virtual_overseer; @@ -2087,7 +2087,7 @@ fn update_peer_authority_id() { let _state = test_harness( &MockAssignmentCriteria { tranche: Ok(0) }, - Box::new(SystemClock {}), + Arc::new(SystemClock {}), State::default(), |mut virtual_overseer| async move { let overseer = &mut virtual_overseer; @@ -2268,7 +2268,7 @@ fn import_remotely_then_locally() { let _ = test_harness( &MockAssignmentCriteria { tranche: Ok(0) }, - Box::new(SystemClock {}), + Arc::new(SystemClock {}), state_without_reputation_delay(), |mut virtual_overseer| async move { let overseer = &mut virtual_overseer; @@ -2375,7 +2375,7 @@ fn sends_assignments_even_when_state_is_approved() { let _ = test_harness( &MockAssignmentCriteria { tranche: Ok(0) }, - Box::new(SystemClock {}), + Arc::new(SystemClock {}), State::default(), |mut virtual_overseer| async move { let overseer = &mut virtual_overseer; @@ -2481,7 +2481,7 @@ fn 
sends_assignments_even_when_state_is_approved_v2() { let _ = test_harness( &MockAssignmentCriteria { tranche: Ok(0) }, - Box::new(SystemClock {}), + Arc::new(SystemClock {}), State::default(), |mut virtual_overseer| async move { let overseer = &mut virtual_overseer; @@ -2607,7 +2607,7 @@ fn race_condition_in_local_vs_remote_view_update() { let _ = test_harness( &MockAssignmentCriteria { tranche: Ok(0) }, - Box::new(SystemClock {}), + Arc::new(SystemClock {}), state_without_reputation_delay(), |mut virtual_overseer| async move { let overseer = &mut virtual_overseer; @@ -2694,7 +2694,7 @@ fn propagates_locally_generated_assignment_to_both_dimensions() { let _ = test_harness( &MockAssignmentCriteria { tranche: Ok(0) }, - Box::new(SystemClock {}), + Arc::new(SystemClock {}), State::default(), |mut virtual_overseer| async move { let overseer = &mut virtual_overseer; @@ -2824,7 +2824,7 @@ fn propagates_assignments_along_unshared_dimension() { let _ = test_harness( &MockAssignmentCriteria { tranche: Ok(0) }, - Box::new(SystemClock {}), + Arc::new(SystemClock {}), state_without_reputation_delay(), |mut virtual_overseer| async move { let overseer = &mut virtual_overseer; @@ -2987,7 +2987,7 @@ fn propagates_to_required_after_connect() { let _ = test_harness( &MockAssignmentCriteria { tranche: Ok(0) }, - Box::new(SystemClock {}), + Arc::new(SystemClock {}), State::default(), |mut virtual_overseer| async move { let overseer = &mut virtual_overseer; @@ -3152,7 +3152,7 @@ fn sends_to_more_peers_after_getting_topology() { let _ = test_harness( &MockAssignmentCriteria { tranche: Ok(0) }, - Box::new(SystemClock {}), + Arc::new(SystemClock {}), State::default(), |mut virtual_overseer| async move { let overseer = &mut virtual_overseer; @@ -3290,7 +3290,7 @@ fn originator_aggression_l1() { let _ = test_harness( &MockAssignmentCriteria { tranche: Ok(0) }, - Box::new(SystemClock {}), + Arc::new(SystemClock {}), state, |mut virtual_overseer| async move { let overseer = &mut virtual_overseer; @@ -3471,7 +3471,7 @@ fn non_originator_aggression_l1() { let _ = test_harness( &MockAssignmentCriteria { tranche: Ok(0) }, - Box::new(SystemClock {}), + Arc::new(SystemClock {}), state, |mut virtual_overseer| async move { let overseer = &mut virtual_overseer; @@ -3598,7 +3598,7 @@ fn non_originator_aggression_l2() { let aggression_l2_threshold = state.aggression_config.l2_threshold.unwrap(); let _ = test_harness( &MockAssignmentCriteria { tranche: Ok(0) }, - Box::new(SystemClock {}), + Arc::new(SystemClock {}), state, |mut virtual_overseer| async move { let overseer = &mut virtual_overseer; @@ -3785,7 +3785,7 @@ fn resends_messages_periodically() { state.aggression_config.resend_unfinalized_period = Some(2); let _ = test_harness( &MockAssignmentCriteria { tranche: Ok(0) }, - Box::new(SystemClock {}), + Arc::new(SystemClock {}), state, |mut virtual_overseer| async move { let overseer = &mut virtual_overseer; @@ -3951,7 +3951,7 @@ fn import_versioned_approval() { let candidate_hash = polkadot_primitives::CandidateHash(Hash::repeat_byte(0xBB)); let _ = test_harness( &MockAssignmentCriteria { tranche: Ok(0) }, - Box::new(SystemClock {}), + Arc::new(SystemClock {}), state, |mut virtual_overseer| async move { let overseer = &mut virtual_overseer; @@ -4124,7 +4124,8 @@ fn batch_test_round(message_count: usize) { let subsystem = ApprovalDistribution::new_with_clock( Default::default(), Default::default(), - Box::new(SystemClock {}), + Arc::new(SystemClock {}), + false, ); let mut rng = 
rand_chacha::ChaCha12Rng::seed_from_u64(12345); let mut sender = context.sender().clone(); @@ -4311,7 +4312,7 @@ fn subsystem_rejects_assignment_in_future() { let _ = test_harness( &MockAssignmentCriteria { tranche: Ok(89) }, - Box::new(DummyClock {}), + Arc::new(DummyClock {}), state_without_reputation_delay(), |mut virtual_overseer| async move { let overseer = &mut virtual_overseer; @@ -4377,7 +4378,7 @@ fn subsystem_rejects_bad_assignments() { &MockAssignmentCriteria { tranche: Err(InvalidAssignment(criteria::InvalidAssignmentReason::NullAssignment)), }, - Box::new(DummyClock {}), + Arc::new(DummyClock {}), state_without_reputation_delay(), |mut virtual_overseer| async move { let overseer = &mut virtual_overseer; diff --git a/polkadot/node/network/bridge/src/rx/mod.rs b/polkadot/node/network/bridge/src/rx/mod.rs index 84e935366d0cb..8053ad5a8d85a 100644 --- a/polkadot/node/network/bridge/src/rx/mod.rs +++ b/polkadot/node/network/bridge/src/rx/mod.rs @@ -45,8 +45,9 @@ use polkadot_node_subsystem::{ errors::SubsystemError, messages::{ network_bridge_event::NewGossipTopology, ApprovalDistributionMessage, - BitfieldDistributionMessage, CollatorProtocolMessage, GossipSupportMessage, - NetworkBridgeEvent, NetworkBridgeRxMessage, StatementDistributionMessage, + ApprovalVotingParallelMessage, BitfieldDistributionMessage, CollatorProtocolMessage, + GossipSupportMessage, NetworkBridgeEvent, NetworkBridgeRxMessage, + StatementDistributionMessage, }, overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem, }; @@ -89,6 +90,7 @@ pub struct NetworkBridgeRx { validation_service: Box, collation_service: Box, notification_sinks: Arc>>>, + approval_voting_parallel_enabled: bool, } impl NetworkBridgeRx { @@ -105,6 +107,7 @@ impl NetworkBridgeRx { peerset_protocol_names: PeerSetProtocolNames, mut notification_services: HashMap>, notification_sinks: Arc>>>, + approval_voting_parallel_enabled: bool, ) -> Self { let shared = Shared::default(); @@ -125,6 +128,7 @@ impl NetworkBridgeRx { validation_service, collation_service, notification_sinks, + approval_voting_parallel_enabled, } } } @@ -156,6 +160,7 @@ async fn handle_validation_message( peerset_protocol_names: &PeerSetProtocolNames, notification_service: &mut Box, notification_sinks: &mut Arc>>>, + approval_voting_parallel_enabled: bool, ) where AD: validator_discovery::AuthorityDiscovery + Send, { @@ -276,6 +281,7 @@ async fn handle_validation_message( ], sender, &metrics, + approval_voting_parallel_enabled, ) .await; @@ -329,6 +335,7 @@ async fn handle_validation_message( NetworkBridgeEvent::PeerDisconnected(peer), sender, &metrics, + approval_voting_parallel_enabled, ) .await; } @@ -398,7 +405,13 @@ async fn handle_validation_message( network_service.report_peer(peer, report.into()); } - dispatch_validation_events_to_all(events, sender, &metrics).await; + dispatch_validation_events_to_all( + events, + sender, + &metrics, + approval_voting_parallel_enabled, + ) + .await; }, } } @@ -652,6 +665,7 @@ async fn handle_network_messages( mut validation_service: Box, mut collation_service: Box, mut notification_sinks: Arc>>>, + approval_voting_parallel_enabled: bool, ) -> Result<(), Error> where AD: validator_discovery::AuthorityDiscovery + Send, @@ -669,6 +683,7 @@ where &peerset_protocol_names, &mut validation_service, &mut notification_sinks, + approval_voting_parallel_enabled, ).await, None => return Err(Error::EventStreamConcluded), }, @@ -727,6 +742,7 @@ async fn run_incoming_orchestra_signals( sync_oracle: Box, metrics: 
Metrics, notification_sinks: Arc>>>, + approval_voting_parallel_enabled: bool, ) -> Result<(), Error> where AD: validator_discovery::AuthorityDiscovery + Clone, @@ -766,6 +782,7 @@ where local_index, }), ctx.sender(), + approval_voting_parallel_enabled, ); }, FromOrchestra::Communication { @@ -787,6 +804,7 @@ where dispatch_validation_event_to_all_unbounded( NetworkBridgeEvent::UpdatedAuthorityIds(peer_id, authority_ids), ctx.sender(), + approval_voting_parallel_enabled, ); }, FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()), @@ -826,6 +844,7 @@ where finalized_number, &metrics, ¬ification_sinks, + approval_voting_parallel_enabled, ); note_peers_count(&metrics, &shared); } @@ -875,6 +894,7 @@ where validation_service, collation_service, notification_sinks, + approval_voting_parallel_enabled, } = bridge; let (task, network_event_handler) = handle_network_messages( @@ -887,6 +907,7 @@ where validation_service, collation_service, notification_sinks.clone(), + approval_voting_parallel_enabled, ) .remote_handle(); @@ -900,6 +921,7 @@ where sync_oracle, metrics, notification_sinks, + approval_voting_parallel_enabled, ); futures::pin_mut!(orchestra_signal_handler); @@ -926,6 +948,7 @@ fn update_our_view( finalized_number: BlockNumber, metrics: &Metrics, notification_sinks: &Arc>>>, + approval_voting_parallel_enabled: bool, ) { let new_view = construct_view(live_heads.iter().map(|v| v.hash), finalized_number); @@ -1016,6 +1039,7 @@ fn update_our_view( dispatch_validation_event_to_all_unbounded( NetworkBridgeEvent::OurViewChange(our_view.clone()), ctx.sender(), + approval_voting_parallel_enabled, ); dispatch_collation_event_to_all_unbounded( @@ -1081,8 +1105,15 @@ async fn dispatch_validation_event_to_all( event: NetworkBridgeEvent, ctx: &mut impl overseer::NetworkBridgeRxSenderTrait, metrics: &Metrics, + approval_voting_parallel_enabled: bool, ) { - dispatch_validation_events_to_all(std::iter::once(event), ctx, metrics).await + dispatch_validation_events_to_all( + std::iter::once(event), + ctx, + metrics, + approval_voting_parallel_enabled, + ) + .await } async fn dispatch_collation_event_to_all( @@ -1095,6 +1126,7 @@ async fn dispatch_collation_event_to_all( fn dispatch_validation_event_to_all_unbounded( event: NetworkBridgeEvent, sender: &mut impl overseer::NetworkBridgeRxSenderTrait, + approval_voting_parallel_enabled: bool, ) { event .focus() @@ -1106,11 +1138,20 @@ fn dispatch_validation_event_to_all_unbounded( .ok() .map(BitfieldDistributionMessage::from) .and_then(|msg| Some(sender.send_unbounded_message(msg))); - event - .focus() - .ok() - .map(ApprovalDistributionMessage::from) - .and_then(|msg| Some(sender.send_unbounded_message(msg))); + + if approval_voting_parallel_enabled { + event + .focus() + .ok() + .map(ApprovalVotingParallelMessage::from) + .and_then(|msg| Some(sender.send_unbounded_message(msg))); + } else { + event + .focus() + .ok() + .map(ApprovalDistributionMessage::from) + .and_then(|msg| Some(sender.send_unbounded_message(msg))); + } event .focus() .ok() @@ -1131,6 +1172,7 @@ async fn dispatch_validation_events_to_all( events: I, sender: &mut impl overseer::NetworkBridgeRxSenderTrait, _metrics: &Metrics, + approval_voting_parallel_enabled: bool, ) where I: IntoIterator>, I::IntoIter: Send, @@ -1140,7 +1182,13 @@ async fn dispatch_validation_events_to_all( .send_messages(event.focus().map(StatementDistributionMessage::from)) .await; sender.send_messages(event.focus().map(BitfieldDistributionMessage::from)).await; - 
sender.send_messages(event.focus().map(ApprovalDistributionMessage::from)).await; + if approval_voting_parallel_enabled { + sender + .send_messages(event.focus().map(ApprovalVotingParallelMessage::from)) + .await; + } else { + sender.send_messages(event.focus().map(ApprovalDistributionMessage::from)).await; + } sender.send_messages(event.focus().map(GossipSupportMessage::from)).await; } } diff --git a/polkadot/node/network/bridge/src/rx/tests.rs b/polkadot/node/network/bridge/src/rx/tests.rs index 6182bf3d883b5..e267446c2693a 100644 --- a/polkadot/node/network/bridge/src/rx/tests.rs +++ b/polkadot/node/network/bridge/src/rx/tests.rs @@ -521,6 +521,7 @@ fn test_harness>( validation_service, collation_service, notification_sinks, + approval_voting_parallel_enabled: false, }; let network_bridge = run_network_in(bridge, context) diff --git a/polkadot/node/overseer/src/dummy.rs b/polkadot/node/overseer/src/dummy.rs index fc5f0070773b7..6f9cd9d004032 100644 --- a/polkadot/node/overseer/src/dummy.rs +++ b/polkadot/node/overseer/src/dummy.rs @@ -88,6 +88,7 @@ pub fn dummy_overseer_builder( DummySubsystem, DummySubsystem, DummySubsystem, + DummySubsystem, >, SubsystemError, > @@ -131,6 +132,7 @@ pub fn one_for_all_overseer_builder( Sub, Sub, Sub, + Sub, >, SubsystemError, > @@ -155,6 +157,7 @@ where + Subsystem, SubsystemError> + Subsystem, SubsystemError> + Subsystem, SubsystemError> + + Subsystem, SubsystemError> + Subsystem, SubsystemError> + Subsystem, SubsystemError> + Subsystem, SubsystemError> @@ -183,6 +186,7 @@ where .statement_distribution(subsystem.clone()) .approval_distribution(subsystem.clone()) .approval_voting(subsystem.clone()) + .approval_voting_parallel(subsystem.clone()) .gossip_support(subsystem.clone()) .dispute_coordinator(subsystem.clone()) .dispute_distribution(subsystem.clone()) diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs index 2c113f81c85fc..8d08f1a3aeb8e 100644 --- a/polkadot/node/overseer/src/lib.rs +++ b/polkadot/node/overseer/src/lib.rs @@ -76,13 +76,13 @@ use sc_client_api::{BlockImportNotification, BlockchainEvents, FinalityNotificat use self::messages::{BitfieldSigningMessage, PvfCheckerMessage}; use polkadot_node_subsystem_types::messages::{ - ApprovalDistributionMessage, ApprovalVotingMessage, AvailabilityDistributionMessage, - AvailabilityRecoveryMessage, AvailabilityStoreMessage, BitfieldDistributionMessage, - CandidateBackingMessage, CandidateValidationMessage, ChainApiMessage, ChainSelectionMessage, - CollationGenerationMessage, CollatorProtocolMessage, DisputeCoordinatorMessage, - DisputeDistributionMessage, GossipSupportMessage, NetworkBridgeRxMessage, - NetworkBridgeTxMessage, ProspectiveParachainsMessage, ProvisionerMessage, RuntimeApiMessage, - StatementDistributionMessage, + ApprovalDistributionMessage, ApprovalVotingMessage, ApprovalVotingParallelMessage, + AvailabilityDistributionMessage, AvailabilityRecoveryMessage, AvailabilityStoreMessage, + BitfieldDistributionMessage, CandidateBackingMessage, CandidateValidationMessage, + ChainApiMessage, ChainSelectionMessage, CollationGenerationMessage, CollatorProtocolMessage, + DisputeCoordinatorMessage, DisputeDistributionMessage, GossipSupportMessage, + NetworkBridgeRxMessage, NetworkBridgeTxMessage, ProspectiveParachainsMessage, + ProvisionerMessage, RuntimeApiMessage, StatementDistributionMessage, }; pub use polkadot_node_subsystem_types::{ @@ -549,6 +549,7 @@ pub struct Overseer { BitfieldDistributionMessage, StatementDistributionMessage, 
ApprovalDistributionMessage, + ApprovalVotingParallelMessage, GossipSupportMessage, DisputeDistributionMessage, CollationGenerationMessage, @@ -594,7 +595,19 @@ pub struct Overseer { RuntimeApiMessage, ])] approval_voting: ApprovalVoting, - + #[subsystem(blocking, message_capacity: 64000, ApprovalVotingParallelMessage, sends: [ + AvailabilityRecoveryMessage, + CandidateValidationMessage, + ChainApiMessage, + ChainSelectionMessage, + DisputeCoordinatorMessage, + RuntimeApiMessage, + NetworkBridgeTxMessage, + ApprovalVotingMessage, + ApprovalDistributionMessage, + ApprovalVotingParallelMessage, + ])] + approval_voting_parallel: ApprovalVotingParallel, #[subsystem(GossipSupportMessage, sends: [ NetworkBridgeTxMessage, NetworkBridgeRxMessage, // TODO @@ -612,6 +625,7 @@ pub struct Overseer { AvailabilityStoreMessage, AvailabilityRecoveryMessage, ChainSelectionMessage, + ApprovalVotingParallelMessage, ])] dispute_coordinator: DisputeCoordinator, diff --git a/polkadot/node/overseer/src/tests.rs b/polkadot/node/overseer/src/tests.rs index 177e3addf368d..4cc2cda3a3ce3 100644 --- a/polkadot/node/overseer/src/tests.rs +++ b/polkadot/node/overseer/src/tests.rs @@ -1101,6 +1101,7 @@ fn context_holds_onto_message_until_enough_signals_received() { let (chain_selection_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); let (pvf_checker_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); let (prospective_parachains_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); + let (approval_voting_parallel_tx, _) = metered::channel(CHANNEL_CAPACITY); let (candidate_validation_unbounded_tx, _) = metered::unbounded(); let (candidate_backing_unbounded_tx, _) = metered::unbounded(); @@ -1125,6 +1126,7 @@ fn context_holds_onto_message_until_enough_signals_received() { let (chain_selection_unbounded_tx, _) = metered::unbounded(); let (pvf_checker_unbounded_tx, _) = metered::unbounded(); let (prospective_parachains_unbounded_tx, _) = metered::unbounded(); + let (approval_voting_parallel_unbounded_tx, _) = metered::unbounded(); let channels_out = ChannelsOut { candidate_validation: candidate_validation_bounded_tx.clone(), @@ -1150,6 +1152,7 @@ fn context_holds_onto_message_until_enough_signals_received() { chain_selection: chain_selection_bounded_tx.clone(), pvf_checker: pvf_checker_bounded_tx.clone(), prospective_parachains: prospective_parachains_bounded_tx.clone(), + approval_voting_parallel: approval_voting_parallel_tx.clone(), candidate_validation_unbounded: candidate_validation_unbounded_tx.clone(), candidate_backing_unbounded: candidate_backing_unbounded_tx.clone(), @@ -1174,6 +1177,7 @@ fn context_holds_onto_message_until_enough_signals_received() { chain_selection_unbounded: chain_selection_unbounded_tx.clone(), pvf_checker_unbounded: pvf_checker_unbounded_tx.clone(), prospective_parachains_unbounded: prospective_parachains_unbounded_tx.clone(), + approval_voting_parallel_unbounded: approval_voting_parallel_unbounded_tx.clone(), }; let (mut signal_tx, signal_rx) = metered::channel(CHANNEL_CAPACITY); diff --git a/polkadot/node/primitives/src/approval.rs b/polkadot/node/primitives/src/approval.rs index ec41647fc3d14..d1874f7603649 100644 --- a/polkadot/node/primitives/src/approval.rs +++ b/polkadot/node/primitives/src/approval.rs @@ -18,7 +18,7 @@ /// A list of primitives introduced in v1. 
pub mod v1 { - use sp_consensus_babe as babe_primitives; + use sp_consensus_babe::{self as babe_primitives, SlotDuration}; pub use sp_consensus_babe::{ Randomness, Slot, VrfPreOutput, VrfProof, VrfSignature, VrfTranscript, }; @@ -118,7 +118,7 @@ pub mod v1 { } /// Metadata about a block which is now live in the approval protocol. - #[derive(Debug)] + #[derive(Debug, Clone)] pub struct BlockApprovalMeta { /// The hash of the block. pub hash: Hash, diff --git a/polkadot/node/service/Cargo.toml b/polkadot/node/service/Cargo.toml index 23cd51d8a04c3..bceabd697fc11 100644 --- a/polkadot/node/service/Cargo.toml +++ b/polkadot/node/service/Cargo.toml @@ -128,6 +128,7 @@ polkadot-gossip-support = { optional = true, workspace = true, default-features polkadot-network-bridge = { optional = true, workspace = true, default-features = true } polkadot-node-collation-generation = { optional = true, workspace = true, default-features = true } polkadot-node-core-approval-voting = { optional = true, workspace = true, default-features = true } +polkadot-node-core-approval-voting-parallel = { optional = true, workspace = true, default-features = true } polkadot-node-core-av-store = { optional = true, workspace = true, default-features = true } polkadot-node-core-backing = { optional = true, workspace = true, default-features = true } polkadot-node-core-bitfield-signing = { optional = true, workspace = true, default-features = true } @@ -172,6 +173,7 @@ full-node = [ "polkadot-network-bridge", "polkadot-node-collation-generation", "polkadot-node-core-approval-voting", + "polkadot-node-core-approval-voting-parallel", "polkadot-node-core-av-store", "polkadot-node-core-backing", "polkadot-node-core-bitfield-signing", diff --git a/polkadot/node/service/src/lib.rs b/polkadot/node/service/src/lib.rs index b76f40dd31029..1a174e0944770 100644 --- a/polkadot/node/service/src/lib.rs +++ b/polkadot/node/service/src/lib.rs @@ -668,6 +668,8 @@ pub struct NewFullParams { #[allow(dead_code)] pub malus_finality_delay: Option, pub hwbench: Option, + /// Enable approval voting processing in parallel. 
+ pub enable_approval_voting_parallel: bool, } #[cfg(feature = "full-node")] @@ -761,6 +763,7 @@ pub fn new_full< execute_workers_max_num, prepare_workers_soft_max_num, prepare_workers_hard_max_num, + enable_approval_voting_parallel, }: NewFullParams, ) -> Result { use polkadot_availability_recovery::FETCH_CHUNKS_THRESHOLD; @@ -816,6 +819,7 @@ pub fn new_full< overseer_handle.clone(), metrics, Some(basics.task_manager.spawn_handle()), + enable_approval_voting_parallel, ) } else { SelectRelayChain::new_longest_chain(basics.backend.clone()) @@ -1024,6 +1028,7 @@ pub fn new_full< dispute_coordinator_config, chain_selection_config, fetch_chunks_threshold, + enable_approval_voting_parallel, }) }; @@ -1543,6 +1548,7 @@ fn revert_approval_voting( Box::new(sp_consensus::NoNetwork), approval_voting_subsystem::Metrics::default(), Arc::new(SpawnGlue(task_handle)), + false, ); approval_voting diff --git a/polkadot/node/service/src/overseer.rs b/polkadot/node/service/src/overseer.rs index 1a14b3f85cb19..5a4a5a3d65ed4 100644 --- a/polkadot/node/service/src/overseer.rs +++ b/polkadot/node/service/src/overseer.rs @@ -58,6 +58,9 @@ pub use polkadot_network_bridge::{ }; pub use polkadot_node_collation_generation::CollationGenerationSubsystem; pub use polkadot_node_core_approval_voting::ApprovalVotingSubsystem; +pub use polkadot_node_core_approval_voting_parallel::{ + ApprovalVotingParallelSubsystem, Metrics as ApprovalVotingParallelMetrics, +}; pub use polkadot_node_core_av_store::AvailabilityStoreSubsystem; pub use polkadot_node_core_backing::CandidateBackingSubsystem; pub use polkadot_node_core_bitfield_signing::BitfieldSigningSubsystem; @@ -139,6 +142,7 @@ pub struct ExtendedOverseerGenArgs { /// than the value put in here we always try to recovery availability from backers. /// The presence of this parameter here is needed to have different values per chain. pub fetch_chunks_threshold: Option, + pub enable_approval_voting_parallel: bool, } /// Obtain a prepared validator `Overseer`, that is initialized with all default values. 
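
The new `enable_approval_voting_parallel` flag is threaded unchanged from the CLI through `NewFullParams` into `ExtendedOverseerGenArgs`, and every component that dispatches approval traffic branches on it to pick exactly one target subsystem. The sketch below illustrates that plumbing-and-branching pattern with illustrative stand-in types (`NodeConfig`, `ApprovalTarget`, and `route_approval_event` are not part of the polkadot-service API); the default mirrors the patch, where all existing call sites pass `false`.

```rust
// Minimal, self-contained sketch of the flag-plumbing pattern used throughout this
// patch. The types below are illustrative stand-ins, not the real polkadot types.

/// Stand-in for the boolean threaded from the CLI into the overseer builder args.
#[derive(Clone, Copy, Debug, Default)]
struct NodeConfig {
    enable_approval_voting_parallel: bool,
}

/// Stand-in for the two possible receivers of approval-related network events.
#[derive(Debug, PartialEq)]
enum ApprovalTarget {
    /// Legacy pipeline: `ApprovalDistributionMessage`.
    Distribution,
    /// New pipeline: `ApprovalVotingParallelMessage`.
    VotingParallel,
}

/// Each dispatch site consults the flag once and picks exactly one target, mirroring
/// the `if approval_voting_parallel_enabled { .. } else { .. }` branches added to the
/// network bridge and relay-chain selection.
fn route_approval_event(config: &NodeConfig) -> ApprovalTarget {
    if config.enable_approval_voting_parallel {
        ApprovalTarget::VotingParallel
    } else {
        ApprovalTarget::Distribution
    }
}

fn main() {
    // Default mirrors the patch: existing call sites pass `false`.
    let legacy = NodeConfig::default();
    assert_eq!(route_approval_event(&legacy), ApprovalTarget::Distribution);

    let parallel = NodeConfig { enable_approval_voting_parallel: true };
    assert_eq!(route_approval_event(&parallel), ApprovalTarget::VotingParallel);
}
```
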
@@ -174,6 +178,7 @@ pub fn validator_overseer_builder( dispute_coordinator_config, chain_selection_config, fetch_chunks_threshold, + enable_approval_voting_parallel, }: ExtendedOverseerGenArgs, ) -> Result< InitializedOverseerBuilder< @@ -203,6 +208,7 @@ pub fn validator_overseer_builder( CollatorProtocolSubsystem, ApprovalDistributionSubsystem, ApprovalVotingSubsystem, + ApprovalVotingParallelSubsystem, GossipSupportSubsystem, DisputeCoordinatorSubsystem, DisputeDistributionSubsystem, @@ -223,7 +229,8 @@ where let spawner = SpawnGlue(spawner); let network_bridge_metrics: NetworkBridgeMetrics = Metrics::register(registry)?; - + let approval_voting_parallel_metrics: ApprovalVotingParallelMetrics = + Metrics::register(registry)?; let builder = Overseer::builder() .network_bridge_tx(NetworkBridgeTxSubsystem::new( network_service.clone(), @@ -241,6 +248,7 @@ where peerset_protocol_names, notification_services, notification_sinks, + enable_approval_voting_parallel, )) .availability_distribution(AvailabilityDistributionSubsystem::new( keystore.clone(), @@ -309,16 +317,27 @@ where rand::rngs::StdRng::from_entropy(), )) .approval_distribution(ApprovalDistributionSubsystem::new( - Metrics::register(registry)?, + approval_voting_parallel_metrics.0.clone(), approval_voting_config.slot_duration_millis, + enable_approval_voting_parallel, )) .approval_voting(ApprovalVotingSubsystem::with_config( - approval_voting_config, + approval_voting_config.clone(), parachains_db.clone(), keystore.clone(), Box::new(sync_service.clone()), - Metrics::register(registry)?, + approval_voting_parallel_metrics.1.clone(), Arc::new(spawner.clone()), + enable_approval_voting_parallel, + )) + .approval_voting_parallel(ApprovalVotingParallelSubsystem::with_config( + approval_voting_config, + parachains_db.clone(), + keystore.clone(), + Box::new(sync_service.clone()), + approval_voting_parallel_metrics, + spawner.clone(), + enable_approval_voting_parallel, )) .gossip_support(GossipSupportSubsystem::new( keystore.clone(), @@ -330,6 +349,7 @@ where dispute_coordinator_config, keystore.clone(), Metrics::register(registry)?, + enable_approval_voting_parallel, )) .dispute_distribution(DisputeDistributionSubsystem::new( keystore.clone(), @@ -405,6 +425,7 @@ pub fn collator_overseer_builder( DummySubsystem, DummySubsystem, DummySubsystem, + DummySubsystem, >, Error, > @@ -437,6 +458,7 @@ where peerset_protocol_names, notification_services, notification_sinks, + false, )) .availability_distribution(DummySubsystem) .availability_recovery(AvailabilityRecoverySubsystem::for_collator( @@ -479,6 +501,7 @@ where .statement_distribution(DummySubsystem) .approval_distribution(DummySubsystem) .approval_voting(DummySubsystem) + .approval_voting_parallel(DummySubsystem) .gossip_support(DummySubsystem) .dispute_coordinator(DummySubsystem) .dispute_distribution(DummySubsystem) diff --git a/polkadot/node/service/src/relay_chain_selection.rs b/polkadot/node/service/src/relay_chain_selection.rs index c0b1ce8b0ebe1..e48874f01ca6f 100644 --- a/polkadot/node/service/src/relay_chain_selection.rs +++ b/polkadot/node/service/src/relay_chain_selection.rs @@ -39,8 +39,8 @@ use super::{HeaderProvider, HeaderProviderProvider}; use futures::channel::oneshot; use polkadot_node_primitives::MAX_FINALITY_LAG as PRIMITIVES_MAX_FINALITY_LAG; use polkadot_node_subsystem::messages::{ - ApprovalDistributionMessage, ApprovalVotingMessage, ChainSelectionMessage, - DisputeCoordinatorMessage, HighestApprovedAncestorBlock, + ApprovalDistributionMessage, ApprovalVotingMessage, 
ApprovalVotingParallelMessage, + ChainSelectionMessage, DisputeCoordinatorMessage, HighestApprovedAncestorBlock, }; use polkadot_node_subsystem_util::metrics::{self, prometheus}; use polkadot_overseer::{AllMessages, Handle}; @@ -169,6 +169,7 @@ where overseer: Handle, metrics: Metrics, spawn_handle: Option, + approval_voting_parallel_enabled: bool, ) -> Self { gum::debug!(target: LOG_TARGET, "Using dispute aware relay-chain selection algorithm",); @@ -179,6 +180,7 @@ where overseer, metrics, spawn_handle, + approval_voting_parallel_enabled, )), } } @@ -230,6 +232,7 @@ pub struct SelectRelayChainInner { overseer: OH, metrics: Metrics, spawn_handle: Option, + approval_voting_parallel_enabled: bool, } impl SelectRelayChainInner @@ -244,8 +247,15 @@ where overseer: OH, metrics: Metrics, spawn_handle: Option, + approval_voting_parallel_enabled: bool, ) -> Self { - SelectRelayChainInner { backend, overseer, metrics, spawn_handle } + SelectRelayChainInner { + backend, + overseer, + metrics, + spawn_handle, + approval_voting_parallel_enabled, + } } fn block_header(&self, hash: Hash) -> Result { @@ -284,6 +294,7 @@ where overseer: self.overseer.clone(), metrics: self.metrics.clone(), spawn_handle: self.spawn_handle.clone(), + approval_voting_parallel_enabled: self.approval_voting_parallel_enabled, } } } @@ -448,13 +459,25 @@ where // 2. Constrain according to `ApprovedAncestor`. let (subchain_head, subchain_number, subchain_block_descriptions) = { let (tx, rx) = oneshot::channel(); - overseer - .send_msg( - ApprovalVotingMessage::ApprovedAncestor(subchain_head, target_number, tx), - std::any::type_name::(), - ) - .await; - + if self.approval_voting_parallel_enabled { + overseer + .send_msg( + ApprovalVotingParallelMessage::ApprovedAncestor( + subchain_head, + target_number, + tx, + ), + std::any::type_name::(), + ) + .await; + } else { + overseer + .send_msg( + ApprovalVotingMessage::ApprovedAncestor(subchain_head, target_number, tx), + std::any::type_name::(), + ) + .await; + } match rx .await .map_err(Error::ApprovedAncestorCanceled) @@ -476,13 +499,23 @@ where // task for sending the message to not block here and delay finality. 
if let Some(spawn_handle) = &self.spawn_handle { let mut overseer_handle = self.overseer.clone(); + let approval_voting_parallel_enabled = self.approval_voting_parallel_enabled; let lag_update_task = async move { - overseer_handle - .send_msg( - ApprovalDistributionMessage::ApprovalCheckingLagUpdate(lag), - std::any::type_name::(), - ) - .await; + if approval_voting_parallel_enabled { + overseer_handle + .send_msg( + ApprovalVotingParallelMessage::ApprovalCheckingLagUpdate(lag), + std::any::type_name::(), + ) + .await; + } else { + overseer_handle + .send_msg( + ApprovalDistributionMessage::ApprovalCheckingLagUpdate(lag), + std::any::type_name::(), + ) + .await; + } }; spawn_handle.spawn( diff --git a/polkadot/node/service/src/tests.rs b/polkadot/node/service/src/tests.rs index bebd050710135..0b438fd6c1d56 100644 --- a/polkadot/node/service/src/tests.rs +++ b/polkadot/node/service/src/tests.rs @@ -86,6 +86,7 @@ fn test_harness>( context.sender().clone(), Default::default(), None, + false, ); let target_hash = case_vars.target_block; diff --git a/polkadot/node/subsystem-bench/Cargo.toml b/polkadot/node/subsystem-bench/Cargo.toml index 0325613d25f9a..e429a6fe982aa 100644 --- a/polkadot/node/subsystem-bench/Cargo.toml +++ b/polkadot/node/subsystem-bench/Cargo.toml @@ -76,6 +76,7 @@ serde_yaml = { workspace = true } serde_json = { workspace = true } polkadot-node-core-approval-voting = { workspace = true, default-features = true } +polkadot-node-core-approval-voting-parallel = { workspace = true, default-features = true } polkadot-approval-distribution = { workspace = true, default-features = true } sp-consensus-babe = { workspace = true, default-features = true } sp-runtime = { workspace = true } diff --git a/polkadot/node/subsystem-bench/src/lib/approval/helpers.rs b/polkadot/node/subsystem-bench/src/lib/approval/helpers.rs index ca58875c81393..f9eaacfeb1482 100644 --- a/polkadot/node/subsystem-bench/src/lib/approval/helpers.rs +++ b/polkadot/node/subsystem-bench/src/lib/approval/helpers.rs @@ -21,8 +21,9 @@ use polkadot_node_network_protocol::{ grid_topology::{SessionGridTopology, TopologyPeerInfo}, View, }; +use polkadot_node_subsystem::messages::ApprovalVotingParallelMessage; use polkadot_node_subsystem_types::messages::{ - network_bridge_event::NewGossipTopology, ApprovalDistributionMessage, NetworkBridgeEvent, + network_bridge_event::NewGossipTopology, NetworkBridgeEvent, }; use polkadot_overseer::AllMessages; use polkadot_primitives::{ @@ -129,14 +130,16 @@ pub fn generate_new_session_topology( topology, local_index: Some(test_node), }); - vec![AllMessages::ApprovalDistribution(ApprovalDistributionMessage::NetworkBridgeUpdate(event))] + vec![AllMessages::ApprovalVotingParallel(ApprovalVotingParallelMessage::NetworkBridgeUpdate( + event, + ))] } /// Generates a peer view change for the passed `block_hash` pub fn generate_peer_view_change_for(block_hash: Hash, peer_id: PeerId) -> AllMessages { let network = NetworkBridgeEvent::PeerViewChange(peer_id, View::new([block_hash], 0)); - AllMessages::ApprovalDistribution(ApprovalDistributionMessage::NetworkBridgeUpdate(network)) + AllMessages::ApprovalVotingParallel(ApprovalVotingParallelMessage::NetworkBridgeUpdate(network)) } /// Helper function to create a a signature for the block header. 
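
In the relay-chain selection change above, the approval-checking lag update is sent from a task handed to the spawn handle so that the finality-critical path never blocks on a busy subsystem queue; this holds for both the `ApprovalDistributionMessage` and the `ApprovalVotingParallelMessage` branch. A minimal standard-library sketch of that fire-and-forget pattern follows; `LagUpdate` and `notify_lag_without_blocking` are hypothetical stand-ins, not the real overseer or `SpawnHandle` API.

```rust
// Illustrative sketch (standard library only): the update is handed to a background
// task so the caller returns immediately, mirroring the spawned `lag_update_task`.
use std::sync::mpsc;
use std::thread;

/// Stand-in for the message the spawned task sends
/// (`ApprovalCheckingLagUpdate` in the real code).
#[derive(Debug)]
struct LagUpdate(u32);

fn notify_lag_without_blocking(lag: u32, sender: mpsc::Sender<LagUpdate>) {
    // Fire-and-forget: delivery happens on the background thread, so chain selection
    // cannot be delayed by a slow or congested receiver.
    thread::spawn(move || {
        let _ = sender.send(LagUpdate(lag));
    });
}

fn main() {
    let (tx, rx) = mpsc::channel();
    notify_lag_without_blocking(7, tx);
    // The receiving side (the approval subsystem in the real node) picks it up later.
    println!("received: {:?}", rx.recv().unwrap());
}
```
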
diff --git a/polkadot/node/subsystem-bench/src/lib/approval/mod.rs b/polkadot/node/subsystem-bench/src/lib/approval/mod.rs index 80861d850be7c..34614a8e3ea6e 100644 --- a/polkadot/node/subsystem-bench/src/lib/approval/mod.rs +++ b/polkadot/node/subsystem-bench/src/lib/approval/mod.rs @@ -13,7 +13,6 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . - use crate::{ approval::{ helpers::{ @@ -49,14 +48,15 @@ use overseer::{metrics::Metrics as OverseerMetrics, MetricsTrait}; use polkadot_approval_distribution::ApprovalDistribution; use polkadot_node_core_approval_voting::{ time::{slot_number_to_tick, tick_to_slot_number, Clock, ClockExt, SystemClock}, - ApprovalVotingSubsystem, Config as ApprovalVotingConfig, Metrics as ApprovalVotingMetrics, + ApprovalVotingSubsystem, Config as ApprovalVotingConfig, }; use polkadot_node_network_protocol::v3 as protocol_v3; use polkadot_node_primitives::approval::{self, v1::RelayVRFStory}; -use polkadot_node_subsystem::{overseer, AllMessages, Overseer, OverseerConnector, SpawnGlue}; +use polkadot_node_subsystem::{ + messages::ApprovalVotingParallelMessage, overseer, AllMessages, Overseer, OverseerConnector, + SpawnGlue, +}; use polkadot_node_subsystem_test_helpers::mock::new_block_import_info; -use polkadot_node_subsystem_types::messages::{ApprovalDistributionMessage, ApprovalVotingMessage}; -use polkadot_node_subsystem_util::metrics::Metrics; use polkadot_overseer::Handle as OverseerHandleReal; use polkadot_primitives::{ BlockNumber, CandidateEvent, CandidateIndex, CandidateReceipt, Hash, Header, Slot, @@ -265,7 +265,7 @@ pub struct ApprovalTestState { /// Total unique sent messages. total_unique_messages: Arc, /// Approval voting metrics. - approval_voting_metrics: ApprovalVotingMetrics, + approval_voting_metrics_rewrite: polkadot_node_core_approval_voting_parallel::Metrics, /// The delta ticks from the tick the messages were generated to the the time we start this /// message. delta_tick_from_generated: Arc, @@ -323,7 +323,10 @@ impl ApprovalTestState { total_sent_messages_from_node: Arc::new(AtomicU64::new(0)), total_unique_messages: Arc::new(AtomicU64::new(0)), options, - approval_voting_metrics: ApprovalVotingMetrics::try_register(&dependencies.registry) + approval_voting_metrics_rewrite: + polkadot_node_core_approval_voting_parallel::Metrics::try_register( + &dependencies.registry, + ) .unwrap(), delta_tick_from_generated: Arc::new(AtomicU64::new(630720000)), configuration: configuration.clone(), @@ -590,9 +593,9 @@ impl PeerMessageProducer { // so when the approval-distribution answered to it, we know it doesn't have anything // else to process. 
let (tx, rx) = oneshot::channel(); - let msg = ApprovalDistributionMessage::GetApprovalSignatures(HashSet::new(), tx); + let msg = ApprovalVotingParallelMessage::GetApprovalSignatures(HashSet::new(), tx); self.send_overseer_message( - AllMessages::ApprovalDistribution(msg), + AllMessages::ApprovalVotingParallel(msg), ValidatorIndex(0), None, ) @@ -795,21 +798,38 @@ fn build_overseer( let system_clock = PastSystemClock::new(SystemClock {}, state.delta_tick_from_generated.clone()); + let keystore = Arc::new(keystore); + let db = Arc::new(db); let approval_voting = ApprovalVotingSubsystem::with_config_and_clock( TEST_CONFIG, - Arc::new(db), - Arc::new(keystore), + db.clone(), + keystore.clone(), Box::new(TestSyncOracle {}), - state.approval_voting_metrics.clone(), + state.approval_voting_metrics_rewrite.1.clone(), Arc::new(system_clock.clone()), Arc::new(SpawnGlue(spawn_task_handle.clone())), + true, ); let approval_distribution = ApprovalDistribution::new_with_clock( - Metrics::register(Some(&dependencies.registry)).unwrap(), - SLOT_DURATION_MILLIS, - Box::new(system_clock.clone()), + state.approval_voting_metrics_rewrite.0.clone(), + TEST_CONFIG.slot_duration_millis as u64, + Arc::new(system_clock.clone()), + true, ); + + let approval_voting_parallel = + polkadot_node_core_approval_voting_parallel::ApprovalVotingParallelSubsystem::with_config_and_clock( + TEST_CONFIG, + db.clone(), + keystore.clone(), + Box::new(TestSyncOracle {}), + state.approval_voting_metrics_rewrite.clone(), + Arc::new(system_clock.clone()), + SpawnGlue(spawn_task_handle.clone()), + true, + ); + let mock_chain_api = MockChainApi::new(state.build_chain_api_state()); let mock_chain_selection = MockChainSelection { state: state.clone(), clock: system_clock }; let mock_runtime_api = MockRuntimeApi::new( @@ -831,6 +851,7 @@ fn build_overseer( let dummy = dummy_builder!(spawn_task_handle, overseer_metrics) .replace_approval_distribution(|_| approval_distribution) .replace_approval_voting(|_| approval_voting) + .replace_approval_voting_parallel(|_| approval_voting_parallel) .replace_chain_api(|_| mock_chain_api) .replace_chain_selection(|_| mock_chain_selection) .replace_runtime_api(|_| mock_runtime_api) @@ -927,7 +948,7 @@ pub async fn bench_approvals_run( // First create the initialization messages that make sure that then node under // tests receives notifications about the topology used and the connected peers. 
let mut initialization_messages = env.network().generate_peer_connected(|e| { - AllMessages::ApprovalDistribution(ApprovalDistributionMessage::NetworkBridgeUpdate(e)) + AllMessages::ApprovalVotingParallel(ApprovalVotingParallelMessage::NetworkBridgeUpdate(e)) }); initialization_messages.extend(generate_new_session_topology( &state.test_authorities, @@ -996,7 +1017,7 @@ pub async fn bench_approvals_run( state.total_sent_messages_to_node.load(std::sync::atomic::Ordering::SeqCst) as usize; env.wait_until_metric( "polkadot_parachain_subsystem_bounded_received", - Some(("subsystem_name", "approval-distribution-subsystem")), + Some(("subsystem_name", "approval-voting-parallel-subsystem")), |value| { gum::info!(target: LOG_TARGET, ?value, ?at_least_messages, "Waiting metric"); value >= at_least_messages as f64 @@ -1012,11 +1033,11 @@ pub async fn bench_approvals_run( CandidateEvent::CandidateIncluded(receipt_fetch, _head, _, _) => { let (tx, rx) = oneshot::channel(); - let msg = ApprovalVotingMessage::GetApprovalSignaturesForCandidate( + let msg = ApprovalVotingParallelMessage::GetApprovalSignaturesForCandidate( receipt_fetch.hash(), tx, ); - env.send_message(AllMessages::ApprovalVoting(msg)).await; + env.send_message(AllMessages::ApprovalVotingParallel(msg)).await; let result = rx.await.unwrap(); @@ -1040,7 +1061,7 @@ pub async fn bench_approvals_run( state.total_sent_messages_to_node.load(std::sync::atomic::Ordering::SeqCst) as usize; env.wait_until_metric( "polkadot_parachain_subsystem_bounded_received", - Some(("subsystem_name", "approval-distribution-subsystem")), + Some(("subsystem_name", "approval-voting-parallel-subsystem")), |value| { gum::info!(target: LOG_TARGET, ?value, ?at_least_messages, "Waiting metric"); value >= at_least_messages as f64 @@ -1081,5 +1102,9 @@ pub async fn bench_approvals_run( state.total_unique_messages.load(std::sync::atomic::Ordering::SeqCst) ); - env.collect_resource_usage(&["approval-distribution", "approval-voting"]) + env.collect_resource_usage(&[ + "approval-distribution", + "approval-voting", + "approval-voting-parallel", + ]) } diff --git a/polkadot/node/subsystem-bench/src/lib/environment.rs b/polkadot/node/subsystem-bench/src/lib/environment.rs index a63f90da50b3a..7e3c7f42b6131 100644 --- a/polkadot/node/subsystem-bench/src/lib/environment.rs +++ b/polkadot/node/subsystem-bench/src/lib/environment.rs @@ -24,6 +24,7 @@ use crate::{ }; use core::time::Duration; use futures::{Future, FutureExt}; +use polkadot_node_core_approval_voting_parallel::APPROVAL_DISTRIBUTION_WORKER_COUNT; use polkadot_node_subsystem::{messages::AllMessages, Overseer, SpawnGlue, TimeoutExt}; use polkadot_node_subsystem_types::Hash; use polkadot_node_subsystem_util::metrics::prometheus::{ @@ -392,6 +393,38 @@ impl TestEnvironment { total: total_cpu, per_block: total_cpu / num_blocks, }); + + if subsystem == &"approval-voting-parallel" { + for i in 0..APPROVAL_DISTRIBUTION_WORKER_COUNT { + let task_name = format!("approval-voting-parallel-{}", i); + + let subsystem_cpu_metrics = + test_metrics.subset_with_label_value("task_name", task_name.as_str()); + + let total_cpu = + subsystem_cpu_metrics.sum_by("substrate_tasks_polling_duration_sum"); + + usage.push(ResourceUsage { + resource_name: task_name.to_string(), + total: total_cpu, + per_block: total_cpu / num_blocks, + }) + } + + let task_name = format!("approval-voting-parallel-db"); + + let subsystem_cpu_metrics = + test_metrics.subset_with_label_value("task_name", task_name.as_str()); + + let total_cpu = + 
subsystem_cpu_metrics.sum_by("substrate_tasks_polling_duration_sum"); + + usage.push(ResourceUsage { + resource_name: task_name.to_string(), + total: total_cpu, + per_block: total_cpu / num_blocks, + }) + } } let test_env_cpu_metrics = diff --git a/polkadot/node/subsystem-bench/src/lib/mock/dummy.rs b/polkadot/node/subsystem-bench/src/lib/mock/dummy.rs index 8783b35f1c04a..092a8fc5f4c12 100644 --- a/polkadot/node/subsystem-bench/src/lib/mock/dummy.rs +++ b/polkadot/node/subsystem-bench/src/lib/mock/dummy.rs @@ -96,5 +96,6 @@ mock!(NetworkBridgeTx); mock!(ChainApi); mock!(ChainSelection); mock!(ApprovalVoting); +mock!(ApprovalVotingParallel); mock!(ApprovalDistribution); mock!(RuntimeApi); diff --git a/polkadot/node/subsystem-bench/src/lib/mock/mod.rs b/polkadot/node/subsystem-bench/src/lib/mock/mod.rs index 12766374bfa9f..3e33c4dde51a6 100644 --- a/polkadot/node/subsystem-bench/src/lib/mock/mod.rs +++ b/polkadot/node/subsystem-bench/src/lib/mock/mod.rs @@ -45,6 +45,7 @@ macro_rules! dummy_builder { // All subsystem except approval_voting and approval_distribution are mock subsystems. Overseer::builder() .approval_voting(MockApprovalVoting {}) + .approval_voting_parallel(MockApprovalVotingParallel {}) .approval_distribution(MockApprovalDistribution {}) .availability_recovery(MockAvailabilityRecovery {}) .candidate_validation(MockCandidateValidation {}) diff --git a/polkadot/node/subsystem-bench/src/lib/mock/network_bridge.rs b/polkadot/node/subsystem-bench/src/lib/mock/network_bridge.rs index d70953926d130..a32b48705463a 100644 --- a/polkadot/node/subsystem-bench/src/lib/mock/network_bridge.rs +++ b/polkadot/node/subsystem-bench/src/lib/mock/network_bridge.rs @@ -24,7 +24,8 @@ use crate::{ use futures::{channel::mpsc::UnboundedSender, FutureExt, StreamExt}; use polkadot_node_network_protocol::Versioned; use polkadot_node_subsystem::{ - messages::NetworkBridgeTxMessage, overseer, SpawnedSubsystem, SubsystemError, + messages::{ApprovalVotingParallelMessage, NetworkBridgeTxMessage}, + overseer, SpawnedSubsystem, SubsystemError, }; use polkadot_node_subsystem_types::{ messages::{ @@ -200,7 +201,7 @@ impl MockNetworkBridgeRx { polkadot_node_network_protocol::v3::ValidationProtocol::ApprovalDistribution(msg) ) => { ctx.send_message( - ApprovalDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerMessage(peer_id, polkadot_node_network_protocol::Versioned::V3(msg))) + ApprovalVotingParallelMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerMessage(peer_id, polkadot_node_network_protocol::Versioned::V3(msg))) ).await; } Versioned::V3( diff --git a/polkadot/node/subsystem-types/src/messages.rs b/polkadot/node/subsystem-types/src/messages.rs index 4233c35c62cdc..fdcb103c0941e 100644 --- a/polkadot/node/subsystem-types/src/messages.rs +++ b/polkadot/node/subsystem-types/src/messages.rs @@ -945,6 +945,118 @@ pub struct BlockDescription { pub candidates: Vec, } +/// Message to the Approval Voting subsystem running both approval-distribution and approval-voting +/// logic in parallel. This is a combination of all the messages ApprovalVoting and +/// ApprovalDistribution subsystems can receive. +#[derive(Debug, derive_more::From)] +pub enum ApprovalVotingParallelMessage { + /// Check if the assignment is valid and can be accepted by our view of the protocol. + /// Should not be sent unless the block hash is known. 
+ CheckAndImportAssignment(IndirectAssignmentCertV2, CandidateBitfield, DelayTranche), + /// Check if the approval vote is valid and can be accepted by our view of the + /// protocol. + /// + /// Should not be sent unless the block hash within the indirect vote is known. + CheckAndImportApproval(IndirectSignedApprovalVoteV2), + /// Returns the highest possible ancestor hash of the provided block hash which is + /// acceptable to vote on finality for. + /// The `BlockNumber` provided is the number of the block's ancestor which is the + /// earliest possible vote. + /// + /// It can also return the same block hash, if that is acceptable to vote upon. + /// Return `None` if the input hash is unrecognized. + ApprovedAncestor(Hash, BlockNumber, oneshot::Sender>), + + /// Retrieve all available approval signatures for a candidate from approval-voting. + /// + /// This message involves a linear search for candidates on each relay chain fork and also + /// requires calling into `approval-distribution`: Calls should be infrequent and bounded. + GetApprovalSignaturesForCandidate( + CandidateHash, + oneshot::Sender, ValidatorSignature)>>, + ), + /// Notify the `ApprovalDistribution` subsystem about new blocks + /// and the candidates contained within them. + NewBlocks(Vec), + /// Distribute an assignment cert from the local validator. The cert is assumed + /// to be valid, relevant, and for the given relay-parent and validator index. + DistributeAssignment(IndirectAssignmentCertV2, CandidateBitfield), + /// Distribute an approval vote for the local validator. The approval vote is assumed to be + /// valid, relevant, and the corresponding approval already issued. + /// If not, the subsystem is free to drop the message. + DistributeApproval(IndirectSignedApprovalVoteV2), + /// An update from the network bridge. + #[from] + NetworkBridgeUpdate(NetworkBridgeEvent), + + /// Get all approval signatures for all chains a candidate appeared in. + GetApprovalSignatures( + HashSet<(Hash, CandidateIndex)>, + oneshot::Sender, ValidatorSignature)>>, + ), + /// Approval checking lag update measured in blocks. 
+ ApprovalCheckingLagUpdate(BlockNumber), +} + +impl TryFrom for ApprovalVotingMessage { + type Error = (); + + fn try_from(msg: ApprovalVotingParallelMessage) -> Result { + match msg { + ApprovalVotingParallelMessage::CheckAndImportAssignment(cert, bitfield, tranche) => + Ok(ApprovalVotingMessage::CheckAndImportAssignment(cert, bitfield, tranche, None)), + ApprovalVotingParallelMessage::CheckAndImportApproval(vote) => + Ok(ApprovalVotingMessage::CheckAndImportApproval(vote, None)), + ApprovalVotingParallelMessage::ApprovedAncestor(hash, number, tx) => + Ok(ApprovalVotingMessage::ApprovedAncestor(hash, number, tx)), + ApprovalVotingParallelMessage::GetApprovalSignaturesForCandidate(candidate, tx) => + Ok(ApprovalVotingMessage::GetApprovalSignaturesForCandidate(candidate, tx)), + _ => Err(()), + } + } +} + +impl TryFrom for ApprovalDistributionMessage { + type Error = (); + + fn try_from(msg: ApprovalVotingParallelMessage) -> Result { + match msg { + ApprovalVotingParallelMessage::NewBlocks(blocks) => + Ok(ApprovalDistributionMessage::NewBlocks(blocks)), + ApprovalVotingParallelMessage::DistributeAssignment(assignment, claimed_cores) => + Ok(ApprovalDistributionMessage::DistributeAssignment(assignment, claimed_cores)), + ApprovalVotingParallelMessage::DistributeApproval(vote) => + Ok(ApprovalDistributionMessage::DistributeApproval(vote)), + ApprovalVotingParallelMessage::NetworkBridgeUpdate(msg) => + Ok(ApprovalDistributionMessage::NetworkBridgeUpdate(msg)), + ApprovalVotingParallelMessage::GetApprovalSignatures(candidate_indicies, tx) => + Ok(ApprovalDistributionMessage::GetApprovalSignatures(candidate_indicies, tx)), + ApprovalVotingParallelMessage::ApprovalCheckingLagUpdate(lag) => + Ok(ApprovalDistributionMessage::ApprovalCheckingLagUpdate(lag)), + _ => Err(()), + } + } +} + +impl From for ApprovalVotingParallelMessage { + fn from(msg: ApprovalDistributionMessage) -> Self { + match msg { + ApprovalDistributionMessage::NewBlocks(blocks) => + ApprovalVotingParallelMessage::NewBlocks(blocks), + ApprovalDistributionMessage::DistributeAssignment(cert, bitfield) => + ApprovalVotingParallelMessage::DistributeAssignment(cert, bitfield), + ApprovalDistributionMessage::DistributeApproval(vote) => + ApprovalVotingParallelMessage::DistributeApproval(vote), + ApprovalDistributionMessage::NetworkBridgeUpdate(msg) => + ApprovalVotingParallelMessage::NetworkBridgeUpdate(msg), + ApprovalDistributionMessage::GetApprovalSignatures(candidate_indicies, tx) => + ApprovalVotingParallelMessage::GetApprovalSignatures(candidate_indicies, tx), + ApprovalDistributionMessage::ApprovalCheckingLagUpdate(lag) => + ApprovalVotingParallelMessage::ApprovalCheckingLagUpdate(lag), + } + } +} + /// Response type to `ApprovalVotingMessage::ApprovedAncestor`. 
#[derive(Clone, Debug)] pub struct HighestApprovedAncestorBlock { diff --git a/polkadot/node/test/service/src/lib.rs b/polkadot/node/test/service/src/lib.rs index 35156a3a9372b..ed5338fb27a3a 100644 --- a/polkadot/node/test/service/src/lib.rs +++ b/polkadot/node/test/service/src/lib.rs @@ -100,6 +100,7 @@ pub fn new_full( execute_workers_max_num: None, prepare_workers_hard_max_num: None, prepare_workers_soft_max_num: None, + enable_approval_voting_parallel: false, }, ), sc_network::config::NetworkBackendType::Litep2p => @@ -122,6 +123,7 @@ pub fn new_full( execute_workers_max_num: None, prepare_workers_hard_max_num: None, prepare_workers_soft_max_num: None, + enable_approval_voting_parallel: false, }, ), } diff --git a/polkadot/parachain/test-parachains/adder/collator/src/main.rs b/polkadot/parachain/test-parachains/adder/collator/src/main.rs index e8588274df27a..4660b4d38f7c2 100644 --- a/polkadot/parachain/test-parachains/adder/collator/src/main.rs +++ b/polkadot/parachain/test-parachains/adder/collator/src/main.rs @@ -98,6 +98,7 @@ fn main() -> Result<()> { execute_workers_max_num: None, prepare_workers_hard_max_num: None, prepare_workers_soft_max_num: None, + enable_approval_voting_parallel: false, }, ) .map_err(|e| e.to_string())?; diff --git a/polkadot/parachain/test-parachains/undying/collator/src/main.rs b/polkadot/parachain/test-parachains/undying/collator/src/main.rs index 7198a831a4771..3dfa714e6d16c 100644 --- a/polkadot/parachain/test-parachains/undying/collator/src/main.rs +++ b/polkadot/parachain/test-parachains/undying/collator/src/main.rs @@ -100,6 +100,7 @@ fn main() -> Result<()> { execute_workers_max_num: None, prepare_workers_hard_max_num: None, prepare_workers_soft_max_num: None, + enable_approval_voting_parallel: false, }, ) .map_err(|e| e.to_string())?;
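
The `TryFrom` and `From` impls added to `polkadot/node/subsystem-types/src/messages.rs` are what let the parallel subsystem act as a single front-end: a combined `ApprovalVotingParallelMessage` arrives, and each variant is converted into either an `ApprovalVotingMessage` or an `ApprovalDistributionMessage` for the corresponding logic to handle. Below is a simplified, self-contained sketch of that fan-out using assumed toy enums (`Combined`, `Voting`, `Distribution`); the real subsystem forwards to worker channels rather than printing, and its routing is more involved than the try-one-then-the-other shown here.

```rust
// Simplified sketch of the fan-out enabled by the `TryFrom` impls above. The enums
// are illustrative stand-ins for `ApprovalVotingParallelMessage`,
// `ApprovalVotingMessage`, and `ApprovalDistributionMessage`.

#[derive(Debug, Clone)]
enum Combined {
    ApprovedAncestor(u32),      // voting-side query
    DistributeApproval(String), // distribution-side request
}

#[derive(Debug)]
enum Voting {
    ApprovedAncestor(u32),
}

#[derive(Debug)]
enum Distribution {
    DistributeApproval(String),
}

impl TryFrom<Combined> for Voting {
    type Error = ();
    fn try_from(msg: Combined) -> Result<Self, Self::Error> {
        match msg {
            Combined::ApprovedAncestor(n) => Ok(Voting::ApprovedAncestor(n)),
            _ => Err(()),
        }
    }
}

impl TryFrom<Combined> for Distribution {
    type Error = ();
    fn try_from(msg: Combined) -> Result<Self, Self::Error> {
        match msg {
            Combined::DistributeApproval(s) => Ok(Distribution::DistributeApproval(s)),
            _ => Err(()),
        }
    }
}

/// Front-end routing: each message converts into exactly one of the two sides.
fn route(msg: Combined) {
    if let Ok(v) = Voting::try_from(msg.clone()) {
        println!("to approval-voting worker: {v:?}");
    } else if let Ok(d) = Distribution::try_from(msg) {
        println!("to approval-distribution worker: {d:?}");
    }
}

fn main() {
    route(Combined::ApprovedAncestor(42));
    route(Combined::DistributeApproval("vote".into()));
}
```
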