diff --git a/Cargo.lock b/Cargo.lock index f31b1b44..0052816a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5852,10 +5852,13 @@ dependencies = [ "either", "futures", "hex", + "k256", "libp2p", "pluto-cluster", "pluto-core", + "pluto-crypto", "pluto-p2p", + "pluto-testutil", "pluto-tracing", "prost 0.14.3", "serde_json", diff --git a/crates/cluster/tests/prototype_mixed_reconstruction.rs b/crates/cluster/tests/prototype_mixed_reconstruction.rs new file mode 100644 index 00000000..54fe394a --- /dev/null +++ b/crates/cluster/tests/prototype_mixed_reconstruction.rs @@ -0,0 +1,59 @@ +//! PROTOTYPE (not yet runnable): reconstruct a group signature from a mix of +//! Pluto- and Charon-generated key shares. +//! +//! This file is a forward specification, not a working test. It is `#[ignore]`d +//! so it never runs in CI; it pins the target and the missing fixture. +//! +//! ## Why this matters +//! +//! Charon compatibility must hold at the *cryptographic* layer, not just for +//! JSON parsing. After a mixed DKG ceremony (some operators on Charon, some on +//! Pluto), every node holds a private share of the **same** distributed +//! validator key. The cluster is only interoperable if a threshold of those +//! shares — regardless of which implementation produced each one — reconstructs +//! a group signature that verifies against the DV public key. +//! +//! ## What already exists +//! +//! - Pure-Pluto t-of-n reconstruction is tested: partials from a threshold of +//! Pluto shares aggregate to a valid group signature +//! (`crates/dkg/src/frostp2p_integ_test.rs:502-536`, via +//! `BlstImpl::threshold_aggregate` + `BlstImpl::verify`). +//! - A mixed 2 Charon + 2 Pluto DKG ceremony runs in CI +//! (`.github/workflows/dkg-runner.yml`). But its semantic check only asserts +//! the lock's `signature_aggregate` has the right byte length +//! (`scripts/dkg-runner/ci/verify-output-semantic.sh`), not that a fresh +//! 
threshold reconstruction from the mixed shares verifies cryptographically. +//! +//! ## What is missing (the gap) +//! +//! Committed test fixtures containing the **private** key shares produced by +//! Charon for a known group key. The reconstruction primitive +//! (`BlstImpl::threshold_aggregate`) exists and works for Pluto shares; what is +//! absent is access to Charon-generated shares inside a Rust test. A real test +//! needs either captured per-node keystores from a mixed `dkg-runner` ceremony +//! (group pubkey + at least a threshold of secret shares, some authored by +//! Charon) or a committed interop fixture. Running Charon itself from a unit +//! test is out of scope. +//! +//! ## Target scenario (to assert once fixtures exist) +//! +//! 1. Load a known distributed-validator group public key and a threshold of +//! private shares, where at least one share was generated by Charon and at +//! least one by Pluto. +//! 2. Sign a fixed message with each share to produce partial signatures. +//! 3. `BlstImpl::threshold_aggregate` the partials and assert the result +//! verifies against the group public key — proving Charon and Pluto shares +//! are cryptographically interchangeable, not merely format-compatible. + +/// Forward spec for reconstructing a group signature from mixed Charon/Pluto +/// shares. Ignored and intentionally unimplemented: it needs committed Charon +/// private-share fixtures (see the module docs above). 
+#[test] +#[ignore = "blocked: no committed Charon-generated private share fixtures to reconstruct from"] +fn prototype_test_reconstruct_group_signature_from_mixed_pluto_charon_shares() { + unimplemented!( + "specification only — implement once a mixed-ceremony fixture provides Charon-generated \ + private shares for a known group key (see module docs)" + ); +} diff --git a/crates/core/tests/prototype_duty_runtime.rs b/crates/core/tests/prototype_duty_runtime.rs new file mode 100644 index 00000000..59cf78aa --- /dev/null +++ b/crates/core/tests/prototype_duty_runtime.rs @@ -0,0 +1,65 @@ +//! PROTOTYPE (not yet runnable): validator duty execution over the full +//! runtime pipeline. +//! +//! Forward specifications, all `#[ignore]`d so they never run in CI. They pin +//! the most important runtime proofs and the single blocker. +//! +//! ## The blocker +//! +//! There is no `pluto run` command (`crates/cli/src/cli.rs` exposes only `Enr`, +//! `Create`, `Version`, `Relay`, `Dkg`, `Alpha`). The duty pipeline cannot be +//! assembled or exercised until it exists. +//! +//! ## What already exists (the building blocks) +//! +//! - `DutyDB` (`crates/core/src/dutydb/memory.rs`) +//! - `ValidatorAPI` component (`crates/core/src/validatorapi/component.rs`) — +//! but its submit handlers are dead-code, awaiting the runtime +//! - `ParSigEx` (`crates/parsigex/`), `ParSigDB` +//! (`crates/core/src/parsigdb/memory.rs`), `SigAgg` +//! (`crates/core/src/sigagg.rs`), `AggSigDB` +//! (`crates/core/src/aggsigdb/memory.rs`) +//! - `Tracker` and `Deadliner` (`crates/core/src/{tracker,deadline}/`) +//! - QBFT state machine (`crates/core/src/qbft/`) and libp2p transport (#448) +//! - `BeaconMock` and `ValidatorMock` (`crates/testutil/src/{beaconmock, +//! validatormock}/`) — the simnet doubles are ready +//! +//! ## What is missing (the gap) +//! +//! The runtime glue that wires the blocks into a pipeline: a **scheduler** +//! 
(duty poller), a **fetcher** (unsigned duty data), a **consensus runner** +//! (spawns `qbft::run()` over the transport), a **broadcaster** (submits +//! aggregated signatures), and the ValidatorAPI **submit handlers**. None of +//! these exist. +//! +//! ## Target pipeline (each test, once `pluto run` exists) +//! +//! scheduler → fetch unsigned duty → QBFT consensus over the `UnsignedDataSet` +//! → store in `DutyDB` → VC signs → ParSigEx exchange → ParSigDB threshold +//! match → SigAgg aggregate → broadcast to the beacon node. + +/// Full attestation duty over the pipeline on a small simnet cluster: +/// scheduler → consensus → VC sign → parsig → aggregate → submit, asserting one +/// aggregated attestation is submitted per validator/slot. +#[test] +#[ignore = "blocked on `pluto run`: no scheduler/fetcher/consensus-runner/broadcaster pipeline"] +fn prototype_test_attestation_duty_full_pipeline() { + unimplemented!("specification only — needs the assembled duty runtime (see module docs)"); +} + +/// Block proposal duty over the pipeline, asserting exactly one valid block is +/// produced and broadcast per validator/slot. +#[test] +#[ignore = "blocked on `pluto run`: no proposer pipeline / broadcaster"] +fn prototype_test_block_proposal_duty_produces_one_block() { + unimplemented!("specification only — needs the assembled duty runtime (see module docs)"); +} + +/// Multi-slot simnet with `BeaconMock` + `ValidatorMock` and 3-4 nodes, +/// asserting attester and proposer duties pass across several slots (parity +/// with Charon's `TestSimnetDuties`). 
+#[test] +#[ignore = "blocked on `pluto run`: simnet doubles exist but nothing drives the pipeline"] +fn prototype_test_multi_slot_simnet_duties() { + unimplemented!("specification only — needs the assembled duty runtime (see module docs)"); +} diff --git a/crates/core/tests/prototype_qbft_over_libp2p.rs b/crates/core/tests/prototype_qbft_over_libp2p.rs new file mode 100644 index 00000000..6fd6805d --- /dev/null +++ b/crates/core/tests/prototype_qbft_over_libp2p.rs @@ -0,0 +1,59 @@ +//! PROTOTYPE (not yet runnable): QBFT consensus over a real libp2p network. +//! +//! This file is a forward specification, not a working test. It documents the +//! end-to-end QBFT scenario we want to prove and the single piece of production +//! code that is missing to make it real. The test is `#[ignore]`d so it never +//! runs in CI; it exists to pin the target and give the future test a home. +//! +//! ## What already exists +//! +//! - The QBFT state machine: `pluto_core::qbft::run()` +//! (`crates/core/src/qbft/mod.rs:327`). Multi-node consensus reaching a +//! single decided value — including degraded and adversarial cases — is +//! already covered against an **in-memory** transport with a fake clock in +//! `crates/core/src/qbft/internal_test.rs` (`happy`, `stagger_start`, +//! `dropped_messages`, `fuzzed`, `chain_split`). +//! - The libp2p QBFT transport and sniffer landed in #448: +//! `crates/core/src/consensus/qbft/transport.rs` and `.../sniffer.rs`. Both +//! carry `#![allow(dead_code)]` with `TODO: Remove once the consensus runner +//! wires this transport.` +//! - The per-instance plumbing: `pluto_core::consensus::instance::InstanceIo` +//! (`maybe_start()`, `take_recv_rx()`, …) buffers inbound messages until a +//! runner starts. +//! +//! ## What is missing (the only gap) +//! +//! A **consensus runner**: the glue that, for one duty/instance, adapts the +//! libp2p `consensus::qbft::transport` to the state machine's abstract +//! 
`qbft::Transport`, drains `InstanceIo` once `maybe_start()` returns true, +//! and drives `qbft::run()` — pumping messages between the swarm and the state +//! machine. Nothing spawns `qbft::run()` over the network today, so this test +//! cannot be written yet. This is the highest-leverage runtime work available +//! before `pluto run`. +//! +//! ## Target scenario (to assert once the runner exists) +//! +//! 1. Start N (e.g. 4) Pluto nodes, each running the consensus runner over the +//! libp2p transport, connected in a full mesh over loopback TCP. +//! 2. Feed each honest node the same proposed value for one instance. +//! 3. Assert every honest node decides, and they all decide the **same** value +//! in the same instance — proving wire serialization + transport + runner +//! interoperate, not just the in-memory state machine. +//! +//! Follow-on variants (own prototypes/commits later): one crashed node with the +//! remaining majority still deciding, and a Byzantine proposer that must never +//! cause two honest nodes to decide different values. + +/// Forward spec for QBFT reaching a single decision across nodes over libp2p. +/// +/// Ignored and intentionally not implemented: it requires a consensus runner +/// that wires `consensus::qbft::transport` + `consensus::instance::InstanceIo` +/// into `qbft::run()`. See the module docs above. +#[test] +#[ignore = "blocked: no consensus runner wires the libp2p QBFT transport into qbft::run()"] +fn prototype_test_qbft_reaches_single_decision_over_libp2p() { + unimplemented!( + "specification only — implement once a consensus runner drives qbft::run() over the \ + consensus::qbft libp2p transport (see module docs)" + ); +} diff --git a/crates/core/tests/prototype_runtime_charon_parity.rs b/crates/core/tests/prototype_runtime_charon_parity.rs new file mode 100644 index 00000000..e342e47c --- /dev/null +++ b/crates/core/tests/prototype_runtime_charon_parity.rs @@ -0,0 +1,39 @@ +//! 
PROTOTYPE (not yet runnable): mixed Pluto + Charon runtime interoperability. +//! +//! Forward specifications, all `#[ignore]`d so they never run in CI. These +//! prove real interoperability: a cluster of mixed Charon and Pluto nodes +//! executing live validator duties together, not just exchanging static files. +//! +//! ## The blocker +//! +//! No `pluto run` command / assembled duty runtime (see +//! `prototype_duty_runtime.rs`). Static-artifact Charon parity is already +//! covered — Charon-created locks parse and verify in Pluto +//! (`crates/cluster/src/lock.rs`, V1.0-V1.10 fixtures) and a mixed 2 Charon + +//! 2 Pluto DKG ceremony runs in CI (`.github/workflows/dkg-runner.yml`). What +//! cannot be reproduced is a mixed cluster running the *runtime*: agreeing on +//! duty data over QBFT, exchanging partials over ParSigEx, and aggregating — +//! across both implementations. +//! +//! ## Target scenarios (once `pluto run` exists) +//! +//! Wire-level parity of QBFT (`/charon/consensus/...`) and ParSigEx +//! (`/charon/parsigex/2.0.0`) must let Charon and Pluto nodes participate in +//! the same consensus game and partial-signature exchange, then all agree on +//! the duty hash and produce a valid aggregated signature. + +/// Mixed runtime, 3 Charon + 1 Pluto: duties pass and all nodes agree on the +/// duty hash (minimum real interoperability). +#[test] +#[ignore = "blocked on `pluto run`: no runtime for a mixed cluster to execute duties"] +fn prototype_test_mixed_runtime_three_charon_one_pluto() { + unimplemented!("specification only — needs `pluto run` + QBFT/ParSigEx wire parity"); +} + +/// Mixed runtime, 2 Charon + 2 Pluto: attestation and proposer duties pass +/// (stronger parity). 
+#[test] +#[ignore = "blocked on `pluto run`: no runtime for a mixed cluster to execute duties"] +fn prototype_test_mixed_runtime_two_charon_two_pluto() { + unimplemented!("specification only — needs `pluto run` + QBFT/ParSigEx wire parity"); +} diff --git a/crates/core/tests/prototype_runtime_safety.rs b/crates/core/tests/prototype_runtime_safety.rs new file mode 100644 index 00000000..81aac40b --- /dev/null +++ b/crates/core/tests/prototype_runtime_safety.rs @@ -0,0 +1,82 @@ +//! PROTOTYPE (not yet runnable): runtime safety / anti-slashing guarantees. +//! +//! Forward specifications, all `#[ignore]`d so they never run in CI. These are +//! the safety properties a distributed validator exists to provide; none can be +//! exercised yet. +//! +//! ## The blocker(s) +//! +//! - No `pluto run` command / assembled duty runtime (see +//! `prototype_duty_runtime.rs`). Without scheduler → consensus → VC sign → +//! parsig → aggregate → broadcast, none of these scenarios can be reproduced. +//! - No **slashing-protection database**. There is no persistent record of what +//! a validator has already signed anywhere in the tree (the `*_slashings` +//! symbols in `crates/core/src/{signeddata,dutydb}` are block-body fields, +//! not slashing protection). The surround/double-vote-across-restart case +//! additionally needs this DB to be built. +//! - `privkeylock` itself exists (`crates/app/src/privkeylock.rs`); only the +//! run-level enforcement test is missing. +//! +//! ## Why consensus alone is not enough +//! +//! Threshold aggregation is only safe if every node signs the *same* data. +//! Consensus over the `UnsignedDataSet` plus `DutyDB` persistence plus slashing +//! protection together prevent a malicious beacon node or a compromised VC from +//! inducing a double vote / double proposal. Each test below pins one such +//! guarantee. 
+ +/// A malicious beacon node serves conflicting attestation data to different +/// nodes; the cluster must sign one root or nothing — never two. +#[test] +#[ignore = "blocked on `pluto run`: no consensus-backed duty pipeline to defend"] +fn prototype_test_malicious_bn_conflicting_attestation_data() { + unimplemented!("specification only — needs the assembled duty runtime + consensus"); +} + +/// A malicious beacon node attempts to induce a double block proposal; at most +/// one block signature must be produced/broadcast per validator/slot. +#[test] +#[ignore = "blocked on `pluto run`: no proposer pipeline to defend"] +fn prototype_test_malicious_bn_double_proposal_is_prevented() { + unimplemented!("specification only — needs the assembled duty runtime"); +} + +/// A compromised validator client submits a partial that does not match the +/// consensus-decided duty; it must be rejected, never exchanged or aggregated. +#[test] +#[ignore = "blocked on `pluto run`: no consensus/DutyDB cross-check on submitted partials"] +fn prototype_test_compromised_vc_submission_is_rejected() { + unimplemented!("specification only — needs the assembled duty runtime + DutyDB cross-check"); +} + +/// Network partition 2/2: with no majority, neither side may sign or broadcast +/// (safety over liveness). +#[test] +#[ignore = "blocked on `pluto run`: no runtime to partition"] +fn prototype_test_network_partition_2_2_signs_nothing() { + unimplemented!("specification only — needs the assembled duty runtime"); +} + +/// Network partition 3/1: the majority side continues; the isolated minority +/// cannot reach threshold and must not sign. +#[test] +#[ignore = "blocked on `pluto run`: no runtime to partition"] +fn prototype_test_network_partition_3_1_majority_continues_minority_safe() { + unimplemented!("specification only — needs the assembled duty runtime"); +} + +/// A surround/double vote attempted across a restart must be blocked by +/// persistent slashing history. 
+#[test] +#[ignore = "blocked on `pluto run` AND a slashing-protection DB (neither exists)"] +fn prototype_test_surround_vote_blocked_across_restart() { + unimplemented!("specification only — needs the duty runtime and a persistent slashing DB"); +} + +/// A second runtime started on the same validator keys must not be able to +/// start/sign (`privkeylock` enforcement at the run level). +#[test] +#[ignore = "blocked on `pluto run`: privkeylock primitive exists, run-level enforcement does not"] +fn prototype_test_privkeylock_blocks_second_runtime() { + unimplemented!("specification only — needs `pluto run` to enforce privkeylock at startup"); +} diff --git a/crates/p2p/tests/relay_reservation.rs b/crates/p2p/tests/relay_reservation.rs new file mode 100644 index 00000000..425d51b7 --- /dev/null +++ b/crates/p2p/tests/relay_reservation.rs @@ -0,0 +1,196 @@ +//! End-to-end test for relay reservations over a real libp2p network. +//! +//! Starts a Pluto relay server (`Node`) on loopback TCP and a +//! handful of relay clients (`Node` with relay-client support). Each client +//! dials the relay and listens on its `/p2p-circuit` address, which requests a +//! reservation. The test asserts the relay accepts a reservation from every +//! client. This goes beyond the relay-server HTTP `/enr` test: it exercises the +//! actual libp2p circuit-reservation protocol end to end. + +use std::{collections::HashSet, time::Duration}; + +use anyhow::{Context as _, Result, ensure}; +use k256::{SecretKey, elliptic_curve::rand_core::OsRng}; +use libp2p::{ + Multiaddr, PeerId, + futures::StreamExt, + multiaddr::Protocol, + relay, + swarm::{NetworkBehaviour, SwarmEvent}, +}; +use pluto_p2p::{ + behaviours::pluto::PlutoBehaviourEvent, + config::P2PConfig, + p2p::{Node, NodeType}, + p2p_context::P2PContext, +}; +use tokio::{ + spawn, + sync::{mpsc, oneshot}, + task::JoinHandle, + time::timeout, +}; + +/// Number of relay clients that reserve a circuit. 
+const CLIENTS: usize = 3; +/// Overall test deadline for any single await. +const TEST_TIMEOUT: Duration = Duration::from_secs(30); + +/// Minimal client behaviour: just the relay client needed to reserve a circuit. +#[derive(NetworkBehaviour)] +struct RelayClientBehaviour { + relay: relay::client::Behaviour, +} + +/// Drives the relay server: reports its first listen address and the peer id of +/// every accepted reservation. +fn spawn_relay( + mut relay_node: Node<relay::Behaviour>, + listen_tx: oneshot::Sender<Multiaddr>, + reservation_tx: mpsc::UnboundedSender<PeerId>, + mut stop_rx: oneshot::Receiver<()>, +) -> JoinHandle<()> { + spawn(async move { + let mut listen_tx = Some(listen_tx); + loop { + tokio::select! { + event = relay_node.select_next_some() => match event { + SwarmEvent::NewListenAddr { address, .. } => { + if let Some(tx) = listen_tx.take() { + tx.send(address).ok(); + } + } + SwarmEvent::Behaviour(PlutoBehaviourEvent::Inner( + relay::Event::ReservationReqAccepted { src_peer_id, .. }, + )) => { + reservation_tx.send(src_peer_id).ok(); + } + _ => {} + }, + _ = &mut stop_rx => break, + } + } + }) +} + +/// Drives one relay client: dials the relay, then reserves a circuit once the +/// relay connection is established. +fn spawn_client( + mut client: Node<RelayClientBehaviour>, + relay_addr: Multiaddr, + circuit_addr: Multiaddr, + mut stop_rx: oneshot::Receiver<()>, +) -> JoinHandle<()> { + spawn(async move { + client.dial(relay_addr).ok(); + let mut reserved = false; + loop { + tokio::select! { + event = client.select_next_some() => { + if let SwarmEvent::ConnectionEstablished { .. } = event + && !reserved + { + reserved = true; + client.listen_on(circuit_addr.clone()).ok(); + } + } + _ = &mut stop_rx => break, + } + } + }) +} + +/// Builds the relay server node listening on a loopback TCP port. 
+fn build_relay() -> Result<Node<relay::Behaviour>> { + let key = SecretKey::random(&mut OsRng); + let cfg = P2PConfig::builder() + .with_tcp_addrs(vec!["127.0.0.1:0".to_string()]) + .build(); + let node = Node::<relay::Behaviour>::new_server( + cfg, + key, + NodeType::TCP, + false, + P2PContext::default(), + None, + |builder, keypair| { + builder.with_inner(relay::Behaviour::new( + keypair.public().to_peer_id(), + relay::Config::default(), + )) + }, + )?; + Ok(node) +} + +/// Builds a relay client node. +fn build_client() -> Result<Node<RelayClientBehaviour>> { + let key = SecretKey::random(&mut OsRng); + let node = Node::<RelayClientBehaviour>::new( + P2PConfig::default(), + key, + NodeType::TCP, + false, + P2PContext::default(), + |builder, _keypair, relay_client| { + builder.with_inner(RelayClientBehaviour { + relay: relay_client, + }) + }, + )?; + Ok(node) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn relay_accepts_client_reservations() -> Result<()> { + let relay_node = build_relay()?; + let relay_peer_id = *relay_node.local_peer_id(); + + let (listen_tx, listen_rx) = oneshot::channel::<Multiaddr>(); + let (reservation_tx, mut reservation_rx) = mpsc::unbounded_channel::<PeerId>(); + let (relay_stop_tx, relay_stop_rx) = oneshot::channel::<()>(); + let relay_task = spawn_relay(relay_node, listen_tx, reservation_tx, relay_stop_rx); + + let relay_listen_addr = timeout(TEST_TIMEOUT, listen_rx) + .await + .context("timed out waiting for relay listen address")? 
+ .context("relay listen channel closed")?; + let relay_addr = relay_listen_addr.with(Protocol::P2p(relay_peer_id)); + let circuit_addr = relay_addr.clone().with(Protocol::P2pCircuit); + + let mut client_ids = HashSet::<PeerId>::new(); + let mut client_tasks = Vec::with_capacity(CLIENTS); + let mut client_stops = Vec::with_capacity(CLIENTS); + for _ in 0..CLIENTS { + let client = build_client()?; + client_ids.insert(*client.local_peer_id()); + let (stop_tx, stop_rx) = oneshot::channel::<()>(); + let task = spawn_client(client, relay_addr.clone(), circuit_addr.clone(), stop_rx); + client_tasks.push(task); + client_stops.push(stop_tx); + } + + // Every client must obtain an accepted reservation on the relay. + let mut reserved = HashSet::<PeerId>::new(); + while !client_ids.is_subset(&reserved) { + let src = timeout(TEST_TIMEOUT, reservation_rx.recv()) + .await + .context("timed out waiting for relay reservations")? + .context("reservation channel closed prematurely")?; + reserved.insert(src); + } + ensure!( + client_ids.is_subset(&reserved), + "not all clients reserved: clients={client_ids:?}, reserved={reserved:?}", + ); + + for stop in client_stops { + stop.send(()).ok(); + } + for task in client_tasks { + task.await.context("client task panicked")?; + } + relay_stop_tx.send(()).ok(); + relay_task.await.context("relay task panicked")?; + Ok(()) +} diff --git a/crates/p2p/tests/relay_routing.rs b/crates/p2p/tests/relay_routing.rs new file mode 100644 index 00000000..4d7afae8 --- /dev/null +++ b/crates/p2p/tests/relay_routing.rs @@ -0,0 +1,275 @@ +//! End-to-end test that two isolated nodes connect through a Pluto relay. +//! +//! A Pluto relay server runs on loopback TCP. Node A reserves a circuit on the +//! relay; node B is told only A's `/p2p-circuit` address (it never learns A's +//! direct address), so the only way it can reach A is via the relay. The test +//! asserts the relay routes the circuit (`CircuitReqAccepted` for B → A) and +//! 
that B actually establishes a connection to A over it — i.e. the noise +//! handshake bytes flowed through the relay. This proves the relay carries real +//! peer-to-peer traffic, not just that its HTTP endpoint answers. + +use std::time::Duration; + +use anyhow::{Context as _, Result}; +use k256::{SecretKey, elliptic_curve::rand_core::OsRng}; +use libp2p::{ + Multiaddr, PeerId, + futures::StreamExt, + multiaddr::Protocol, + relay, + swarm::{NetworkBehaviour, SwarmEvent}, +}; +use pluto_p2p::{ + behaviours::pluto::PlutoBehaviourEvent, + config::P2PConfig, + p2p::{Node, NodeType}, + p2p_context::P2PContext, +}; +use tokio::{ + spawn, + sync::{mpsc, oneshot}, + task::JoinHandle, + time::timeout, +}; + +/// Overall test deadline for any single await. +const TEST_TIMEOUT: Duration = Duration::from_secs(30); + +/// Minimal client behaviour: just the relay client needed for circuit routing. +#[derive(NetworkBehaviour)] +struct RelayClientBehaviour { + relay: relay::client::Behaviour, +} + +/// Relay-side events the test cares about. +enum RelayEvent { + /// A peer's reservation was accepted. + Reservation(PeerId), + /// A circuit was accepted, routing `src` to `dst`. + Circuit { src: PeerId, dst: PeerId }, +} + +/// Drives the relay server: reports its first listen address, accepted +/// reservations, and accepted circuits. +fn spawn_relay( + mut relay_node: Node<relay::Behaviour>, + listen_tx: oneshot::Sender<Multiaddr>, + event_tx: mpsc::UnboundedSender<RelayEvent>, + mut stop_rx: oneshot::Receiver<()>, +) -> JoinHandle<()> { + spawn(async move { + let mut listen_tx = Some(listen_tx); + loop { + tokio::select! { + event = relay_node.select_next_some() => match event { + SwarmEvent::NewListenAddr { address, .. } => { + if let Some(tx) = listen_tx.take() { + tx.send(address).ok(); + } + } + SwarmEvent::Behaviour(PlutoBehaviourEvent::Inner( + relay::Event::ReservationReqAccepted { src_peer_id, .. 
}, + )) => { + event_tx.send(RelayEvent::Reservation(src_peer_id)).ok(); + } + SwarmEvent::Behaviour(PlutoBehaviourEvent::Inner( + relay::Event::CircuitReqAccepted { src_peer_id, dst_peer_id }, + )) => { + event_tx + .send(RelayEvent::Circuit { + src: src_peer_id, + dst: dst_peer_id, + }) + .ok(); + } + _ => {} + }, + _ = &mut stop_rx => break, + } + } + }) +} + +/// Drives the destination node: dials the relay, then reserves a circuit once +/// the relay connection is established. +fn spawn_reserving_node( + mut node: Node, + relay_addr: Multiaddr, + circuit_addr: Multiaddr, + mut stop_rx: oneshot::Receiver<()>, +) -> JoinHandle<()> { + spawn(async move { + node.dial(relay_addr).ok(); + let mut reserved = false; + loop { + tokio::select! { + event = node.select_next_some() => { + if let SwarmEvent::ConnectionEstablished { .. } = event + && !reserved + { + reserved = true; + node.listen_on(circuit_addr.clone()).ok(); + } + } + _ = &mut stop_rx => break, + } + } + }) +} + +/// Drives the source node: dials the destination's circuit address and reports +/// the peer id of every connection it establishes. +fn spawn_dialing_node( + mut node: Node, + dst_addr: Multiaddr, + established_tx: mpsc::UnboundedSender, + mut stop_rx: oneshot::Receiver<()>, +) -> JoinHandle<()> { + spawn(async move { + node.dial(dst_addr).ok(); + loop { + tokio::select! { + event = node.select_next_some() => { + if let SwarmEvent::ConnectionEstablished { peer_id, .. } = event { + established_tx.send(peer_id).ok(); + } + } + _ = &mut stop_rx => break, + } + } + }) +} + +/// Builds the relay server node listening on a loopback TCP port. 
+fn build_relay() -> Result<Node<relay::Behaviour>> { + let key = SecretKey::random(&mut OsRng); + let cfg = P2PConfig::builder() + .with_tcp_addrs(vec!["127.0.0.1:0".to_string()]) + .build(); + let node = Node::<relay::Behaviour>::new_server( + cfg, + key, + NodeType::TCP, + false, + P2PContext::default(), + None, + |builder, keypair| { + builder.with_inner(relay::Behaviour::new( + keypair.public().to_peer_id(), + relay::Config::default(), + )) + }, + )?; + Ok(node) +} + +/// Builds a relay client node. +fn build_client() -> Result<Node<RelayClientBehaviour>> { + let key = SecretKey::random(&mut OsRng); + let node = Node::<RelayClientBehaviour>::new( + P2PConfig::default(), + key, + NodeType::TCP, + false, + P2PContext::default(), + |builder, _keypair, relay_client| { + builder.with_inner(RelayClientBehaviour { + relay: relay_client, + }) + }, + )?; + Ok(node) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn isolated_nodes_connect_through_relay() -> Result<()> { + let relay_node = build_relay()?; + let relay_peer_id = *relay_node.local_peer_id(); + + let (listen_tx, listen_rx) = oneshot::channel::<Multiaddr>(); + let (relay_event_tx, mut relay_event_rx) = mpsc::unbounded_channel::<RelayEvent>(); + let (relay_stop_tx, relay_stop_rx) = oneshot::channel::<()>(); + let relay_task = spawn_relay(relay_node, listen_tx, relay_event_tx, relay_stop_rx); + + let relay_listen_addr = timeout(TEST_TIMEOUT, listen_rx) + .await + .context("timed out waiting for relay listen address")? + .context("relay listen channel closed")?; + let relay_addr = relay_listen_addr.with(Protocol::P2p(relay_peer_id)); + let circuit_listen_addr = relay_addr.clone().with(Protocol::P2pCircuit); + + // Destination node A reserves a circuit on the relay. + let dst_node = build_client()?; + let dst_peer_id = *dst_node.local_peer_id(); + let (dst_stop_tx, dst_stop_rx) = oneshot::channel::<()>(); + let dst_task = spawn_reserving_node( + dst_node, + relay_addr.clone(), + circuit_listen_addr, + dst_stop_rx, + ); + + // Wait until A's reservation is in place before B tries to reach it. 
+ timeout(TEST_TIMEOUT, async { + while let Some(event) = relay_event_rx.recv().await { + if let RelayEvent::Reservation(peer) = event + && peer == dst_peer_id + { + return Ok(()); + } + } + anyhow::bail!("relay event channel closed before reservation"); + }) + .await + .context("timed out waiting for destination reservation")??; + + // Source node B is told ONLY A's circuit address, so it must use the relay. + let dst_circuit_addr = relay_addr + .clone() + .with(Protocol::P2pCircuit) + .with(Protocol::P2p(dst_peer_id)); + let src_node = build_client()?; + let src_peer_id = *src_node.local_peer_id(); + let (established_tx, mut established_rx) = mpsc::unbounded_channel::<PeerId>(); + let (src_stop_tx, src_stop_rx) = oneshot::channel::<()>(); + let src_task = spawn_dialing_node(src_node, dst_circuit_addr, established_tx, src_stop_rx); + + // Expect the relay to route B → A, and B to establish the relayed connection. + timeout(TEST_TIMEOUT, async { + let mut circuit_routed = false; + let mut connected_to_dst = false; + while !(circuit_routed && connected_to_dst) { + tokio::select! { + Some(event) = relay_event_rx.recv() => { + if let RelayEvent::Circuit { src, dst } = event + && src == src_peer_id + && dst == dst_peer_id + { + circuit_routed = true; + } + } + Some(peer) = established_rx.recv() => { + if peer == dst_peer_id { + connected_to_dst = true; + } + } + } + } + Ok::<(), anyhow::Error>(()) + }) + .await + .context("timed out waiting for relayed connection")??; + + // Reaching here means both conditions held: the relay accepted the B → A + // circuit and B established the relayed connection to A. + + // Shutdown. 
+ src_stop_tx.send(()).ok(); + src_task.await.context("source task panicked")?; + dst_stop_tx.send(()).ok(); + dst_task.await.context("destination task panicked")?; + relay_stop_tx.send(()).ok(); + relay_task.await.context("relay task panicked")?; + + Ok(()) +} diff --git a/crates/parsigex/Cargo.toml b/crates/parsigex/Cargo.toml index acd88114..1361a050 100644 --- a/crates/parsigex/Cargo.toml +++ b/crates/parsigex/Cargo.toml @@ -22,9 +22,12 @@ anyhow.workspace = true clap.workspace = true hex.workspace = true pluto-cluster.workspace = true +pluto-crypto.workspace = true +pluto-testutil.workspace = true pluto-tracing.workspace = true tokio-util.workspace = true serde_json.workspace = true +k256.workspace = true [lints] workspace = true diff --git a/crates/parsigex/tests/parsigex_invalid_partial.rs b/crates/parsigex/tests/parsigex_invalid_partial.rs new file mode 100644 index 00000000..9a95a320 --- /dev/null +++ b/crates/parsigex/tests/parsigex_invalid_partial.rs @@ -0,0 +1,367 @@ +//! End-to-end test that ParSigEx rejects a cryptographically invalid partial. +//! +//! Two nodes connect over real (loopback TCP) libp2p. A real threshold-BLS key +//! is dealt; the receiver runs a genuine verifier that checks each received +//! partial signature against the signer's public share. The sender broadcasts +//! one valid partial (correctly signed over the agreed message) and one invalid +//! partial (a well-formed BLS signature over a *different* message). The test +//! asserts the receiver surfaces the valid one as `Received` and rejects the +//! invalid one as `Error` (`InvalidPartialSignature`) — it is never delivered. +//! +//! This exercises the real ParSigEx verify-and-reject path (`do_recv` → +//! `Failure::InvalidPartialSignature`), not a no-op verifier, so it proves +//! Byzantine protection against forged partials at the exchange layer. 
+ +use std::{collections::HashMap, sync::Arc, time::Duration}; + +use anyhow::{Context as _, Result, ensure}; +use futures::StreamExt as _; +use libp2p::{Multiaddr, PeerId, swarm::SwarmEvent}; +use pluto_core::{ + signeddata::SignedRandao, + types::{Duty, DutyType, ParSignedData, ParSignedDataSet, PubKey, SlotNumber}, +}; +use pluto_crypto::{ + blst_impl::BlstImpl, + tbls::Tbls, + types::{PrivateKey, PublicKey}, +}; +use pluto_p2p::{ + behaviours::pluto::PlutoBehaviourEvent, + config::P2PConfig, + p2p::{Node, NodeType}, + p2p_context::P2PContext, + peer::peer_id_from_key, +}; +use pluto_parsigex::{self as parsigex, DutyGater, Event, Failure, Handle, Verifier, VerifyError}; +use pluto_testutil::random::{generate_insecure_k1_key, generate_test_bls_key}; +use tokio::{ + spawn, + sync::{mpsc, oneshot}, + task::JoinHandle, + time::timeout, +}; + +const NODES: usize = 2; +const THRESHOLD: usize = 2; +const RECEIVER: usize = 0; +const SENDER: usize = 1; +/// The sender is node index 1, so it holds the 1-based share index 2. +const SENDER_SHARE: u64 = 2; +const EPOCH: u64 = 1; +const SLOT: u64 = 32; +/// The message the cluster agrees to sign for this duty. +const MSG: &[u8] = b"pluto parsigex agreed signing root"; +/// A different message; a partial over this must be rejected for the duty. +const WRONG_MSG: &[u8] = b"pluto parsigex tampered signing root"; +const TEST_TIMEOUT: Duration = Duration::from_secs(30); + +/// Threshold key material dealt for the test cluster. 
+struct ClusterKey { + group_pub_core: PubKey, + shares: HashMap<u64, PrivateKey>, + public_shares: HashMap<u64, PublicKey>, +} + +impl ClusterKey { + fn deal() -> Result<Self> { + let secret = generate_test_bls_key(42); + let group_pub = BlstImpl + .secret_to_public_key(&secret) + .context("failed to derive group public key")?; + let total = u64::try_from(NODES).context("node count fits u64")?; + let threshold = u64::try_from(THRESHOLD).context("threshold fits u64")?; + let shares = BlstImpl + .threshold_split(&secret, total, threshold) + .context("failed to split group secret into shares")?; + + let mut public_shares = HashMap::with_capacity(shares.len()); + for (share_idx, share_priv) in &shares { + let share_pub = BlstImpl + .secret_to_public_key(share_priv) + .context("failed to derive public share")?; + public_shares.insert(*share_idx, share_pub); + } + + Ok(Self { + group_pub_core: PubKey::new(group_pub), + shares, + public_shares, + }) + } +} + +/// An event observed on a node's swarm loop. +enum Observed { + Received { node: usize, share_idx: u64 }, + Rejected { node: usize, error: Failure }, +} + +/// A spawned node: its swarm runs on a task; control happens over channels. +struct RunningNode { + handle: Handle, + dial_tx: mpsc::UnboundedSender<Vec<Multiaddr>>, + stop_tx: oneshot::Sender<()>, + join: JoinHandle<Result<()>>, +} + +/// Sinks the swarm loop forwards events into. +#[derive(Clone)] +struct EventSinks { + listen_tx: mpsc::UnboundedSender<(usize, Multiaddr)>, + conn_tx: mpsc::UnboundedSender<(usize, PeerId)>, + observed_tx: mpsc::UnboundedSender<Observed>, +} + +/// A verifier that always accepts (for the sender, which never receives here). +fn accept_all_verifier() -> Verifier { + Arc::new(|_duty, _pubkey, _data| Box::pin(async { Ok(()) })) +} + +/// A verifier that checks each partial against its public share over [`MSG`]. 
+fn share_verifier(public_shares: HashMap<u64, PublicKey>) -> Verifier { + let public_shares = Arc::new(public_shares); + Arc::new(move |_duty, _pubkey, data: ParSignedData| { + let public_shares = public_shares.clone(); + Box::pin(async move { + let signature = data + .signed_data + .signature() + .map_err(|e| VerifyError::Other(e.to_string()))?; + let public_share = public_shares + .get(&data.share_idx) + .ok_or(VerifyError::InvalidShareIndex)?; + BlstImpl + .verify(public_share, MSG, &signature) + .map_err(|e| VerifyError::Other(e.to_string()))?; + Ok(()) + }) + }) +} + +/// Builds the parsigex node at `index` with the given verifier. +fn build_node( + index: usize, + key: k256::SecretKey, + peer_ids: &[PeerId], + verifier: Verifier, +) -> Result<(Node, Handle)> { + let peer_id = peer_ids[index]; + let p2p_context = P2PContext::new(peer_ids.to_vec()); + let duty_gater: DutyGater = Arc::new(|duty: &Duty| duty.duty_type != DutyType::Unknown); + let config = parsigex::Config::new(peer_id, p2p_context.clone(), verifier, duty_gater) + .with_timeout(Duration::from_secs(10)); + let (behaviour, handle) = parsigex::Behaviour::new(config); + + let node = Node::new_server( + P2PConfig::default(), + key, + NodeType::TCP, + false, + p2p_context, + None, + move |builder, _keypair| builder.with_inner(behaviour), + ) + .context("failed to build node")?; + + Ok((node, handle)) +} + +/// Drives one node's swarm until stopped, forwarding events into `sinks`. +async fn run_swarm( + mut node: Node, + index: usize, + sinks: EventSinks, + mut dial_rx: mpsc::UnboundedReceiver<Vec<Multiaddr>>, + mut stop_rx: oneshot::Receiver<()>, +) -> Result<()> { + node.listen_on("/ip4/127.0.0.1/tcp/0".parse()?)?; + loop { + tokio::select! { + _ = &mut stop_rx => break, + Some(targets) = dial_rx.recv() => { + for target in targets { + node.dial(target)?; + } + } + event = node.select_next_some() => match event { + SwarmEvent::NewListenAddr { address, .. 
} => { + sinks.listen_tx.send((index, address)).ok(); + } + SwarmEvent::ConnectionEstablished { peer_id, .. } => { + sinks.conn_tx.send((index, peer_id)).ok(); + } + SwarmEvent::Behaviour(PlutoBehaviourEvent::Inner(Event::Received { + data_set, + .. + })) => { + for data in data_set.inner().values() { + sinks + .observed_tx + .send(Observed::Received { + node: index, + share_idx: data.share_idx, + }) + .ok(); + } + } + SwarmEvent::Behaviour(PlutoBehaviourEvent::Inner(Event::Error { error, .. })) => { + sinks + .observed_tx + .send(Observed::Rejected { node: index, error }) + .ok(); + } + _ => {} + }, + } + } + Ok(()) +} + +/// Builds a partial signature set: `share` signs `message`, tagged `share_idx`. +fn partial_set( + cluster: &ClusterKey, + share: &PrivateKey, + share_idx: u64, + message: &[u8], +) -> Result<ParSignedDataSet> { + let signature = BlstImpl + .sign(share, message) + .context("failed to sign with share")?; + let partial = SignedRandao::new_partial(EPOCH, signature, share_idx); + let mut data_set = ParSignedDataSet::new(); + data_set.insert(cluster.group_pub_core, partial); + Ok(data_set) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn parsigex_rejects_invalid_partial_signature() -> Result<()> { + let cluster = ClusterKey::deal()?; + let sender_share = *cluster + .shares + .get(&SENDER_SHARE) + .context("missing sender share")?; + + let keys = (0..NODES) + .map(|index| generate_insecure_k1_key(u8::try_from(index).expect("node index fits u8"))) + .collect::<Vec<_>>(); + let peer_ids = keys + .iter() + .map(|key| peer_id_from_key(key.public_key())) + .collect::<Result<Vec<_>, _>>() + .context("failed to derive peer IDs")?; + + let (listen_tx, mut listen_rx) = mpsc::unbounded_channel::<(usize, Multiaddr)>(); + let (conn_tx, mut conn_rx) = mpsc::unbounded_channel::<(usize, PeerId)>(); + let (observed_tx, mut observed_rx) = mpsc::unbounded_channel::<Observed>(); + let sinks = EventSinks { + listen_tx, + conn_tx, + observed_tx, + }; + + let verifiers = [ + 
share_verifier(cluster.public_shares.clone()), + accept_all_verifier(), + ]; + let mut running = Vec::with_capacity(NODES); + for (index, (key, verifier)) in keys.into_iter().zip(verifiers).enumerate() { + let (node, handle) = build_node(index, key, &peer_ids, verifier)?; + let (dial_tx, dial_rx) = mpsc::unbounded_channel::<Vec<Multiaddr>>(); + let (stop_tx, stop_rx) = oneshot::channel(); + let join = spawn(run_swarm(node, index, sinks.clone(), dial_rx, stop_rx)); + running.push(RunningNode { + handle, + dial_tx, + stop_tx, + join, + }); + } + + // Collect both listen addresses, then connect the two nodes. + let mut addrs = [None::<Multiaddr>, None::<Multiaddr>]; + while addrs.iter().any(Option::is_none) { + let (index, address) = timeout(TEST_TIMEOUT, listen_rx.recv()) + .await + .context("timed out waiting for listen addresses")? + .context("listen channel closed")?; + if addrs[index].is_none() { + addrs[index] = Some(address); + } + } + let sender_addr = addrs[SENDER] + .clone() + .context("missing sender listen addr")?; + running[RECEIVER] + .dial_tx + .send(vec![sender_addr]) + .context("failed to queue dial")?; + + let mut connected = [false, false]; + while !connected.iter().all(|c| *c) { + let (index, _peer) = timeout(TEST_TIMEOUT, conn_rx.recv()) + .await + .context("timed out waiting for connections")? + .context("connection channel closed")?; + connected[index] = true; + } + + let duty = Duty::new(SlotNumber::new(SLOT), DutyType::Randao); + + // 1. A valid partial must be received. + let valid = partial_set(&cluster, &sender_share, SENDER_SHARE, MSG)?; + running[SENDER] + .handle + .broadcast_and_wait(duty.clone(), valid) + .await + .context("failed to broadcast valid partial")?; + let first = timeout(TEST_TIMEOUT, observed_rx.recv()) + .await + .context("timed out waiting for the valid partial")? 
+ .context("observed channel closed")?; + match first { + Observed::Received { node, share_idx } => { + ensure!(node == RECEIVER, "valid partial observed on node {node}"); + ensure!( + share_idx == SENDER_SHARE, + "valid partial had share index {share_idx}, want {SENDER_SHARE}", + ); + } + Observed::Rejected { error, .. } => { + anyhow::bail!("valid partial was rejected: {error}"); + } + } + + // 2. An invalid partial (signed over a different message) must be rejected. + let invalid = partial_set(&cluster, &sender_share, SENDER_SHARE, WRONG_MSG)?; + running[SENDER] + .handle + .broadcast_and_wait(duty.clone(), invalid) + .await + .context("failed to broadcast invalid partial")?; + let second = timeout(TEST_TIMEOUT, observed_rx.recv()) + .await + .context("timed out waiting for the invalid partial outcome")? + .context("observed channel closed")?; + match second { + Observed::Rejected { node, error } => { + ensure!(node == RECEIVER, "rejection observed on node {node}"); + ensure!( + matches!(error, Failure::InvalidPartialSignature(_)), + "invalid partial rejected for the wrong reason: {error}", + ); + } + Observed::Received { share_idx, .. } => { + anyhow::bail!("invalid partial (share {share_idx}) was accepted, not rejected"); + } + } + + for node in running { + node.stop_tx + .send(()) + .ok() + .context("failed to signal node to stop")?; + node.join.await.context("swarm task panicked")??; + } + Ok(()) +} diff --git a/crates/peerinfo/tests/peerinfo_exchange.rs b/crates/peerinfo/tests/peerinfo_exchange.rs new file mode 100644 index 00000000..8d50147c --- /dev/null +++ b/crates/peerinfo/tests/peerinfo_exchange.rs @@ -0,0 +1,259 @@ +//! End-to-end test for the peerinfo protocol over a real libp2p network. +//! +//! Spawns a small cluster of swarms that each run only the peerinfo +//! [`Behaviour`], connects them in a full mesh over loopback TCP, and asserts +//! that every node receives the peer info (version, lock hash, nickname) of +//! 
every other node across a live connection. This goes beyond the in-crate +//! protobuf round-trip unit tests: it proves the information actually travels a +//! negotiated libp2p stream and surfaces as a behaviour event. + +use std::{ + collections::{HashMap, HashSet}, + time::Duration, +}; + +use anyhow::{Context as _, Result, ensure}; +use libp2p::{ + Multiaddr, PeerId, Swarm, SwarmBuilder, futures::StreamExt, identity::Keypair, noise, + swarm::SwarmEvent, tcp, yamux, +}; +use pluto_peerinfo::{Behaviour, Config, Event, LocalPeerInfo}; +use tokio::{ + spawn, + sync::{mpsc, oneshot}, + task::JoinHandle, + time::timeout, +}; + +/// Number of nodes in the test cluster. +const NODES: usize = 4; +/// Peer info exchange interval; short so the first exchange happens promptly. +const INTERVAL: Duration = Duration::from_millis(50); +/// Per-connection request timeout. +const REQUEST_TIMEOUT: Duration = Duration::from_secs(5); +/// Keep idle connections alive long enough for the periodic exchange to run. +const IDLE_TIMEOUT: Duration = Duration::from_secs(30); +/// Overall test deadline for any single await. +const TEST_TIMEOUT: Duration = Duration::from_secs(20); +/// Cluster-wide version string shared by every node. +const CLUSTER_VERSION: &str = "v1.2.3"; +/// Cluster-wide git hash shared by every node. +const CLUSTER_GIT_HASH: &str = "abc1234"; +/// Cluster-wide lock hash shared by every node. +const CLUSTER_LOCK_HASH: [u8; 32] = [0xab; 32]; + +/// Nickname advertised by node `idx`; the only per-node-distinct field, used to +/// confirm that received info came from the expected sender. +fn nickname_of(idx: usize) -> String { + format!("node-{idx}") +} + +/// Peer info received by one node from one peer, flattened to the fields under +/// test so the test does not depend on the generated protobuf type. 
+struct ReceivedInfo { + receiver: usize, + sender: PeerId, + nickname: String, + version: String, + lock_hash: Vec<u8>, +} + +/// Handle to a spawned node: the dial queue, the stop signal, and the task. +struct NodeHandle { + dial_tx: mpsc::UnboundedSender<Multiaddr>, + stop_tx: oneshot::Sender<()>, + task: JoinHandle<()>, +} + +/// Builds a swarm running only the peerinfo behaviour and starts listening on a +/// loopback TCP port. +fn build_swarm(keypair: Keypair, idx: usize) -> Result<Swarm<Behaviour>> { + let local_info = LocalPeerInfo::new( + CLUSTER_VERSION, + CLUSTER_LOCK_HASH.to_vec(), + CLUSTER_GIT_HASH, + false, + nickname_of(idx), + ); + let config = Config::new(local_info) + .with_interval(INTERVAL) + .with_timeout(REQUEST_TIMEOUT); + let mut swarm = SwarmBuilder::with_existing_identity(keypair) + .with_tokio() + .with_tcp( + tcp::Config::default(), + noise::Config::new, + yamux::Config::default, + ) + .context("failed to build tcp transport")? + .with_behaviour(|key| Behaviour::new(key.public().to_peer_id(), config)) + .context("failed to build peerinfo behaviour")? + .with_swarm_config(|c| c.with_idle_connection_timeout(IDLE_TIMEOUT)) + .build(); + let listen_addr = "/ip4/127.0.0.1/tcp/0" + .parse::<Multiaddr>() + .context("failed to parse listen multiaddr")?; + swarm + .listen_on(listen_addr) + .context("failed to start listening")?; + Ok(swarm) +} + +/// Spawns the swarm event loop for one node. +fn spawn_node( + mut swarm: Swarm<Behaviour>, + idx: usize, + listen_tx: mpsc::UnboundedSender<(usize, Multiaddr)>, + received_tx: mpsc::UnboundedSender<ReceivedInfo>, +) -> NodeHandle { + let (dial_tx, mut dial_rx) = mpsc::unbounded_channel::<Multiaddr>(); + let (stop_tx, mut stop_rx) = oneshot::channel::<()>(); + let task = spawn(async move { + loop { + tokio::select! { + event = swarm.select_next_some() => match event { + SwarmEvent::NewListenAddr { address, .. } => { + listen_tx.send((idx, address)).ok(); + } + SwarmEvent::Behaviour(Event::Received { peer, info, .. 
}) => { + let received = ReceivedInfo { + receiver: idx, + sender: peer, + nickname: info.nickname, + version: info.pluto_version, + lock_hash: info.lock_hash.to_vec(), + }; + received_tx.send(received).ok(); + } + _ => {} + }, + addr = dial_rx.recv() => { + if let Some(addr) = addr { + swarm.dial(addr).ok(); + } + } + _ = &mut stop_rx => break, + } + } + }); + NodeHandle { + dial_tx, + stop_tx, + task, + } +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn connected_nodes_exchange_peer_info() -> Result<()> { + let keypairs = (0..NODES) + .map(|_| Keypair::generate_ed25519()) + .collect::<Vec<_>>(); + let peer_ids = keypairs + .iter() + .map(|k| k.public().to_peer_id()) + .collect::<Vec<_>>(); + let index_of = peer_ids + .iter() + .enumerate() + .map(|(idx, peer)| (*peer, idx)) + .collect::<HashMap<_, _>>(); + + let (listen_tx, mut listen_rx) = mpsc::unbounded_channel::<(usize, Multiaddr)>(); + let (received_tx, mut received_rx) = mpsc::unbounded_channel::<ReceivedInfo>(); + + let mut handles = Vec::with_capacity(NODES); + for (idx, keypair) in keypairs.into_iter().enumerate() { + let swarm = build_swarm(keypair, idx)?; + let handle = spawn_node(swarm, idx, listen_tx.clone(), received_tx.clone()); + handles.push(handle); + } + drop(listen_tx); + drop(received_tx); + + // Collect one listen address per node before dialing. + let mut addrs = vec![None::<Multiaddr>; NODES]; + let mut listening = 0usize; + while listening < NODES { + let next = timeout(TEST_TIMEOUT, listen_rx.recv()) + .await + .context("timed out waiting for listen addresses")? 
+ .context("listen channel closed before all nodes listened")?; + let (idx, address) = next; + if addrs[idx].is_none() { + addrs[idx] = Some(address); + listening += 1; + } + } + let addrs = addrs + .into_iter() + .enumerate() + .map(|(idx, addr)| addr.with_context(|| format!("missing listen addr for node {idx}"))) + .collect::<Result<Vec<_>>>()?; + + // Connect the cluster in a full mesh; one connection per pair is enough, as + // both ends run the periodic exchange over it. + for (i, handle) in handles.iter().enumerate() { + for addr in addrs.iter().skip(i + 1) { + handle + .dial_tx + .send(addr.clone()) + .context("failed to queue dial")?; + } + } + + // Collect exchanges until every ordered (receiver, sender) pair is covered. + let expected_pairs = NODES * (NODES - 1); + let mut latest = HashMap::<(usize, usize), ReceivedInfo>::new(); + let mut seen = HashSet::<(usize, usize)>::new(); + while seen.len() < expected_pairs { + let received = timeout(TEST_TIMEOUT, received_rx.recv()) + .await + .context("timed out waiting for peer info exchange")? + .context("received channel closed prematurely")?; + let sender = *index_of + .get(&received.sender) + .context("received info from an unknown peer")?; + let key = (received.receiver, sender); + seen.insert(key); + latest.insert(key, received); + } + + // Every node must have received the correct info from every other node. 
+ for receiver in 0..NODES { + for sender in 0..NODES { + if receiver == sender { + continue; + } + let info = latest + .get(&(receiver, sender)) + .with_context(|| format!("node {receiver} never received info from {sender}"))?; + let want_nickname = nickname_of(sender); + ensure!( + info.nickname == want_nickname, + "node {receiver} got nickname {:?} from node {sender}, want {want_nickname:?}", + info.nickname, + ); + ensure!( + info.version == CLUSTER_VERSION, + "node {receiver} got version {:?} from node {sender}, want {CLUSTER_VERSION:?}", + info.version, + ); + ensure!( + info.lock_hash == CLUSTER_LOCK_HASH, + "node {receiver} got unexpected lock hash from node {sender}", + ); + } + } + + for handle in handles { + let NodeHandle { + dial_tx, + stop_tx, + task, + } = handle; + drop(dial_tx); + stop_tx.send(()).ok(); + task.await.context("node task panicked")?; + } + Ok(()) +} diff --git a/crates/peerinfo/tests/peerinfo_version_mismatch.rs b/crates/peerinfo/tests/peerinfo_version_mismatch.rs new file mode 100644 index 00000000..5069023b --- /dev/null +++ b/crates/peerinfo/tests/peerinfo_version_mismatch.rs @@ -0,0 +1,212 @@ +//! End-to-end test for peerinfo version-compatibility handling over libp2p. +//! +//! Two nodes connect over loopback TCP: one advertises a supported version, the +//! other an unsupported one. The test asserts the implemented +//! (Charon-equivalent) behaviour: peerinfo is informational, so a version +//! mismatch does **not** tear down the connection or abort the exchange — +//! instead the offending peer is flagged via the `version_support` gauge (0 = +//! incompatible, 1 = compatible), and the exchange still completes in both +//! directions without panicking. 
+ +use std::{collections::HashSet, time::Duration}; + +use anyhow::{Context as _, Result, ensure}; +use libp2p::{ + Multiaddr, PeerId, Swarm, SwarmBuilder, futures::StreamExt, identity::Keypair, noise, + swarm::SwarmEvent, tcp, yamux, +}; +use pluto_p2p::name::peer_name; +use pluto_peerinfo::{Behaviour, Config, Event, LocalPeerInfo, metrics::PEERINFO_METRICS}; +use tokio::{ + spawn, + sync::{mpsc, oneshot}, + task::JoinHandle, + time::timeout, +}; + +/// A version whose minor matches the supported set (compatible). +const COMPATIBLE_VERSION: &str = "v1.5.0"; +/// A version older than the supported set with no matching minor +/// (incompatible). +const INCOMPATIBLE_VERSION: &str = "v0.9.0"; +/// Git hash matching the protocol's `^[0-9a-f]{7}$` validation regex. +const GIT_HASH: &str = "abc1234"; +/// Cluster-wide lock hash shared by both nodes. +const LOCK_HASH: [u8; 32] = [0xab; 32]; +/// Gauge value meaning "peer version supported". +const SUPPORTED: i64 = 1; +/// Gauge value meaning "peer version unsupported". +const UNSUPPORTED: i64 = 0; + +/// Peer info exchange interval; short so the first exchange happens promptly. +const INTERVAL: Duration = Duration::from_millis(50); +/// Per-connection request timeout. +const REQUEST_TIMEOUT: Duration = Duration::from_secs(5); +/// Keep idle connections alive long enough for the periodic exchange to run. +const IDLE_TIMEOUT: Duration = Duration::from_secs(30); +/// Overall test deadline for any single await. +const TEST_TIMEOUT: Duration = Duration::from_secs(20); + +/// Handle to a spawned node. +struct NodeHandle { + dial_tx: mpsc::UnboundedSender<Multiaddr>, + stop_tx: oneshot::Sender<()>, + task: JoinHandle<()>, +} + +/// Builds a swarm running only the peerinfo behaviour, advertising `version`, +/// and starts listening on a loopback TCP port. 
+fn build_swarm(keypair: Keypair, version: &str, nickname: &str) -> Result<Swarm<Behaviour>> { + let local_info = LocalPeerInfo::new(version, LOCK_HASH.to_vec(), GIT_HASH, false, nickname); + let config = Config::new(local_info) + .with_interval(INTERVAL) + .with_timeout(REQUEST_TIMEOUT); + let mut swarm = SwarmBuilder::with_existing_identity(keypair) + .with_tokio() + .with_tcp( + tcp::Config::default(), + noise::Config::new, + yamux::Config::default, + ) + .context("failed to build tcp transport")? + .with_behaviour(|key| Behaviour::new(key.public().to_peer_id(), config)) + .context("failed to build peerinfo behaviour")? + .with_swarm_config(|c| c.with_idle_connection_timeout(IDLE_TIMEOUT)) + .build(); + let listen_addr = "/ip4/127.0.0.1/tcp/0" + .parse::<Multiaddr>() + .context("failed to parse listen multiaddr")?; + swarm + .listen_on(listen_addr) + .context("failed to start listening")?; + Ok(swarm) +} + +/// Spawns the swarm event loop, reporting its listen address and the index of +/// every peer it successfully exchanges info with. +fn spawn_node( + mut swarm: Swarm<Behaviour>, + idx: usize, + listen_tx: mpsc::UnboundedSender<(usize, Multiaddr)>, + received_tx: mpsc::UnboundedSender<(usize, PeerId)>, +) -> NodeHandle { + let (dial_tx, mut dial_rx) = mpsc::unbounded_channel::<Multiaddr>(); + let (stop_tx, mut stop_rx) = oneshot::channel::<()>(); + let task = spawn(async move { + loop { + tokio::select! { + event = swarm.select_next_some() => match event { + SwarmEvent::NewListenAddr { address, .. } => { + listen_tx.send((idx, address)).ok(); + } + SwarmEvent::Behaviour(Event::Received { peer, .. 
}) => { + received_tx.send((idx, peer)).ok(); + } + _ => {} + }, + addr = dial_rx.recv() => { + if let Some(addr) = addr { + swarm.dial(addr).ok(); + } + } + _ = &mut stop_rx => break, + } + } + }); + NodeHandle { + dial_tx, + stop_tx, + task, + } +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn incompatible_version_peer_is_flagged_without_dropping_exchange() -> Result<()> { + let compatible_key = Keypair::generate_ed25519(); + let incompatible_key = Keypair::generate_ed25519(); + let compatible_id = compatible_key.public().to_peer_id(); + let incompatible_id = incompatible_key.public().to_peer_id(); + + let (listen_tx, mut listen_rx) = mpsc::unbounded_channel::<(usize, Multiaddr)>(); + let (received_tx, mut received_rx) = mpsc::unbounded_channel::<(usize, PeerId)>(); + + // Node 0 advertises a supported version, node 1 an unsupported one. + let compatible_swarm = build_swarm(compatible_key, COMPATIBLE_VERSION, "compatible")?; + let incompatible_swarm = build_swarm(incompatible_key, INCOMPATIBLE_VERSION, "incompatible")?; + let node0 = spawn_node(compatible_swarm, 0, listen_tx.clone(), received_tx.clone()); + let node1 = spawn_node( + incompatible_swarm, + 1, + listen_tx.clone(), + received_tx.clone(), + ); + drop(listen_tx); + drop(received_tx); + + // Collect both listen addresses. + let mut addrs = [None::<Multiaddr>, None::<Multiaddr>]; + while addrs.iter().any(Option::is_none) { + let (idx, address) = timeout(TEST_TIMEOUT, listen_rx.recv()) + .await + .context("timed out waiting for listen addresses")? + .context("listen channel closed before both nodes listened")?; + if addrs[idx].is_none() { + addrs[idx] = Some(address); + } + } + let node1_addr = addrs[1].clone().context("missing listen addr for node 1")?; + + // One connection is enough; both ends run the periodic exchange over it. + node0 + .dial_tx + .send(node1_addr) + .context("failed to queue dial")?; + + // Wait until each node has validated the other (both exchange directions). 
+ let mut seen = HashSet::<(usize, usize)>::new(); + while seen.len() < 2 { + let (receiver, sender) = timeout(TEST_TIMEOUT, received_rx.recv()) + .await + .context("timed out waiting for peer info exchange")? + .context("received channel closed prematurely")?; + let sender_idx = if sender == compatible_id { + 0 + } else if sender == incompatible_id { + 1 + } else { + anyhow::bail!("received info from an unknown peer"); + }; + seen.insert((receiver, sender_idx)); + } + + // The exchange completed in both directions despite the mismatch: the + // connection was neither rejected nor torn down (no crash, no panic). + ensure!( + seen.contains(&(0, 1)) && seen.contains(&(1, 0)), + "expected a completed exchange in both directions, saw {seen:?}", + ); + + // The compatibility verdict is recorded on the per-peer gauge. + let incompatible_support = PEERINFO_METRICS.version_support[&peer_name(&incompatible_id)].get(); + let compatible_support = PEERINFO_METRICS.version_support[&peer_name(&compatible_id)].get(); + ensure!( + incompatible_support == UNSUPPORTED, + "unsupported peer should be flagged 0, got {incompatible_support}", + ); + ensure!( + compatible_support == SUPPORTED, + "supported peer should be flagged 1, got {compatible_support}", + ); + + for handle in [node0, node1] { + let NodeHandle { + dial_tx, + stop_tx, + task, + } = handle; + drop(dial_tx); + stop_tx.send(()).ok(); + task.await.context("node task panicked")?; + } + Ok(()) +} diff --git a/crates/peerinfo/tests/prototype_fail_closed.rs b/crates/peerinfo/tests/prototype_fail_closed.rs new file mode 100644 index 00000000..445410dc --- /dev/null +++ b/crates/peerinfo/tests/prototype_fail_closed.rs @@ -0,0 +1,42 @@ +//! PROTOTYPE (not yet runnable): fail-closed handling of incompatible peers. +//! +//! Forward specification, `#[ignore]`d so it never runs in CI. +//! +//! ## Context +//! +//! peerinfo version/lock-hash mismatch is **informational by design** (Charon +//! 
parity): the node flags the peer via the `version_support` gauge and logs, +//! but does not drop the connection. That implemented behaviour is already +//! covered by the real test +//! `crates/peerinfo/tests/peerinfo_version_mismatch.rs` +//! (`incompatible_version_peer_is_flagged_without_dropping_exchange`). +//! +//! ## What is missing (the gap) +//! +//! The actual fail-closed enforcement — refusing to admit a peer that is not a +//! member of the cluster — is not wired. `PlutoBehaviour` carries a +//! `ConnGater` (`crates/p2p/src/behaviours/pluto.rs`), but "by default an open +//! gater is used that allows all connections"; nothing constructs a gater that +//! denies peers whose ID is absent from the cluster lock, and no policy turns a +//! peerinfo incompatibility verdict into a disconnect. Wiring that policy is a +//! runtime concern (blocked on the assembled node / `pluto run`). +//! +//! ## Target scenario (to assert once enforcement exists) +//! +//! 1. Start a node configured with a cluster-membership gater. +//! 2. Have a peer whose ID is not in the cluster lock attempt to connect. +//! 3. Assert the connection is denied (gater rejects it) and the node keeps +//! running — it never treats the stranger as a cluster peer, without +//! crashing. + +/// Forward spec for refusing a non-cluster / incompatible peer. Ignored and +/// intentionally unimplemented: needs a cluster-membership gater policy wired +/// into the runtime (see module docs). +#[test] +#[ignore = "blocked: no gater policy denies non-cluster peers; peerinfo mismatch is informational only"] +fn prototype_test_incompatible_peer_connection_is_refused() { + unimplemented!( + "specification only — implement once a cluster-membership ConnGater (or a peerinfo \ + verdict-to-disconnect policy) is wired into the runtime (see module docs)" + ); +}