From e30d3af097665c1ef38241ceee9faa760f66a890 Mon Sep 17 00:00:00 2001 From: Quang Le Date: Fri, 5 Jun 2026 16:26:29 +0700 Subject: [PATCH 1/2] feat(consensus): implement consensus wrapper --- crates/consensus/src/lib.rs | 3 + crates/consensus/src/wrapper.rs | 232 ++++++++++++++++++++++++++++++++ 2 files changed, 235 insertions(+) create mode 100644 crates/consensus/src/wrapper.rs diff --git a/crates/consensus/src/lib.rs b/crates/consensus/src/lib.rs index 2f2d0dfc..a7673d65 100644 --- a/crates/consensus/src/lib.rs +++ b/crates/consensus/src/lib.rs @@ -17,3 +17,6 @@ pub mod qbft; /// Consensus round timers. pub mod timer; + +/// Swappable consensus implementation wrapper. +pub mod wrapper; diff --git a/crates/consensus/src/wrapper.rs b/crates/consensus/src/wrapper.rs new file mode 100644 index 00000000..8c5c3ad0 --- /dev/null +++ b/crates/consensus/src/wrapper.rs @@ -0,0 +1,232 @@ +//! Swappable consensus implementation wrapper. + +use std::{ + error::Error as StdError, + sync::{Arc, PoisonError, RwLock}, +}; + +use futures::future::BoxFuture; +use pluto_core::{corepb::v1::core as pbcore, types::Duty}; +use tokio_util::sync::CancellationToken; + +/// Consensus wrapper result. +pub type Result = std::result::Result>; + +/// Subscriber callback result. +pub type SubscriberResult = Result<()>; + +/// Subscriber callback for decided unsigned duty data. +pub type Subscriber = + Box SubscriberResult + Send + Sync + 'static>; + +/// Consensus implementation interface. +pub trait Consensus: Send + Sync { + /// Returns the consensus protocol ID. + fn protocol_id(&self) -> String; + + /// Starts the consensus implementation. + fn start(&self, ct: CancellationToken); + + /// Starts participating in a consensus instance. + fn participate(&self, ct: CancellationToken, duty: Duty) -> BoxFuture<'static, Result<()>>; + + /// Proposes unsigned duty data for a consensus instance. + fn propose( + &self, + ct: CancellationToken, + duty: Duty, + value: pbcore::UnsignedDataSet, + ) -> BoxFuture<'static, Result<()>>; + + /// Registers a callback for decided unsigned duty data. + fn subscribe(&self, subscriber: Subscriber); +} + +/// Wrapper that forwards calls to the current consensus implementation. +pub struct ConsensusWrapper { + implementation: RwLock>, +} + +impl ConsensusWrapper { + /// Wraps a consensus implementation. + pub fn new(implementation: Arc) -> Self { + Self { + implementation: RwLock::new(implementation), + } + } + + /// Sets the current consensus implementation. + pub fn set_impl(&self, implementation: Arc) { + *self + .implementation + .write() + .unwrap_or_else(PoisonError::into_inner) = implementation; + } + + /// Returns the current consensus protocol ID. + pub fn protocol_id(&self) -> String { + self.current().protocol_id() + } + + /// Starts the current consensus implementation. + pub fn start(&self, ct: CancellationToken) { + self.current().start(ct); + } + + /// Starts participating in a consensus instance. + pub async fn participate(&self, ct: CancellationToken, duty: Duty) -> Result<()> { + self.current().participate(ct, duty).await + } + + /// Proposes unsigned duty data for a consensus instance. + pub async fn propose( + &self, + ct: CancellationToken, + duty: Duty, + value: pbcore::UnsignedDataSet, + ) -> Result<()> { + self.current().propose(ct, duty, value).await + } + + /// Registers a callback for decided unsigned duty data. + pub fn subscribe(&self, subscriber: Subscriber) { + self.current().subscribe(subscriber); + } + + fn current(&self) -> Arc { + self.implementation + .read() + .unwrap_or_else(PoisonError::into_inner) + .clone() + } +} + +#[cfg(test)] +mod tests { + use std::sync::Mutex; + + use futures::FutureExt as _; + use pluto_core::{ + corepb::v1::core as pbcore, + types::{Duty, SlotNumber}, + }; + + use crate::protocols::QBFT_V2_PROTOCOL_ID; + + use super::*; + + #[tokio::test] + async fn new_consensus_wrapper_forwards_to_current_impl() { + let ct = CancellationToken::new(); + let duty = Duty::new_randao_duty(SlotNumber::new(123)); + let value = pbcore::UnsignedDataSet::default(); + let first = Arc::new(TestConsensus::new(QBFT_V2_PROTOCOL_ID)); + let wrapper = ConsensusWrapper::new(first.clone()); + + assert_eq!(wrapper.protocol_id(), QBFT_V2_PROTOCOL_ID); + + wrapper + .participate(ct.clone(), duty.clone()) + .await + .expect("participate forwards"); + wrapper + .propose(ct.clone(), duty.clone(), value) + .await + .expect("propose forwards"); + + let subscribed = Arc::new(Mutex::new(Vec::new())); + let subscribed_clone = Arc::clone(&subscribed); + wrapper.subscribe(Box::new(move |duty, _| { + subscribed_clone + .lock() + .unwrap_or_else(PoisonError::into_inner) + .push(duty); + Ok(()) + })); + + wrapper.start(ct); + + assert_eq!( + first.calls(), + vec!["participate", "propose", "subscribe", "start"] + ); + assert_eq!( + subscribed + .lock() + .unwrap_or_else(PoisonError::into_inner) + .as_slice(), + &[duty] + ); + + let second = Arc::new(TestConsensus::new("foobar")); + wrapper.set_impl(second); + + assert_eq!(wrapper.protocol_id(), "foobar"); + } + + struct TestConsensus { + protocol_id: String, + calls: Mutex>, + } + + impl TestConsensus { + fn new(protocol_id: &str) -> Self { + Self { + protocol_id: protocol_id.to_string(), + calls: Mutex::default(), + } + } + + fn calls(&self) -> Vec<&'static str> { + self.calls + .lock() + .unwrap_or_else(PoisonError::into_inner) + .clone() + } + + fn record(&self, call: &'static str) { + self.calls + .lock() + .unwrap_or_else(PoisonError::into_inner) + .push(call); + } + } + + impl Consensus for TestConsensus { + fn protocol_id(&self) -> String { + self.protocol_id.clone() + } + + fn start(&self, _ct: CancellationToken) { + self.record("start"); + } + + fn participate( + &self, + _ct: CancellationToken, + _duty: Duty, + ) -> BoxFuture<'static, Result<()>> { + self.record("participate"); + async { Ok(()) }.boxed() + } + + fn propose( + &self, + _ct: CancellationToken, + _duty: Duty, + _value: pbcore::UnsignedDataSet, + ) -> BoxFuture<'static, Result<()>> { + self.record("propose"); + async { Ok(()) }.boxed() + } + + fn subscribe(&self, subscriber: Subscriber) { + self.record("subscribe"); + subscriber( + Duty::new_randao_duty(SlotNumber::new(123)), + pbcore::UnsignedDataSet::default(), + ) + .expect("test subscriber succeeds"); + } + } +} From 264d0cae8904abb5884b0a4ea5e95c0ffeef347f Mon Sep 17 00:00:00 2001 From: Quang Le Date: Mon, 8 Jun 2026 10:04:50 +0700 Subject: [PATCH 2/2] feat(consensus): implement consensus controller --- crates/consensus/examples/qbft.rs | 3 +- crates/consensus/src/controller.rs | 191 +++++++++++++++++++++ crates/consensus/src/lib.rs | 3 + crates/consensus/src/qbft/component.rs | 88 ++++++++-- crates/consensus/src/qbft/qbft_run_test.rs | 30 +--- crates/consensus/src/wrapper.rs | 12 +- 6 files changed, 281 insertions(+), 46 deletions(-) create mode 100644 crates/consensus/src/controller.rs diff --git a/crates/consensus/examples/qbft.rs b/crates/consensus/examples/qbft.rs index 53255abe..69d340fb 100644 --- a/crates/consensus/examples/qbft.rs +++ b/crates/consensus/examples/qbft.rs @@ -598,6 +598,7 @@ fn build_consensus( local_peer_idx: i64::try_from(fixture.local_index)?, privkey: fixture.key.clone(), deadliner, + expired_rx, duty_gater: Arc::new(|duty| duty.duty_type == DutyType::Attester), broadcaster, sniffer: Arc::new(move |instance| { @@ -618,7 +619,7 @@ fn build_consensus( }); Ok(()) }); - let lifecycle_task = Arc::clone(&component).start(cancel.child_token(), expired_rx); + let lifecycle_task = component.start(cancel.child_token()); Ok(ConsensusRuntime { component, diff --git a/crates/consensus/src/controller.rs b/crates/consensus/src/controller.rs new file mode 100644 index 00000000..c40a8998 --- /dev/null +++ b/crates/consensus/src/controller.rs @@ -0,0 +1,191 @@ +//! Consensus protocol controller. + +use std::sync::Arc; + +use k256::SecretKey; +use pluto_core::{deadline::DeadlinerHandle, types::Duty}; +use tokio::sync::mpsc; +use tokio_util::sync::CancellationToken; + +use crate::{ + debugger::Debugger, + qbft, + timer::RoundTimerFunc, + wrapper::{Consensus, ConsensusWrapper}, +}; + +/// Consensus controller result. +pub type Result = std::result::Result; + +/// Consensus controller error. +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// Failed to construct the default QBFT consensus implementation. + #[error("{0}")] + Qbft(#[from] qbft::Error), + /// Protocol ID is not supported by this controller. + #[error("unsupported protocol id")] + UnsupportedProtocolId, +} + +/// Consensus controller constructor config. +pub struct Config { + /// Consensus peers in process-index order. + pub peers: Vec, + /// Local zero-based process index. + pub local_peer_idx: i64, + /// Local secp256k1 private key. + pub privkey: SecretKey, + /// Duty deadline scheduler. + pub deadliner: DeadlinerHandle, + /// Expired-duty receiver paired with `deadliner`. + pub expired_rx: mpsc::Receiver, + /// Duty admission gate. + pub duty_gater: qbft::DutyGater, + /// External message broadcaster. + pub broadcaster: qbft::Broadcaster, + /// Consensus debugger. + pub debugger: Debugger, + /// Enables attestation value comparison. + pub compare_attestations: bool, + /// Round timer factory. + pub timer_func: RoundTimerFunc, +} + +/// Controls the active consensus protocol implementation. +pub struct ConsensusController { + default_consensus: Arc, + wrapped_consensus: ConsensusWrapper, +} + +impl ConsensusController { + /// Creates a new consensus controller with QBFT as the default protocol. + pub fn new(config: Config) -> Result { + let qbft = Arc::new(qbft::Consensus::new(qbft::Config { + peers: config.peers, + local_peer_idx: config.local_peer_idx, + privkey: config.privkey, + deadliner: config.deadliner, + expired_rx: config.expired_rx, + duty_gater: config.duty_gater, + broadcaster: config.broadcaster, + sniffer: config.debugger.sniffer(), + compare_attestations: config.compare_attestations, + timer_func: config.timer_func, + })?); + let default_consensus: Arc = qbft; + + Ok(Self { + wrapped_consensus: ConsensusWrapper::new(default_consensus.clone()), + default_consensus, + }) + } + + /// Starts the default consensus implementation. + pub fn start(&self, ct: CancellationToken) { + self.default_consensus.start(ct); + } + + /// Returns the default consensus implementation. + pub fn default_consensus(&self) -> Arc { + Arc::clone(&self.default_consensus) + } + + /// Returns the current consensus wrapper. + pub fn current_consensus(&self) -> &ConsensusWrapper { + &self.wrapped_consensus + } + + /// Sets the current consensus implementation for `protocol`. + pub fn set_current_consensus_for_protocol(&self, protocol: &str) -> Result<()> { + if self.wrapped_consensus.protocol_id() == protocol { + return Ok(()); + } + + if self.default_consensus.protocol_id() == protocol { + self.wrapped_consensus + .set_impl(Arc::clone(&self.default_consensus)); + return Ok(()); + } + + Err(Error::UnsupportedProtocolId) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use k256::SecretKey; + use pluto_core::{ + deadline::{DeadlinerTask, NeverExpiringCalculator}, + types::DutyType, + }; + + use crate::{debugger::Debugger, protocols::QBFT_V2_PROTOCOL_ID, timer::get_round_timer_func}; + + use super::*; + + #[tokio::test] + async fn consensus_controller_uses_qbft_as_default_and_current() { + let controller = ConsensusController::new(config()).expect("controller should construct"); + let ct = CancellationToken::new(); + + controller.start(ct.clone()); + + let default_consensus = controller.default_consensus(); + assert_eq!(default_consensus.protocol_id(), QBFT_V2_PROTOCOL_ID); + assert_eq!( + controller.current_consensus().protocol_id(), + QBFT_V2_PROTOCOL_ID + ); + + controller + .set_current_consensus_for_protocol(QBFT_V2_PROTOCOL_ID) + .expect("default protocol is supported"); + let err = controller + .set_current_consensus_for_protocol("boo") + .expect_err("unknown protocol should fail"); + assert!(matches!(err, Error::UnsupportedProtocolId)); + + ct.cancel(); + } + + fn config() -> Config { + let ct = CancellationToken::new(); + let (deadliner, expired_rx) = + DeadlinerTask::start(ct, "controller-test", NeverExpiringCalculator); + + Config { + peers: peers(), + local_peer_idx: 0, + privkey: secret_key(1), + deadliner, + expired_rx, + duty_gater: Arc::new(|duty| duty.duty_type == DutyType::Attester), + broadcaster: Arc::new(|_, _| Box::pin(async { Ok(()) })), + debugger: Debugger::new(), + compare_attestations: false, + timer_func: get_round_timer_func(), + } + } + + fn peers() -> Vec { + vec![ + qbft::Peer { + index: 0, + name: "node-0".to_string(), + public_key: secret_key(1).public_key(), + }, + qbft::Peer { + index: 1, + name: "node-1".to_string(), + public_key: secret_key(2).public_key(), + }, + ] + } + + fn secret_key(seed: u8) -> SecretKey { + SecretKey::from_slice(&[seed; 32]).expect("test secret key is valid") + } +} diff --git a/crates/consensus/src/lib.rs b/crates/consensus/src/lib.rs index a7673d65..4f99d550 100644 --- a/crates/consensus/src/lib.rs +++ b/crates/consensus/src/lib.rs @@ -4,6 +4,9 @@ //! This crate implements the consensus algorithms and protocols required for //! coordinating validator operations across the distributed network. +/// Consensus protocol controller. +pub mod controller; + /// Consensus debug message buffer. pub mod debugger; diff --git a/crates/consensus/src/qbft/component.rs b/crates/consensus/src/qbft/component.rs index c04ff54a..0abbb4ef 100644 --- a/crates/consensus/src/qbft/component.rs +++ b/crates/consensus/src/qbft/component.rs @@ -76,6 +76,8 @@ pub struct Config { pub privkey: SecretKey, /// Duty deadline scheduler. pub deadliner: DeadlinerHandle, + /// Expired-duty receiver paired with `deadliner`. + pub expired_rx: mpsc::Receiver, /// Duty admission gate. pub duty_gater: DutyGater, /// External message broadcaster. @@ -264,13 +266,14 @@ pub struct Consensus { local_peer_idx: i64, privkey: SecretKey, deadliner: DeadlinerHandle, + expired_rx: Mutex>>, duty_gater: DutyGater, broadcaster: Broadcaster, sniffer: SnifferSink, timer_func: RoundTimerFunc, compare_attestations: bool, subscribers: SubscriberSet, - instances: Mutex>>>, + instances: Arc>>>>, } impl Consensus { @@ -301,13 +304,14 @@ impl Consensus { local_peer_idx: config.local_peer_idx, privkey: config.privkey, deadliner: config.deadliner, + expired_rx: Mutex::new(Some(config.expired_rx)), duty_gater: config.duty_gater, broadcaster: config.broadcaster, sniffer: config.sniffer, timer_func: config.timer_func, compare_attestations: config.compare_attestations, subscribers: SubscriberSet::default(), - instances: Mutex::default(), + instances: Arc::new(Mutex::default()), }) } @@ -421,17 +425,29 @@ impl Consensus { } /// Runs the internal expired-duty cleanup loop until cancellation. - pub fn start( - self: Arc, - ct: CancellationToken, - mut expired_rx: mpsc::Receiver, - ) -> JoinHandle<()> { + pub fn start(&self, ct: CancellationToken) -> JoinHandle<()> { + let expired_rx = self + .expired_rx + .lock() + .unwrap_or_else(PoisonError::into_inner) + .take(); + let instances = Arc::clone(&self.instances); + tokio::spawn(async move { + let Some(mut expired_rx) = expired_rx else { + return; + }; + loop { tokio::select! { () = ct.cancelled() => return, duty = expired_rx.recv() => match duty { - Some(duty) => self.delete_instance_io(&duty), + Some(duty) => { + instances + .lock() + .unwrap_or_else(PoisonError::into_inner) + .remove(&duty); + } None => return, }, } @@ -452,6 +468,7 @@ impl Consensus { } /// Drops cached I/O for a completed or expired duty instance. + #[cfg(test)] pub(crate) fn delete_instance_io(&self, duty: &Duty) { self.instances .lock() @@ -545,6 +562,45 @@ impl Consensus { } } +impl crate::wrapper::Consensus for Consensus { + fn protocol_id(&self) -> String { + self.protocol_id().to_string() + } + + fn start(&self, ct: CancellationToken) { + drop(Consensus::start(self, ct)); + } + + fn participate( + &self, + ct: CancellationToken, + duty: Duty, + ) -> BoxFuture<'_, crate::wrapper::Result<()>> { + Box::pin(async move { + Consensus::participate(self, &ct, duty) + .await + .map_err(|err| Box::new(err) as Box) + }) + } + + fn propose( + &self, + ct: CancellationToken, + duty: Duty, + value: pbcore::UnsignedDataSet, + ) -> BoxFuture<'_, crate::wrapper::Result<()>> { + Box::pin(async move { + Consensus::propose(self, &ct, duty, value) + .await + .map_err(|err| Box::new(err) as Box) + }) + } + + fn subscribe(&self, subscriber: crate::wrapper::Subscriber) { + Consensus::subscribe(self, subscriber); + } +} + /// Extracts the domain duty from a validated raw QBFT message. fn duty_from_msg(msg: &pbconsensus::QbftMsg) -> Result { let duty = msg.duty.as_ref().ok_or(Error::InvalidConsensusMessage)?; @@ -619,12 +675,19 @@ pub(crate) mod tests { #[tokio::test] async fn start_deletes_expired_instance_io_until_cancelled() { - let consensus = Arc::new(consensus(0, true)); + let (expired_tx, expired_rx) = mpsc::channel(1); + let consensus = Arc::new( + Consensus::new(Config { + peers: peers(), + expired_rx, + ..config_base(false) + }) + .unwrap(), + ); let duty = duty(); let first = consensus.get_instance_io(duty.clone()); let cancel = CancellationToken::new(); - let (expired_tx, expired_rx) = mpsc::channel(1); - let task = Arc::clone(&consensus).start(cancel.clone(), expired_rx); + let task = consensus.start(cancel.clone()); expired_tx.send(duty.clone()).await.unwrap(); tokio::time::timeout( @@ -1243,7 +1306,7 @@ pub(crate) mod tests { pub(crate) fn config_base(never_expiring: bool) -> Config { let cancel = CancellationToken::new(); - let (deadliner, _expired_rx) = if never_expiring { + let (deadliner, expired_rx) = if never_expiring { DeadlinerTask::start( cancel, "qbft-test", @@ -1258,6 +1321,7 @@ pub(crate) mod tests { local_peer_idx: 0, privkey: secret_key(1), deadliner, + expired_rx, duty_gater: Arc::new(|_| true), broadcaster: Arc::new(|_, _| Box::pin(async { Ok(()) })), sniffer: Arc::new(|_| {}), diff --git a/crates/consensus/src/qbft/qbft_run_test.rs b/crates/consensus/src/qbft/qbft_run_test.rs index 9b48b33b..e7adb212 100644 --- a/crates/consensus/src/qbft/qbft_run_test.rs +++ b/crates/consensus/src/qbft/qbft_run_test.rs @@ -91,7 +91,6 @@ async fn qbft_consensus_with_silent_round_one_leader_decides() { let (decided_tx, mut decided_rx) = mpsc::unbounded_channel(); let ct = CancellationToken::new(); let start_ct = CancellationToken::new(); - let mut expired_txs = Vec::with_capacity(active_nodes.len()); let mut start_tasks = Vec::with_capacity(active_nodes.len()); for (node_idx, node) in active_nodes.iter().enumerate() { @@ -101,9 +100,7 @@ async fn qbft_consensus_with_silent_round_one_leader_decides() { Ok(()) }); - let (expired_tx, expired_rx) = mpsc::channel(1); - expired_txs.push(expired_tx); - start_tasks.push(Arc::clone(node).start(start_ct.clone(), expired_rx)); + start_tasks.push(node.start(start_ct.clone())); } drop(decided_tx); @@ -148,7 +145,6 @@ async fn qbft_consensus_with_silent_round_one_leader_decides() { ct.cancel(); start_ct.cancel(); - drop(expired_txs); for task in start_tasks { task.await.unwrap(); } @@ -163,7 +159,6 @@ async fn qbft_priority_consensus() { let duty = Duty::new(SlotNumber::new(1), DutyType::InfoSync); let ct = CancellationToken::new(); let start_ct = CancellationToken::new(); - let mut expired_txs = Vec::with_capacity(active_nodes.len()); let mut start_tasks = Vec::with_capacity(active_nodes.len()); for (node_idx, node) in active_nodes.iter().enumerate() { @@ -173,9 +168,7 @@ async fn qbft_priority_consensus() { Ok(()) }); - let (expired_tx, expired_rx) = mpsc::channel(1); - expired_txs.push(expired_tx); - start_tasks.push(Arc::clone(node).start(start_ct.clone(), expired_rx)); + start_tasks.push(node.start(start_ct.clone())); } drop(decided_tx); @@ -216,7 +209,6 @@ async fn qbft_priority_consensus() { ct.cancel(); start_ct.cancel(); - drop(expired_txs); for task in start_tasks { task.await.unwrap(); } @@ -232,7 +224,6 @@ async fn qbft_consensus_participate_then_late_propose() { let duty = Duty::new(SlotNumber::new(1), DutyType::Attester); let ct = CancellationToken::new(); let start_ct = CancellationToken::new(); - let mut expired_txs = Vec::with_capacity(active_nodes.len()); let mut start_tasks = Vec::with_capacity(active_nodes.len()); for (node_idx, node) in active_nodes.iter().enumerate() { @@ -242,9 +233,7 @@ async fn qbft_consensus_participate_then_late_propose() { Ok(()) }); - let (expired_tx, expired_rx) = mpsc::channel(1); - expired_txs.push(expired_tx); - start_tasks.push(Arc::clone(node).start(start_ct.clone(), expired_rx)); + start_tasks.push(node.start(start_ct.clone())); } drop(decided_tx); @@ -302,7 +291,6 @@ async fn qbft_consensus_participate_then_late_propose() { ct.cancel(); start_ct.cancel(); - drop(expired_txs); for task in start_tasks { task.await.unwrap(); } @@ -323,7 +311,6 @@ async fn qbft_consensus_attester_compare_mismatch_does_not_decide() { let duty = Duty::new(SlotNumber::new(1), DutyType::Attester); let ct = CancellationToken::new(); let start_ct = CancellationToken::new(); - let mut expired_txs = Vec::with_capacity(active_nodes.len()); let mut start_tasks = Vec::with_capacity(active_nodes.len()); for (node_idx, node) in active_nodes.iter().enumerate() { @@ -333,9 +320,7 @@ async fn qbft_consensus_attester_compare_mismatch_does_not_decide() { Ok(()) }); - let (expired_tx, expired_rx) = mpsc::channel(1); - expired_txs.push(expired_tx); - start_tasks.push(Arc::clone(node).start(start_ct.clone(), expired_rx)); + start_tasks.push(node.start(start_ct.clone())); } drop(decided_tx); @@ -363,7 +348,6 @@ async fn qbft_consensus_attester_compare_mismatch_does_not_decide() { assert!(decided_rx.try_recv().is_err()); start_ct.cancel(); - drop(expired_txs); for task in start_tasks { task.await.unwrap(); } @@ -386,7 +370,6 @@ async fn run_qbft_consensus( let duty = Duty::new(SlotNumber::new(1), DutyType::Attester); let ct = CancellationToken::new(); let start_ct = CancellationToken::new(); - let mut expired_txs = Vec::with_capacity(active_nodes.len()); let mut start_tasks = Vec::with_capacity(active_nodes.len()); for (node_idx, node) in active_nodes.iter().enumerate() { @@ -396,9 +379,7 @@ async fn run_qbft_consensus( Ok(()) }); - let (expired_tx, expired_rx) = mpsc::channel(1); - expired_txs.push(expired_tx); - start_tasks.push(Arc::clone(node).start(start_ct.clone(), expired_rx)); + start_tasks.push(node.start(start_ct.clone())); } drop(decided_tx); @@ -442,7 +423,6 @@ async fn run_qbft_consensus( ct.cancel(); start_ct.cancel(); - drop(expired_txs); for task in start_tasks { task.await.unwrap(); } diff --git a/crates/consensus/src/wrapper.rs b/crates/consensus/src/wrapper.rs index 8c5c3ad0..a69401ab 100644 --- a/crates/consensus/src/wrapper.rs +++ b/crates/consensus/src/wrapper.rs @@ -28,7 +28,7 @@ pub trait Consensus: Send + Sync { fn start(&self, ct: CancellationToken); /// Starts participating in a consensus instance. - fn participate(&self, ct: CancellationToken, duty: Duty) -> BoxFuture<'static, Result<()>>; + fn participate(&self, ct: CancellationToken, duty: Duty) -> BoxFuture<'_, Result<()>>; /// Proposes unsigned duty data for a consensus instance. fn propose( @@ -36,7 +36,7 @@ pub trait Consensus: Send + Sync { ct: CancellationToken, duty: Duty, value: pbcore::UnsignedDataSet, - ) -> BoxFuture<'static, Result<()>>; + ) -> BoxFuture<'_, Result<()>>; /// Registers a callback for decided unsigned duty data. fn subscribe(&self, subscriber: Subscriber); @@ -201,11 +201,7 @@ mod tests { self.record("start"); } - fn participate( - &self, - _ct: CancellationToken, - _duty: Duty, - ) -> BoxFuture<'static, Result<()>> { + fn participate(&self, _ct: CancellationToken, _duty: Duty) -> BoxFuture<'_, Result<()>> { self.record("participate"); async { Ok(()) }.boxed() } @@ -215,7 +211,7 @@ mod tests { _ct: CancellationToken, _duty: Duty, _value: pbcore::UnsignedDataSet, - ) -> BoxFuture<'static, Result<()>> { + ) -> BoxFuture<'_, Result<()>> { self.record("propose"); async { Ok(()) }.boxed() }