Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion crates/consensus/examples/qbft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,7 @@ fn build_consensus(
local_peer_idx: i64::try_from(fixture.local_index)?,
privkey: fixture.key.clone(),
deadliner,
expired_rx,
duty_gater: Arc::new(|duty| duty.duty_type == DutyType::Attester),
broadcaster,
sniffer: Arc::new(move |instance| {
Expand All @@ -618,7 +619,7 @@ fn build_consensus(
});
Ok(())
});
let lifecycle_task = Arc::clone(&component).start(cancel.child_token(), expired_rx);
let lifecycle_task = component.start(cancel.child_token());

Ok(ConsensusRuntime {
component,
Expand Down
191 changes: 191 additions & 0 deletions crates/consensus/src/controller.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
//! Consensus protocol controller.

use std::sync::Arc;

use k256::SecretKey;
use pluto_core::{deadline::DeadlinerHandle, types::Duty};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

use crate::{
debugger::Debugger,
qbft,
timer::RoundTimerFunc,
wrapper::{Consensus, ConsensusWrapper},
};

/// Consensus controller result.
pub type Result<T> = std::result::Result<T, Error>;

/// Consensus controller error.
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// Failed to construct the default QBFT consensus implementation.
#[error("{0}")]
Qbft(#[from] qbft::Error),
/// Protocol ID is not supported by this controller.
#[error("unsupported protocol id")]
UnsupportedProtocolId,
}

/// Consensus controller constructor config.
pub struct Config {
/// Consensus peers in process-index order.
pub peers: Vec<qbft::Peer>,
/// Local zero-based process index.
pub local_peer_idx: i64,
/// Local secp256k1 private key.
pub privkey: SecretKey,
/// Duty deadline scheduler.
pub deadliner: DeadlinerHandle,
/// Expired-duty receiver paired with `deadliner`.
pub expired_rx: mpsc::Receiver<Duty>,
/// Duty admission gate.
pub duty_gater: qbft::DutyGater,
/// External message broadcaster.
pub broadcaster: qbft::Broadcaster,
/// Consensus debugger.
pub debugger: Debugger,
/// Enables attestation value comparison.
pub compare_attestations: bool,
/// Round timer factory.
pub timer_func: RoundTimerFunc,
}

/// Controls the active consensus protocol implementation.
pub struct ConsensusController {
default_consensus: Arc<dyn Consensus>,
wrapped_consensus: ConsensusWrapper,
}

impl ConsensusController {
/// Creates a new consensus controller with QBFT as the default protocol.
pub fn new(config: Config) -> Result<Self> {
let qbft = Arc::new(qbft::Consensus::new(qbft::Config {
peers: config.peers,
local_peer_idx: config.local_peer_idx,
privkey: config.privkey,
deadliner: config.deadliner,
expired_rx: config.expired_rx,
duty_gater: config.duty_gater,
broadcaster: config.broadcaster,
sniffer: config.debugger.sniffer(),
compare_attestations: config.compare_attestations,
timer_func: config.timer_func,
})?);
let default_consensus: Arc<dyn Consensus> = qbft;

Ok(Self {
wrapped_consensus: ConsensusWrapper::new(default_consensus.clone()),
default_consensus,
})
}

/// Starts the default consensus implementation.
pub fn start(&self, ct: CancellationToken) {
self.default_consensus.start(ct);
}

/// Returns the default consensus implementation.
pub fn default_consensus(&self) -> Arc<dyn Consensus> {
Arc::clone(&self.default_consensus)
}

/// Returns the current consensus wrapper.
pub fn current_consensus(&self) -> &ConsensusWrapper {
&self.wrapped_consensus
}

/// Sets the current consensus implementation for `protocol`.
pub fn set_current_consensus_for_protocol(&self, protocol: &str) -> Result<()> {
if self.wrapped_consensus.protocol_id() == protocol {
return Ok(());
}

if self.default_consensus.protocol_id() == protocol {
self.wrapped_consensus
.set_impl(Arc::clone(&self.default_consensus));
return Ok(());
}

Err(Error::UnsupportedProtocolId)
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use k256::SecretKey;
use pluto_core::{
deadline::{DeadlinerTask, NeverExpiringCalculator},
types::DutyType,
};

use crate::{debugger::Debugger, protocols::QBFT_V2_PROTOCOL_ID, timer::get_round_timer_func};

use super::*;

#[tokio::test]
async fn consensus_controller_uses_qbft_as_default_and_current() {
let controller = ConsensusController::new(config()).expect("controller should construct");
let ct = CancellationToken::new();

controller.start(ct.clone());

let default_consensus = controller.default_consensus();
assert_eq!(default_consensus.protocol_id(), QBFT_V2_PROTOCOL_ID);
assert_eq!(
controller.current_consensus().protocol_id(),
QBFT_V2_PROTOCOL_ID
);

controller
.set_current_consensus_for_protocol(QBFT_V2_PROTOCOL_ID)
.expect("default protocol is supported");
let err = controller
.set_current_consensus_for_protocol("boo")
.expect_err("unknown protocol should fail");
assert!(matches!(err, Error::UnsupportedProtocolId));

ct.cancel();
}

fn config() -> Config {
let ct = CancellationToken::new();
let (deadliner, expired_rx) =
DeadlinerTask::start(ct, "controller-test", NeverExpiringCalculator);

Config {
peers: peers(),
local_peer_idx: 0,
privkey: secret_key(1),
deadliner,
expired_rx,
duty_gater: Arc::new(|duty| duty.duty_type == DutyType::Attester),
broadcaster: Arc::new(|_, _| Box::pin(async { Ok(()) })),
debugger: Debugger::new(),
compare_attestations: false,
timer_func: get_round_timer_func(),
}
}

fn peers() -> Vec<qbft::Peer> {
vec![
qbft::Peer {
index: 0,
name: "node-0".to_string(),
public_key: secret_key(1).public_key(),
},
qbft::Peer {
index: 1,
name: "node-1".to_string(),
public_key: secret_key(2).public_key(),
},
]
}

fn secret_key(seed: u8) -> SecretKey {
SecretKey::from_slice(&[seed; 32]).expect("test secret key is valid")
}
}
6 changes: 6 additions & 0 deletions crates/consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
//! This crate implements the consensus algorithms and protocols required for
//! coordinating validator operations across the distributed network.

/// Consensus protocol controller.
pub mod controller;

/// Consensus debug message buffer.
pub mod debugger;

Expand All @@ -17,3 +20,6 @@ pub mod qbft;

/// Consensus round timers.
pub mod timer;

/// Swappable consensus implementation wrapper.
pub mod wrapper;
88 changes: 76 additions & 12 deletions crates/consensus/src/qbft/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ pub struct Config {
pub privkey: SecretKey,
/// Duty deadline scheduler.
pub deadliner: DeadlinerHandle,
/// Expired-duty receiver paired with `deadliner`.
pub expired_rx: mpsc::Receiver<Duty>,
/// Duty admission gate.
pub duty_gater: DutyGater,
/// External message broadcaster.
Expand Down Expand Up @@ -264,13 +266,14 @@ pub struct Consensus {
local_peer_idx: i64,
privkey: SecretKey,
deadliner: DeadlinerHandle,
expired_rx: Mutex<Option<mpsc::Receiver<Duty>>>,
duty_gater: DutyGater,
broadcaster: Broadcaster,
sniffer: SnifferSink,
timer_func: RoundTimerFunc,
compare_attestations: bool,
subscribers: SubscriberSet,
instances: Mutex<HashMap<Duty, Arc<InstanceIo<msg::Msg>>>>,
instances: Arc<Mutex<HashMap<Duty, Arc<InstanceIo<msg::Msg>>>>>,
}

impl Consensus {
Expand Down Expand Up @@ -301,13 +304,14 @@ impl Consensus {
local_peer_idx: config.local_peer_idx,
privkey: config.privkey,
deadliner: config.deadliner,
expired_rx: Mutex::new(Some(config.expired_rx)),
duty_gater: config.duty_gater,
broadcaster: config.broadcaster,
sniffer: config.sniffer,
timer_func: config.timer_func,
compare_attestations: config.compare_attestations,
subscribers: SubscriberSet::default(),
instances: Mutex::default(),
instances: Arc::new(Mutex::default()),
})
}

Expand Down Expand Up @@ -421,17 +425,29 @@ impl Consensus {
}

/// Runs the internal expired-duty cleanup loop until cancellation.
pub fn start(
self: Arc<Self>,
ct: CancellationToken,
mut expired_rx: mpsc::Receiver<Duty>,
) -> JoinHandle<()> {
pub fn start(&self, ct: CancellationToken) -> JoinHandle<()> {
let expired_rx = self
.expired_rx
.lock()
.unwrap_or_else(PoisonError::into_inner)
.take();
let instances = Arc::clone(&self.instances);

tokio::spawn(async move {
let Some(mut expired_rx) = expired_rx else {
return;
};

loop {
tokio::select! {
() = ct.cancelled() => return,
duty = expired_rx.recv() => match duty {
Some(duty) => self.delete_instance_io(&duty),
Some(duty) => {
instances
.lock()
.unwrap_or_else(PoisonError::into_inner)
.remove(&duty);
}
None => return,
},
}
Expand All @@ -452,6 +468,7 @@ impl Consensus {
}

/// Drops cached I/O for a completed or expired duty instance.
#[cfg(test)]
pub(crate) fn delete_instance_io(&self, duty: &Duty) {
self.instances
.lock()
Expand Down Expand Up @@ -545,6 +562,45 @@ impl Consensus {
}
}

impl crate::wrapper::Consensus for Consensus {
fn protocol_id(&self) -> String {
self.protocol_id().to_string()
}

fn start(&self, ct: CancellationToken) {
drop(Consensus::start(self, ct));
}

fn participate(
&self,
ct: CancellationToken,
duty: Duty,
) -> BoxFuture<'_, crate::wrapper::Result<()>> {
Box::pin(async move {
Consensus::participate(self, &ct, duty)
.await
.map_err(|err| Box::new(err) as Box<dyn StdError + Send + Sync>)
})
}

fn propose(
&self,
ct: CancellationToken,
duty: Duty,
value: pbcore::UnsignedDataSet,
) -> BoxFuture<'_, crate::wrapper::Result<()>> {
Box::pin(async move {
Consensus::propose(self, &ct, duty, value)
.await
.map_err(|err| Box::new(err) as Box<dyn StdError + Send + Sync>)
})
}

fn subscribe(&self, subscriber: crate::wrapper::Subscriber) {
Consensus::subscribe(self, subscriber);
}
}

/// Extracts the domain duty from a validated raw QBFT message.
fn duty_from_msg(msg: &pbconsensus::QbftMsg) -> Result<Duty> {
let duty = msg.duty.as_ref().ok_or(Error::InvalidConsensusMessage)?;
Expand Down Expand Up @@ -619,12 +675,19 @@ pub(crate) mod tests {

#[tokio::test]
async fn start_deletes_expired_instance_io_until_cancelled() {
let consensus = Arc::new(consensus(0, true));
let (expired_tx, expired_rx) = mpsc::channel(1);
let consensus = Arc::new(
Consensus::new(Config {
peers: peers(),
expired_rx,
..config_base(false)
})
.unwrap(),
);
let duty = duty();
let first = consensus.get_instance_io(duty.clone());
let cancel = CancellationToken::new();
let (expired_tx, expired_rx) = mpsc::channel(1);
let task = Arc::clone(&consensus).start(cancel.clone(), expired_rx);
let task = consensus.start(cancel.clone());

expired_tx.send(duty.clone()).await.unwrap();
tokio::time::timeout(
Expand Down Expand Up @@ -1243,7 +1306,7 @@ pub(crate) mod tests {

pub(crate) fn config_base(never_expiring: bool) -> Config {
let cancel = CancellationToken::new();
let (deadliner, _expired_rx) = if never_expiring {
let (deadliner, expired_rx) = if never_expiring {
DeadlinerTask::start(
cancel,
"qbft-test",
Expand All @@ -1258,6 +1321,7 @@ pub(crate) mod tests {
local_peer_idx: 0,
privkey: secret_key(1),
deadliner,
expired_rx,
duty_gater: Arc::new(|_| true),
broadcaster: Arc::new(|_, _| Box::pin(async { Ok(()) })),
sniffer: Arc::new(|_| {}),
Expand Down
Loading
Loading