From 85d8c22c49da9027cfe14deb9a320c3ba160b406 Mon Sep 17 00:00:00 2001 From: Quang Le Date: Fri, 5 Jun 2026 15:36:54 +0700 Subject: [PATCH 1/3] feat(consensus): implement consensus debugger --- Cargo.lock | 2 + crates/consensus/Cargo.toml | 2 + crates/consensus/src/debugger.rs | 382 +++++++++++++++++++++++++++++++ crates/consensus/src/lib.rs | 3 + 4 files changed, 389 insertions(+) create mode 100644 crates/consensus/src/debugger.rs diff --git a/Cargo.lock b/Cargo.lock index fb88bd1a..ec3fa04d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5589,12 +5589,14 @@ name = "pluto-consensus" version = "1.7.1" dependencies = [ "anyhow", + "axum", "cancellation", "chrono", "clap", "crossbeam", "either", "ethereum_ssz", + "flate2", "futures", "hex", "k256", diff --git a/crates/consensus/Cargo.toml b/crates/consensus/Cargo.toml index 65dbb345..696ffe51 100644 --- a/crates/consensus/Cargo.toml +++ b/crates/consensus/Cargo.toml @@ -7,9 +7,11 @@ license.workspace = true publish.workspace = true [dependencies] +axum.workspace = true cancellation.workspace = true chrono.workspace = true crossbeam.workspace = true +flate2.workspace = true futures.workspace = true hex.workspace = true either.workspace = true diff --git a/crates/consensus/src/debugger.rs b/crates/consensus/src/debugger.rs new file mode 100644 index 00000000..bd74efcf --- /dev/null +++ b/crates/consensus/src/debugger.rs @@ -0,0 +1,382 @@ +//! Consensus debug message buffer. +//! +//! [`Debugger`] stores completed QBFT sniffer instances in a bounded FIFO +//! buffer and serves them as a gzipped [`SniffedConsensusInstances`] protobuf. +//! +//! # Usage +//! +//! Create one debugger during node startup, wire its sniffer callback into the +//! QBFT consensus config, and mount its router on the debug HTTP server: +//! +//! ```no_run +//! use axum::Router; +//! use pluto_consensus::debugger::Debugger; +//! +//! let debugger = Debugger::new(); +//! +//! let sniffer = debugger.sniffer(); +//! let app = Router::new().merge(debugger.router("/debug/consensus")); +//! +//! // Pass `sniffer` into `qbft::Config { sniffer, .. }`. +//! // Serve `app` with the node's debug HTTP server. +//! ``` +//! +//! Lower-level callers can use [`Debugger::add_instance`] to store an instance, +//! [`Debugger::get_zipped_proto`] to get the raw gzipped protobuf bytes, or +//! [`Debugger::serve_http`] to build a single HTTP response without using +//! [`Debugger::router`]. + +use std::{ + collections::VecDeque, + io::Write, + sync::{Arc, Mutex, PoisonError}, +}; + +use axum::{ + Router, + body::Body, + http::{ + HeaderValue, Response, StatusCode, + header::{CONTENT_DISPOSITION, CONTENT_TYPE}, + }, + routing::get, +}; +use flate2::{Compression, write::GzEncoder}; +use pluto_core::{ + corepb::v1::consensus::{SniffedConsensusInstance, SniffedConsensusInstances}, + version, +}; +use prost::Message; + +use crate::qbft::SnifferSink; + +const DEFAULT_MAX_BUFFER_SIZE: usize = 52_428_800; +const DEBUGGER_CONTENT_TYPE: &str = "application/octet-stream"; +const DEBUGGER_FILENAME: &str = r#"attachment; filename="consensus_messages.pb.gz""#; +const DEBUGGER_ERROR_BODY: &str = "something went wrong, see logs\n"; + +/// Consensus debug message buffer. +#[derive(Clone, Debug)] +pub struct Debugger { + inner: Arc, +} + +#[derive(Debug)] +struct Inner { + git_hash: String, + max_buffer_size: usize, + state: Mutex, +} + +#[derive(Debug, Default)] +struct State { + total_size: usize, + instances: VecDeque, +} + +/// Debugger serialization error. +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// Protobuf encoding failed. + #[error("marshal proto: {0}")] + MarshalProto(#[from] prost::EncodeError), + /// Gzip writer failed. + #[error("zip proto: {0}")] + ZipProto(std::io::Error), + /// Gzip writer close failed. + #[error("close gzip writer: {0}")] + CloseGzipWriter(std::io::Error), +} + +impl Debugger { + /// Returns a new consensus debugger. + pub fn new() -> Self { + let (git_hash, _) = version::git_commit(); + Self::with_git_hash_and_max_buffer(git_hash, DEFAULT_MAX_BUFFER_SIZE) + } + + /// Adds a sniffed consensus instance to the FIFO buffer. + pub fn add_instance(&self, instance: SniffedConsensusInstance) { + let size = instance.encoded_len(); + let mut state = self + .inner + .state + .lock() + .unwrap_or_else(PoisonError::into_inner); + + state.total_size = state.total_size.saturating_add(size); + state.instances.push_back(instance); + + while state.total_size > self.inner.max_buffer_size { + let Some(dropped) = state.instances.pop_front() else { + state.total_size = 0; + break; + }; + state.total_size = state.total_size.saturating_sub(dropped.encoded_len()); + } + } + + /// Returns the buffered consensus instances as a gzipped protobuf payload. + pub fn get_zipped_proto(&self) -> Result, Error> { + let instances = { + let state = self + .inner + .state + .lock() + .unwrap_or_else(PoisonError::into_inner); + state.instances.iter().cloned().collect() + }; + + let mut encoded = Vec::new(); + SniffedConsensusInstances { + instances, + git_hash: self.inner.git_hash.clone(), + } + .encode(&mut encoded)?; + + let mut encoder = GzEncoder::new(Vec::new(), Compression::fast()); + encoder.write_all(&encoded).map_err(Error::ZipProto)?; + encoder.finish().map_err(Error::CloseGzipWriter) + } + + /// Returns an HTTP response containing the gzipped protobuf payload. + pub fn serve_http(&self) -> Response { + match self.get_zipped_proto() { + Ok(body) => Response::builder() + .header( + CONTENT_TYPE, + HeaderValue::from_static(DEBUGGER_CONTENT_TYPE), + ) + .header( + CONTENT_DISPOSITION, + HeaderValue::from_static(DEBUGGER_FILENAME), + ) + .body(Body::from(body)) + .unwrap_or_else(|error| { + tracing::warn!(%error, "Error serving consensus debug"); + error_response() + }), + Err(error) => { + tracing::warn!(%error, "Error serving consensus debug"); + error_response() + } + } + } + + /// Returns a sink that stores completed QBFT sniffer instances. + pub fn sniffer(&self) -> SnifferSink { + let debugger = self.clone(); + Arc::new(move |instance| debugger.add_instance(instance)) + } + + /// Returns an axum router serving this debugger at `path`. + pub fn router(&self, path: &'static str) -> Router { + let debugger = self.clone(); + Router::new().route( + path, + get(move || { + let debugger = debugger.clone(); + async move { debugger.serve_http() } + }), + ) + } + + fn with_git_hash_and_max_buffer(git_hash: String, max_buffer_size: usize) -> Self { + Self { + inner: Arc::new(Inner { + git_hash, + max_buffer_size, + state: Mutex::default(), + }), + } + } +} + +impl Default for Debugger { + fn default() -> Self { + Self::new() + } +} + +fn error_response() -> Response { + let mut response = Response::new(Body::from(DEBUGGER_ERROR_BODY)); + *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; + response +} + +#[cfg(test)] +mod tests { + use std::io::Read; + + use axum::body; + use flate2::read::GzDecoder; + use pluto_core::corepb::v1::consensus::{QbftConsensusMsg, QbftMsg, SniffedConsensusMsg}; + use prost_types::Timestamp; + + use super::*; + + #[tokio::test] + async fn debugger_serves_gzipped_sniffed_consensus_instances() { + let debugger = Debugger::with_git_hash_and_max_buffer("test-hash".to_string(), usize::MAX); + let instances = (0..10).map(sniffed_instance).collect::>(); + + for instance in instances.clone() { + debugger.add_instance(instance); + } + + let response = debugger.serve_http(); + + assert_eq!(response.status(), StatusCode::OK); + assert_eq!( + response.headers().get(CONTENT_TYPE), + Some(&HeaderValue::from_static(DEBUGGER_CONTENT_TYPE)) + ); + assert_eq!( + response.headers().get(CONTENT_DISPOSITION), + Some(&HeaderValue::from_static(DEBUGGER_FILENAME)) + ); + + let body = body::to_bytes(response.into_body(), usize::MAX) + .await + .expect("debug response body is readable"); + let decoded = decode_gzipped_instances(&body); + + assert_eq!( + decoded, + SniffedConsensusInstances { + instances, + git_hash: "test-hash".to_string(), + } + ); + } + + #[test] + fn add_instance_drops_oldest_instances_when_capacity_is_exceeded() { + let first = sniffed_instance(1); + let second = sniffed_instance(2); + let third = sniffed_instance(3); + let max_buffer = second + .encoded_len() + .checked_add(third.encoded_len()) + .expect("test instances fit usize"); + let debugger = Debugger::with_git_hash_and_max_buffer("test-hash".to_string(), max_buffer); + + debugger.add_instance(first); + debugger.add_instance(second.clone()); + debugger.add_instance(third.clone()); + + let decoded = decode_gzipped_instances( + &debugger + .get_zipped_proto() + .expect("debugger payload should encode"), + ); + + assert_eq!(decoded.instances, vec![second, third]); + } + + #[test] + fn new_debugger_sets_git_hash() { + let debugger = Debugger::new(); + let decoded = decode_gzipped_instances( + &debugger + .get_zipped_proto() + .expect("debugger payload should encode"), + ); + + assert_eq!(decoded.git_hash, version::git_commit().0); + } + + #[test] + fn cloned_debugger_shares_buffer() { + let debugger = Debugger::with_git_hash_and_max_buffer("test-hash".to_string(), usize::MAX); + let cloned = debugger.clone(); + let instance = sniffed_instance(1); + + cloned.add_instance(instance.clone()); + + let decoded = decode_gzipped_instances( + &debugger + .get_zipped_proto() + .expect("debugger payload should encode"), + ); + + assert_eq!(decoded.instances, vec![instance]); + } + + #[test] + fn sniffer_adds_instances() { + let debugger = Debugger::with_git_hash_and_max_buffer("test-hash".to_string(), usize::MAX); + let sniffer = debugger.sniffer(); + let instance = sniffed_instance(1); + + sniffer(instance.clone()); + + let decoded = decode_gzipped_instances( + &debugger + .get_zipped_proto() + .expect("debugger payload should encode"), + ); + + assert_eq!(decoded.instances, vec![instance]); + } + + #[test] + fn router_constructs_debug_endpoint() { + let debugger = Debugger::with_git_hash_and_max_buffer("test-hash".to_string(), usize::MAX); + let _router = debugger.router("/debug/consensus"); + } + + fn decode_gzipped_instances(bytes: &[u8]) -> SniffedConsensusInstances { + let mut decoder = GzDecoder::new(bytes); + let mut decoded = Vec::new(); + decoder + .read_to_end(&mut decoded) + .expect("gzip payload should decode"); + SniffedConsensusInstances::decode(decoded.as_slice()) + .expect("sniffed consensus instances should decode") + } + + fn sniffed_instance(seed: i64) -> SniffedConsensusInstance { + SniffedConsensusInstance { + started_at: Some(Timestamp { + seconds: seed, + nanos: 0, + }), + nodes: 4, + peer_idx: seed, + msgs: vec![SniffedConsensusMsg { + timestamp: Some(Timestamp { + seconds: seed + .checked_add(1) + .expect("test timestamp increment fits i64"), + nanos: 0, + }), + msg: Some(QbftConsensusMsg { + msg: Some(QbftMsg { + r#type: seed, + peer_idx: seed, + round: seed, + prepared_round: seed, + ..Default::default() + }), + justification: vec![ + QbftMsg { + round: seed + .checked_add(1) + .expect("test justification round fits i64"), + ..Default::default() + }, + QbftMsg { + round: seed + .checked_add(2) + .expect("test justification round fits i64"), + ..Default::default() + }, + ], + values: Vec::new(), + }), + }], + protocol_id: "test-protocol".to_string(), + } + } +} diff --git a/crates/consensus/src/lib.rs b/crates/consensus/src/lib.rs index 75b5a06e..2f2d0dfc 100644 --- a/crates/consensus/src/lib.rs +++ b/crates/consensus/src/lib.rs @@ -4,6 +4,9 @@ //! This crate implements the consensus algorithms and protocols required for //! coordinating validator operations across the distributed network. +/// Consensus debug message buffer. +pub mod debugger; + /// Consensus protocols. pub mod protocols; From c3864c979a5599a1862f6693be4f385473e2bb23 Mon Sep 17 00:00:00 2001 From: Quang Le Date: Fri, 12 Jun 2026 14:35:52 +0700 Subject: [PATCH 2/3] feat(consensus): implement consensus wrapper + controller (#473) * feat(consensus): implement consensus wrapper * feat(consensus): implement consensus controller --- crates/consensus/examples/qbft.rs | 3 +- crates/consensus/src/controller.rs | 191 +++++++++++++++++ crates/consensus/src/lib.rs | 6 + crates/consensus/src/qbft/component.rs | 88 ++++++-- crates/consensus/src/qbft/qbft_run_test.rs | 30 +-- crates/consensus/src/wrapper.rs | 228 +++++++++++++++++++++ 6 files changed, 508 insertions(+), 38 deletions(-) create mode 100644 crates/consensus/src/controller.rs create mode 100644 crates/consensus/src/wrapper.rs diff --git a/crates/consensus/examples/qbft.rs b/crates/consensus/examples/qbft.rs index 2ae29fc7..17b3eda1 100644 --- a/crates/consensus/examples/qbft.rs +++ b/crates/consensus/examples/qbft.rs @@ -602,6 +602,7 @@ fn build_consensus( local_peer_idx: i64::try_from(fixture.local_index)?, privkey: fixture.key.clone(), deadliner, + expired_rx, duty_gater: Arc::new(|duty| duty.duty_type == DutyType::Attester), broadcaster, sniffer: Arc::new(move |instance| { @@ -623,7 +624,7 @@ fn build_consensus( }); Ok(()) }); - let lifecycle_task = Arc::clone(&component).start(expired_rx, cancel.child_token()); + let lifecycle_task = component.start(cancel.child_token()); Ok((component, lifecycle_task)) } diff --git a/crates/consensus/src/controller.rs b/crates/consensus/src/controller.rs new file mode 100644 index 00000000..c40a8998 --- /dev/null +++ b/crates/consensus/src/controller.rs @@ -0,0 +1,191 @@ +//! Consensus protocol controller. + +use std::sync::Arc; + +use k256::SecretKey; +use pluto_core::{deadline::DeadlinerHandle, types::Duty}; +use tokio::sync::mpsc; +use tokio_util::sync::CancellationToken; + +use crate::{ + debugger::Debugger, + qbft, + timer::RoundTimerFunc, + wrapper::{Consensus, ConsensusWrapper}, +}; + +/// Consensus controller result. +pub type Result = std::result::Result; + +/// Consensus controller error. +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// Failed to construct the default QBFT consensus implementation. + #[error("{0}")] + Qbft(#[from] qbft::Error), + /// Protocol ID is not supported by this controller. + #[error("unsupported protocol id")] + UnsupportedProtocolId, +} + +/// Consensus controller constructor config. +pub struct Config { + /// Consensus peers in process-index order. + pub peers: Vec, + /// Local zero-based process index. + pub local_peer_idx: i64, + /// Local secp256k1 private key. + pub privkey: SecretKey, + /// Duty deadline scheduler. + pub deadliner: DeadlinerHandle, + /// Expired-duty receiver paired with `deadliner`. + pub expired_rx: mpsc::Receiver, + /// Duty admission gate. + pub duty_gater: qbft::DutyGater, + /// External message broadcaster. + pub broadcaster: qbft::Broadcaster, + /// Consensus debugger. + pub debugger: Debugger, + /// Enables attestation value comparison. + pub compare_attestations: bool, + /// Round timer factory. + pub timer_func: RoundTimerFunc, +} + +/// Controls the active consensus protocol implementation. +pub struct ConsensusController { + default_consensus: Arc, + wrapped_consensus: ConsensusWrapper, +} + +impl ConsensusController { + /// Creates a new consensus controller with QBFT as the default protocol. + pub fn new(config: Config) -> Result { + let qbft = Arc::new(qbft::Consensus::new(qbft::Config { + peers: config.peers, + local_peer_idx: config.local_peer_idx, + privkey: config.privkey, + deadliner: config.deadliner, + expired_rx: config.expired_rx, + duty_gater: config.duty_gater, + broadcaster: config.broadcaster, + sniffer: config.debugger.sniffer(), + compare_attestations: config.compare_attestations, + timer_func: config.timer_func, + })?); + let default_consensus: Arc = qbft; + + Ok(Self { + wrapped_consensus: ConsensusWrapper::new(default_consensus.clone()), + default_consensus, + }) + } + + /// Starts the default consensus implementation. + pub fn start(&self, ct: CancellationToken) { + self.default_consensus.start(ct); + } + + /// Returns the default consensus implementation. + pub fn default_consensus(&self) -> Arc { + Arc::clone(&self.default_consensus) + } + + /// Returns the current consensus wrapper. + pub fn current_consensus(&self) -> &ConsensusWrapper { + &self.wrapped_consensus + } + + /// Sets the current consensus implementation for `protocol`. + pub fn set_current_consensus_for_protocol(&self, protocol: &str) -> Result<()> { + if self.wrapped_consensus.protocol_id() == protocol { + return Ok(()); + } + + if self.default_consensus.protocol_id() == protocol { + self.wrapped_consensus + .set_impl(Arc::clone(&self.default_consensus)); + return Ok(()); + } + + Err(Error::UnsupportedProtocolId) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use k256::SecretKey; + use pluto_core::{ + deadline::{DeadlinerTask, NeverExpiringCalculator}, + types::DutyType, + }; + + use crate::{debugger::Debugger, protocols::QBFT_V2_PROTOCOL_ID, timer::get_round_timer_func}; + + use super::*; + + #[tokio::test] + async fn consensus_controller_uses_qbft_as_default_and_current() { + let controller = ConsensusController::new(config()).expect("controller should construct"); + let ct = CancellationToken::new(); + + controller.start(ct.clone()); + + let default_consensus = controller.default_consensus(); + assert_eq!(default_consensus.protocol_id(), QBFT_V2_PROTOCOL_ID); + assert_eq!( + controller.current_consensus().protocol_id(), + QBFT_V2_PROTOCOL_ID + ); + + controller + .set_current_consensus_for_protocol(QBFT_V2_PROTOCOL_ID) + .expect("default protocol is supported"); + let err = controller + .set_current_consensus_for_protocol("boo") + .expect_err("unknown protocol should fail"); + assert!(matches!(err, Error::UnsupportedProtocolId)); + + ct.cancel(); + } + + fn config() -> Config { + let ct = CancellationToken::new(); + let (deadliner, expired_rx) = + DeadlinerTask::start(ct, "controller-test", NeverExpiringCalculator); + + Config { + peers: peers(), + local_peer_idx: 0, + privkey: secret_key(1), + deadliner, + expired_rx, + duty_gater: Arc::new(|duty| duty.duty_type == DutyType::Attester), + broadcaster: Arc::new(|_, _| Box::pin(async { Ok(()) })), + debugger: Debugger::new(), + compare_attestations: false, + timer_func: get_round_timer_func(), + } + } + + fn peers() -> Vec { + vec![ + qbft::Peer { + index: 0, + name: "node-0".to_string(), + public_key: secret_key(1).public_key(), + }, + qbft::Peer { + index: 1, + name: "node-1".to_string(), + public_key: secret_key(2).public_key(), + }, + ] + } + + fn secret_key(seed: u8) -> SecretKey { + SecretKey::from_slice(&[seed; 32]).expect("test secret key is valid") + } +} diff --git a/crates/consensus/src/lib.rs b/crates/consensus/src/lib.rs index 2f2d0dfc..4f99d550 100644 --- a/crates/consensus/src/lib.rs +++ b/crates/consensus/src/lib.rs @@ -4,6 +4,9 @@ //! This crate implements the consensus algorithms and protocols required for //! coordinating validator operations across the distributed network. +/// Consensus protocol controller. +pub mod controller; + /// Consensus debug message buffer. pub mod debugger; @@ -17,3 +20,6 @@ pub mod qbft; /// Consensus round timers. pub mod timer; + +/// Swappable consensus implementation wrapper. +pub mod wrapper; diff --git a/crates/consensus/src/qbft/component.rs b/crates/consensus/src/qbft/component.rs index 13524a87..f53662e6 100644 --- a/crates/consensus/src/qbft/component.rs +++ b/crates/consensus/src/qbft/component.rs @@ -76,6 +76,8 @@ pub struct Config { pub privkey: SecretKey, /// Duty deadline scheduler. pub deadliner: DeadlinerHandle, + /// Expired-duty receiver paired with `deadliner`. + pub expired_rx: mpsc::Receiver, /// Duty admission gate. pub duty_gater: DutyGater, /// External message broadcaster. @@ -263,13 +265,14 @@ pub struct Consensus { local_peer_idx: i64, privkey: SecretKey, deadliner: DeadlinerHandle, + expired_rx: Mutex>>, duty_gater: DutyGater, broadcaster: Broadcaster, sniffer: SnifferSink, timer_func: RoundTimerFunc, compare_attestations: bool, subscribers: SubscriberSet, - instances: Mutex>>>, + instances: Arc>>>>, } impl Consensus { @@ -297,13 +300,14 @@ impl Consensus { local_peer_idx: config.local_peer_idx, privkey: config.privkey, deadliner: config.deadliner, + expired_rx: Mutex::new(Some(config.expired_rx)), duty_gater: config.duty_gater, broadcaster: config.broadcaster, sniffer: config.sniffer, timer_func: config.timer_func, compare_attestations: config.compare_attestations, subscribers: SubscriberSet::default(), - instances: Mutex::default(), + instances: Arc::new(Mutex::default()), }) } @@ -417,17 +421,29 @@ impl Consensus { } /// Runs the internal expired-duty cleanup loop until cancellation. - pub fn start( - self: Arc, - mut expired_rx: mpsc::Receiver, - ct: CancellationToken, - ) -> JoinHandle<()> { + pub fn start(&self, ct: CancellationToken) -> JoinHandle<()> { + let expired_rx = self + .expired_rx + .lock() + .unwrap_or_else(PoisonError::into_inner) + .take(); + let instances = Arc::clone(&self.instances); + tokio::spawn(async move { + let Some(mut expired_rx) = expired_rx else { + return; + }; + loop { tokio::select! { () = ct.cancelled() => return, duty = expired_rx.recv() => match duty { - Some(duty) => self.delete_instance_io(&duty), + Some(duty) => { + instances + .lock() + .unwrap_or_else(PoisonError::into_inner) + .remove(&duty); + } None => return, }, } @@ -448,6 +464,7 @@ impl Consensus { } /// Drops cached I/O for a completed or expired duty instance. + #[cfg(test)] pub(crate) fn delete_instance_io(&self, duty: &Duty) { self.instances .lock() @@ -539,6 +556,45 @@ impl Consensus { } } +impl crate::wrapper::Consensus for Consensus { + fn protocol_id(&self) -> String { + self.protocol_id().to_string() + } + + fn start(&self, ct: CancellationToken) { + drop(Consensus::start(self, ct)); + } + + fn participate( + &self, + ct: CancellationToken, + duty: Duty, + ) -> BoxFuture<'_, crate::wrapper::Result<()>> { + Box::pin(async move { + Consensus::participate(self, duty, &ct) + .await + .map_err(|err| Box::new(err) as Box) + }) + } + + fn propose( + &self, + ct: CancellationToken, + duty: Duty, + value: pbcore::UnsignedDataSet, + ) -> BoxFuture<'_, crate::wrapper::Result<()>> { + Box::pin(async move { + Consensus::propose(self, duty, value, &ct) + .await + .map_err(|err| Box::new(err) as Box) + }) + } + + fn subscribe(&self, subscriber: crate::wrapper::Subscriber) { + Consensus::subscribe(self, subscriber); + } +} + /// Extracts the domain duty from a validated raw QBFT message. fn duty_from_msg(msg: &pbconsensus::QbftMsg) -> Result { let duty = msg.duty.as_ref().ok_or(Error::InvalidConsensusMessage)?; @@ -613,12 +669,19 @@ pub(crate) mod tests { #[tokio::test] async fn start_deletes_expired_instance_io_until_cancelled() { - let consensus = Arc::new(consensus(0, true)); + let (expired_tx, expired_rx) = mpsc::channel(1); + let consensus = Arc::new( + Consensus::new(Config { + peers: peers(), + expired_rx, + ..config_base(false) + }) + .unwrap(), + ); let duty = duty(); let first = consensus.get_instance_io(duty.clone()); let cancel = CancellationToken::new(); - let (expired_tx, expired_rx) = mpsc::channel(1); - let task = Arc::clone(&consensus).start(expired_rx, cancel.clone()); + let task = consensus.start(cancel.clone()); expired_tx.send(duty.clone()).await.unwrap(); tokio::time::timeout( @@ -1237,7 +1300,7 @@ pub(crate) mod tests { pub(crate) fn config_base(never_expiring: bool) -> Config { let cancel = CancellationToken::new(); - let (deadliner, _expired_rx) = if never_expiring { + let (deadliner, expired_rx) = if never_expiring { DeadlinerTask::start( cancel, "qbft-test", @@ -1252,6 +1315,7 @@ pub(crate) mod tests { local_peer_idx: 0, privkey: secret_key(1), deadliner, + expired_rx, duty_gater: Arc::new(|_| true), broadcaster: Arc::new(|_, _| Box::pin(async { Ok(()) })), sniffer: Arc::new(|_| {}), diff --git a/crates/consensus/src/qbft/qbft_run_test.rs b/crates/consensus/src/qbft/qbft_run_test.rs index 342dbdc0..8840395b 100644 --- a/crates/consensus/src/qbft/qbft_run_test.rs +++ b/crates/consensus/src/qbft/qbft_run_test.rs @@ -91,7 +91,6 @@ async fn qbft_consensus_with_silent_round_one_leader_decides() { let (decided_tx, mut decided_rx) = mpsc::unbounded_channel(); let ct = CancellationToken::new(); let start_ct = CancellationToken::new(); - let mut expired_txs = Vec::with_capacity(active_nodes.len()); let mut start_tasks = Vec::with_capacity(active_nodes.len()); for (node_idx, node) in active_nodes.iter().enumerate() { @@ -101,9 +100,7 @@ async fn qbft_consensus_with_silent_round_one_leader_decides() { Ok(()) }); - let (expired_tx, expired_rx) = mpsc::channel(1); - expired_txs.push(expired_tx); - start_tasks.push(Arc::clone(node).start(expired_rx, start_ct.clone())); + start_tasks.push(node.start(start_ct.clone())); } drop(decided_tx); @@ -140,7 +137,6 @@ async fn qbft_consensus_with_silent_round_one_leader_decides() { ct.cancel(); start_ct.cancel(); - drop(expired_txs); for task in start_tasks { task.await.unwrap(); } @@ -155,7 +151,6 @@ async fn qbft_priority_consensus() { let duty = Duty::new(SlotNumber::new(1), DutyType::InfoSync); let ct = CancellationToken::new(); let start_ct = CancellationToken::new(); - let mut expired_txs = Vec::with_capacity(active_nodes.len()); let mut start_tasks = Vec::with_capacity(active_nodes.len()); for (node_idx, node) in active_nodes.iter().enumerate() { @@ -165,9 +160,7 @@ async fn qbft_priority_consensus() { Ok(()) }); - let (expired_tx, expired_rx) = mpsc::channel(1); - expired_txs.push(expired_tx); - start_tasks.push(Arc::clone(node).start(expired_rx, start_ct.clone())); + start_tasks.push(node.start(start_ct.clone())); } drop(decided_tx); @@ -200,7 +193,6 @@ async fn qbft_priority_consensus() { ct.cancel(); start_ct.cancel(); - drop(expired_txs); for task in start_tasks { task.await.unwrap(); } @@ -216,7 +208,6 @@ async fn qbft_consensus_participate_then_late_propose() { let duty = Duty::new(SlotNumber::new(1), DutyType::Attester); let ct = CancellationToken::new(); let start_ct = CancellationToken::new(); - let mut expired_txs = Vec::with_capacity(active_nodes.len()); let mut start_tasks = Vec::with_capacity(active_nodes.len()); for (node_idx, node) in active_nodes.iter().enumerate() { @@ -226,9 +217,7 @@ async fn qbft_consensus_participate_then_late_propose() { Ok(()) }); - let (expired_tx, expired_rx) = mpsc::channel(1); - expired_txs.push(expired_tx); - start_tasks.push(Arc::clone(node).start(expired_rx, start_ct.clone())); + start_tasks.push(node.start(start_ct.clone())); } drop(decided_tx); @@ -278,7 +267,6 @@ async fn qbft_consensus_participate_then_late_propose() { ct.cancel(); start_ct.cancel(); - drop(expired_txs); for task in start_tasks { task.await.unwrap(); } @@ -299,7 +287,6 @@ async fn qbft_consensus_attester_compare_mismatch_does_not_decide() { let duty = Duty::new(SlotNumber::new(1), DutyType::Attester); let ct = CancellationToken::new(); let start_ct = CancellationToken::new(); - let mut expired_txs = Vec::with_capacity(active_nodes.len()); let mut start_tasks = Vec::with_capacity(active_nodes.len()); for (node_idx, node) in active_nodes.iter().enumerate() { @@ -309,9 +296,7 @@ async fn qbft_consensus_attester_compare_mismatch_does_not_decide() { Ok(()) }); - let (expired_tx, expired_rx) = mpsc::channel(1); - expired_txs.push(expired_tx); - start_tasks.push(Arc::clone(node).start(expired_rx, start_ct.clone())); + start_tasks.push(node.start(start_ct.clone())); } drop(decided_tx); @@ -333,7 +318,6 @@ async fn qbft_consensus_attester_compare_mismatch_does_not_decide() { assert!(decided_rx.try_recv().is_err()); start_ct.cancel(); - drop(expired_txs); for task in start_tasks { task.await.unwrap(); } @@ -356,7 +340,6 @@ async fn run_qbft_consensus( let duty = Duty::new(SlotNumber::new(1), DutyType::Attester); let ct = CancellationToken::new(); let start_ct = CancellationToken::new(); - let mut expired_txs = Vec::with_capacity(active_nodes.len()); let mut start_tasks = Vec::with_capacity(active_nodes.len()); for (node_idx, node) in active_nodes.iter().enumerate() { @@ -366,9 +349,7 @@ async fn run_qbft_consensus( Ok(()) }); - let (expired_tx, expired_rx) = mpsc::channel(1); - expired_txs.push(expired_tx); - start_tasks.push(Arc::clone(node).start(expired_rx, start_ct.clone())); + start_tasks.push(node.start(start_ct.clone())); } drop(decided_tx); @@ -404,7 +385,6 @@ async fn run_qbft_consensus( ct.cancel(); start_ct.cancel(); - drop(expired_txs); for task in start_tasks { task.await.unwrap(); } diff --git a/crates/consensus/src/wrapper.rs b/crates/consensus/src/wrapper.rs new file mode 100644 index 00000000..a69401ab --- /dev/null +++ b/crates/consensus/src/wrapper.rs @@ -0,0 +1,228 @@ +//! Swappable consensus implementation wrapper. + +use std::{ + error::Error as StdError, + sync::{Arc, PoisonError, RwLock}, +}; + +use futures::future::BoxFuture; +use pluto_core::{corepb::v1::core as pbcore, types::Duty}; +use tokio_util::sync::CancellationToken; + +/// Consensus wrapper result. +pub type Result = std::result::Result>; + +/// Subscriber callback result. +pub type SubscriberResult = Result<()>; + +/// Subscriber callback for decided unsigned duty data. +pub type Subscriber = + Box SubscriberResult + Send + Sync + 'static>; + +/// Consensus implementation interface. +pub trait Consensus: Send + Sync { + /// Returns the consensus protocol ID. + fn protocol_id(&self) -> String; + + /// Starts the consensus implementation. + fn start(&self, ct: CancellationToken); + + /// Starts participating in a consensus instance. + fn participate(&self, ct: CancellationToken, duty: Duty) -> BoxFuture<'_, Result<()>>; + + /// Proposes unsigned duty data for a consensus instance. + fn propose( + &self, + ct: CancellationToken, + duty: Duty, + value: pbcore::UnsignedDataSet, + ) -> BoxFuture<'_, Result<()>>; + + /// Registers a callback for decided unsigned duty data. + fn subscribe(&self, subscriber: Subscriber); +} + +/// Wrapper that forwards calls to the current consensus implementation. +pub struct ConsensusWrapper { + implementation: RwLock>, +} + +impl ConsensusWrapper { + /// Wraps a consensus implementation. + pub fn new(implementation: Arc) -> Self { + Self { + implementation: RwLock::new(implementation), + } + } + + /// Sets the current consensus implementation. + pub fn set_impl(&self, implementation: Arc) { + *self + .implementation + .write() + .unwrap_or_else(PoisonError::into_inner) = implementation; + } + + /// Returns the current consensus protocol ID. + pub fn protocol_id(&self) -> String { + self.current().protocol_id() + } + + /// Starts the current consensus implementation. + pub fn start(&self, ct: CancellationToken) { + self.current().start(ct); + } + + /// Starts participating in a consensus instance. + pub async fn participate(&self, ct: CancellationToken, duty: Duty) -> Result<()> { + self.current().participate(ct, duty).await + } + + /// Proposes unsigned duty data for a consensus instance. + pub async fn propose( + &self, + ct: CancellationToken, + duty: Duty, + value: pbcore::UnsignedDataSet, + ) -> Result<()> { + self.current().propose(ct, duty, value).await + } + + /// Registers a callback for decided unsigned duty data. + pub fn subscribe(&self, subscriber: Subscriber) { + self.current().subscribe(subscriber); + } + + fn current(&self) -> Arc { + self.implementation + .read() + .unwrap_or_else(PoisonError::into_inner) + .clone() + } +} + +#[cfg(test)] +mod tests { + use std::sync::Mutex; + + use futures::FutureExt as _; + use pluto_core::{ + corepb::v1::core as pbcore, + types::{Duty, SlotNumber}, + }; + + use crate::protocols::QBFT_V2_PROTOCOL_ID; + + use super::*; + + #[tokio::test] + async fn new_consensus_wrapper_forwards_to_current_impl() { + let ct = CancellationToken::new(); + let duty = Duty::new_randao_duty(SlotNumber::new(123)); + let value = pbcore::UnsignedDataSet::default(); + let first = Arc::new(TestConsensus::new(QBFT_V2_PROTOCOL_ID)); + let wrapper = ConsensusWrapper::new(first.clone()); + + assert_eq!(wrapper.protocol_id(), QBFT_V2_PROTOCOL_ID); + + wrapper + .participate(ct.clone(), duty.clone()) + .await + .expect("participate forwards"); + wrapper + .propose(ct.clone(), duty.clone(), value) + .await + .expect("propose forwards"); + + let subscribed = Arc::new(Mutex::new(Vec::new())); + let subscribed_clone = Arc::clone(&subscribed); + wrapper.subscribe(Box::new(move |duty, _| { + subscribed_clone + .lock() + .unwrap_or_else(PoisonError::into_inner) + .push(duty); + Ok(()) + })); + + wrapper.start(ct); + + assert_eq!( + first.calls(), + vec!["participate", "propose", "subscribe", "start"] + ); + assert_eq!( + subscribed + .lock() + .unwrap_or_else(PoisonError::into_inner) + .as_slice(), + &[duty] + ); + + let second = Arc::new(TestConsensus::new("foobar")); + wrapper.set_impl(second); + + assert_eq!(wrapper.protocol_id(), "foobar"); + } + + struct TestConsensus { + protocol_id: String, + calls: Mutex>, + } + + impl TestConsensus { + fn new(protocol_id: &str) -> Self { + Self { + protocol_id: protocol_id.to_string(), + calls: Mutex::default(), + } + } + + fn calls(&self) -> Vec<&'static str> { + self.calls + .lock() + .unwrap_or_else(PoisonError::into_inner) + .clone() + } + + fn record(&self, call: &'static str) { + self.calls + .lock() + .unwrap_or_else(PoisonError::into_inner) + .push(call); + } + } + + impl Consensus for TestConsensus { + fn protocol_id(&self) -> String { + self.protocol_id.clone() + } + + fn start(&self, _ct: CancellationToken) { + self.record("start"); + } + + fn participate(&self, _ct: CancellationToken, _duty: Duty) -> BoxFuture<'_, Result<()>> { + self.record("participate"); + async { Ok(()) }.boxed() + } + + fn propose( + &self, + _ct: CancellationToken, + _duty: Duty, + _value: pbcore::UnsignedDataSet, + ) -> BoxFuture<'_, Result<()>> { + self.record("propose"); + async { Ok(()) }.boxed() + } + + fn subscribe(&self, subscriber: Subscriber) { + self.record("subscribe"); + subscriber( + Duty::new_randao_duty(SlotNumber::new(123)), + pbcore::UnsignedDataSet::default(), + ) + .expect("test subscriber succeeds"); + } + } +} From 9d45e302f9d534b3d69fc3518795129044026d7e Mon Sep 17 00:00:00 2001 From: Quang Le Date: Fri, 12 Jun 2026 17:26:56 +0700 Subject: [PATCH 3/3] docs(consensus): note single-shot start, deadliner naming, deferred protocol-switch TODO --- crates/consensus/src/controller.rs | 7 ++++++- crates/consensus/src/qbft/component.rs | 4 ++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/crates/consensus/src/controller.rs b/crates/consensus/src/controller.rs index c40a8998..d45cb39a 100644 --- a/crates/consensus/src/controller.rs +++ b/crates/consensus/src/controller.rs @@ -36,7 +36,8 @@ pub struct Config { pub local_peer_idx: i64, /// Local secp256k1 private key. pub privkey: SecretKey, - /// Duty deadline scheduler. + /// Duty deadline scheduler. Name it `"consensus.qbft"` to match Go's + /// internally-built deadliner for log parity. pub deadliner: DeadlinerHandle, /// Expired-duty receiver paired with `deadliner`. pub expired_rx: mpsc::Receiver, @@ -108,6 +109,10 @@ impl ConsensusController { return Ok(()); } + // TODO: When introducing non-default consensus protocols, mirror Go's + // deferred wrapped-context cancellation here: cancel the previous + // non-default impl, build a `"consensus."` deadliner, set the + // new impl, and start it under a fresh cancellation token. Err(Error::UnsupportedProtocolId) } } diff --git a/crates/consensus/src/qbft/component.rs b/crates/consensus/src/qbft/component.rs index f53662e6..603cd7a0 100644 --- a/crates/consensus/src/qbft/component.rs +++ b/crates/consensus/src/qbft/component.rs @@ -421,6 +421,10 @@ impl Consensus { } /// Runs the internal expired-duty cleanup loop until cancellation. + /// + /// Must be called exactly once: it `take()`s `expired_rx`, so any later + /// call spawns a task that returns immediately and the cleanup loop + /// never runs. pub fn start(&self, ct: CancellationToken) -> JoinHandle<()> { let expired_rx = self .expired_rx