From 6c66c352b39c68913538637fe5f70ce6bf7e4c2e Mon Sep 17 00:00:00 2001 From: Bohdan Ohorodnii <273991985+varex83agent@users.noreply.github.com> Date: Mon, 8 Jun 2026 14:08:37 +0200 Subject: [PATCH 01/11] feat(core): add CachedValidatorsProvider trait for validatorapi Defines the `CachedValidatorsProvider` async trait in pluto-core's validatorapi module. Mirrors Charon's `app/eth2wrap.CachedValidatorsProvider` so the validator-API component can look up `validator_index -> DV root pubkey` (used by the selections / voluntary-exit / sync-committee submit handlers in follow-up PRs) without `pluto-core` depending on the application crate that owns the concrete per-epoch cache implementation. `Component` gains an `Arc` field threaded through both constructors as a required dependency, plus a private `fetch_active_validators` helper bounded by `UPSTREAM_REQUEST_TIMEOUT` that all submit handlers share. A `TestValidatorCache` test double lets unit tests supply the map directly without spinning up a real beacon node. The existing `crates/app/src/eth2wrap/valcache.rs` is intentionally untouched. It will impl this trait at the wiring layer (`pluto-app` already depends on `pluto-core`, so the dependency runs the right way without a new crate or any cross-crate moves). Test plan: - cargo +nightly fmt --all --check - cargo clippy -p pluto-core --all-targets --all-features -- -D warnings - cargo test -p pluto-core --all-features (430 passing; 2 new tests on `fetch_active_validators` covering the happy path and the provider-error -> 502 mapping) Co-Authored-By: Bohdan Ohorodnii <35969035+varex83@users.noreply.github.com> --- crates/core/src/validatorapi/component.rs | 150 +++++++++++++++++- crates/core/src/validatorapi/mod.rs | 2 + .../core/src/validatorapi/validator_cache.rs | 35 ++++ 3 files changed, 181 insertions(+), 6 deletions(-) create mode 100644 crates/core/src/validatorapi/validator_cache.rs diff --git a/crates/core/src/validatorapi/component.rs b/crates/core/src/validatorapi/component.rs index 969f92e7..29aa3af7 100644 --- a/crates/core/src/validatorapi/component.rs +++ b/crates/core/src/validatorapi/component.rs @@ -13,7 +13,7 @@ use pluto_eth2api::{ EthBeaconNodeApiClient, GetAttesterDutiesRequest, GetAttesterDutiesResponse, GetProposerDutiesRequest, GetProposerDutiesResponse, GetSyncCommitteeDutiesRequest, GetSyncCommitteeDutiesResponse, - spec::phase0::{BLSPubKey, Epoch, Root}, + spec::phase0::{BLSPubKey, Epoch, Root, ValidatorIndex}, }; use pluto_eth2util::signing::{self, DomainName, SigningError}; use tokio::time::error::Elapsed; @@ -32,6 +32,7 @@ use super::{ VersionedAttestation, VersionedProposal, VersionedSignedAggregateAndProof, VersionedSignedBlindedProposal, VersionedSignedProposal, }, + validator_cache::CachedValidatorsProvider, }; use crate::{ dutydb::{Error as DutyDbError, MemDB}, @@ -175,6 +176,14 @@ pub struct Component { /// Looks up the root pubkey for an `(slot, commIdx, valIdx)` triple. #[allow(dead_code, reason = "consumed by submit_attestations in later PRs")] pub_key_by_att_fn: Option, + /// Cluster's per-epoch active-validators lookup. Consumed by the + /// selections / voluntary-exit / sync-committee submit handlers to + /// translate validator-client-supplied `validator_index` values into + /// DV root public keys. Mirrors Go's `c.eth2Cl.ActiveValidators(ctx)`, + /// which is itself backed by `app/eth2wrap`'s per-epoch validator + /// cache. + #[allow(dead_code, reason = "consumed by submit_* handlers in later PRs")] + validator_cache: Arc, } impl Component { @@ -185,6 +194,7 @@ impl Component { share_idx: u64, pub_share_by_pubkey: HashMap, builder_enabled: bool, + validator_cache: Arc, ) -> Self { Self { eth2_cl, @@ -200,6 +210,7 @@ impl Component { await_agg_sig_db_fn: None, duty_def_fn: None, pub_key_by_att_fn: None, + validator_cache, } } @@ -212,6 +223,7 @@ impl Component { eth2_cl: Arc, dutydb: Arc, share_idx: u64, + validator_cache: Arc, ) -> Self { Self { eth2_cl, @@ -227,6 +239,7 @@ impl Component { await_agg_sig_db_fn: None, duty_def_fn: None, pub_key_by_att_fn: None, + validator_cache, } } @@ -352,6 +365,28 @@ impl Component { Ok(()) } + + /// Fetches the cluster's active validators through the per-epoch + /// [`CachedValidatorsProvider`], bounded by [`UPSTREAM_REQUEST_TIMEOUT`]. + /// Translates cache failures into `ApiError`s without leaking the + /// underlying error into the client-visible message. Mirrors Go's + /// `c.eth2Cl.ActiveValidators(ctx)`, which is itself implemented via + /// `app/eth2wrap`'s validator cache. + #[allow(dead_code, reason = "consumed by submit_* handlers in later PRs")] + async fn fetch_active_validators( + &self, + ) -> Result, ApiError> { + tokio::time::timeout( + UPSTREAM_REQUEST_TIMEOUT, + self.validator_cache.active_validators(), + ) + .await + .map_err(|_: Elapsed| upstream_timeout("active validators"))? + .map_err(|err| { + ApiError::new(StatusCode::BAD_GATEWAY, "active validators lookup failed") + .with_boxed_source(err) + }) + } } /// Errors returned by [`Component::verify_partial_sig`]. @@ -821,6 +856,36 @@ mod tests { validatorapi::types::AttestationDataOpts, }; + /// In-memory stand-in for the per-epoch validator cache. Tests supply + /// the `validator_index -> root pubkey` map up front instead of + /// running a real beacon-node mock through a `ValidatorCache`. + #[derive(Default)] + pub(super) struct TestValidatorCache(HashMap); + + impl TestValidatorCache { + pub(super) fn arc( + map: HashMap, + ) -> Arc { + Arc::new(Self(map)) + } + + pub(super) fn empty() -> Arc { + Self::arc(HashMap::new()) + } + } + + #[async_trait] + impl CachedValidatorsProvider for TestValidatorCache { + async fn active_validators( + &self, + ) -> Result< + HashMap, + super::super::validator_cache::CachedValidatorsError, + > { + Ok(self.0.clone()) + } + } + /// Schedules every duty with a deadline at `MAX_UTC`, so duties are /// `Scheduled` but never naturally expire. struct FarFutureCalculator; @@ -844,7 +909,8 @@ mod tests { let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url("http://127.0.0.1:0").unwrap()); - let component = Component::new_insecure(eth2_cl, Arc::clone(&dutydb), 1); + let component = + Component::new_insecure(eth2_cl, Arc::clone(&dutydb), 1, TestValidatorCache::empty()); (component, dutydb) } @@ -1086,7 +1152,8 @@ mod tests { let dutydb = Arc::new(MemDB::new(deadliner, trim_rx, &cancel)); let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url("http://127.0.0.1:0").unwrap()); - let component = Component::new_insecure(eth2_cl, Arc::clone(&dutydb), 1); + let component = + Component::new_insecure(eth2_cl, Arc::clone(&dutydb), 1, TestValidatorCache::empty()); // Start an await before any data is stored. let waiter = { @@ -1272,7 +1339,7 @@ mod tests { let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url("http://127.0.0.1:0").unwrap()); - Component::new(eth2_cl, dutydb, 1, map, false) + Component::new(eth2_cl, dutydb, 1, map, false, TestValidatorCache::empty()) } /// `Subscribe` invokes every registered subscriber, each receiving its @@ -1503,7 +1570,7 @@ mod tests { let (_evict_tx, evict_rx) = mpsc::channel(1); let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url(mock.uri()).unwrap()); - let component = Component::new(eth2_cl, dutydb, 1, map, false); + let component = Component::new(eth2_cl, dutydb, 1, map, false, TestValidatorCache::empty()); (component, mock) } @@ -1586,7 +1653,7 @@ mod tests { let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url("http://127.0.0.1:0").unwrap()); - let component = Component::new_insecure(eth2_cl, dutydb, 1); + let component = Component::new_insecure(eth2_cl, dutydb, 1, TestValidatorCache::empty()); component .verify_partial_sig( @@ -1599,4 +1666,75 @@ mod tests { .await .expect("insecure_test mode skips verification"); } + + // ==================================================================== + // CachedValidatorsProvider plumbing + // ==================================================================== + + /// `fetch_active_validators` returns whatever the registered + /// `CachedValidatorsProvider` yields, untouched. Mirrors Go's + /// `c.eth2Cl.ActiveValidators(ctx)` return shape. + #[tokio::test] + async fn fetch_active_validators_returns_cache_contents() { + let cancel = CancellationToken::new(); + let (deadliner, _deadliner_rx) = DeadlinerTask::start( + cancel.clone(), + "validatorapi-validator-cache-tests", + FarFutureCalculator, + ); + let (_evict_tx, evict_rx) = mpsc::channel(1); + let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); + let eth2_cl = + Arc::new(EthBeaconNodeApiClient::with_base_url("http://127.0.0.1:0").unwrap()); + + let expected = HashMap::from([(1u64, dv_pubkey(0xA1)), (7u64, dv_pubkey(0xA7))]); + let component = Component::new_insecure( + eth2_cl, + dutydb, + 1, + TestValidatorCache::arc(expected.clone()), + ); + + let got = component + .fetch_active_validators() + .await + .expect("test cache always succeeds"); + assert_eq!(got, expected); + } + + /// A provider that surfaces a transport-style error is mapped to a 502 + /// without leaking the underlying error into the client-visible + /// message. + #[tokio::test] + async fn fetch_active_validators_maps_provider_error_to_502() { + struct FailingCache; + + #[async_trait] + impl CachedValidatorsProvider for FailingCache { + async fn active_validators( + &self, + ) -> Result< + HashMap, + super::super::validator_cache::CachedValidatorsError, + > { + Err("upstream unavailable".into()) + } + } + + let cancel = CancellationToken::new(); + let (deadliner, _deadliner_rx) = DeadlinerTask::start( + cancel.clone(), + "validatorapi-validator-cache-fail-tests", + FarFutureCalculator, + ); + let (_evict_tx, evict_rx) = mpsc::channel(1); + let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); + let eth2_cl = + Arc::new(EthBeaconNodeApiClient::with_base_url("http://127.0.0.1:0").unwrap()); + let component = Component::new_insecure(eth2_cl, dutydb, 1, Arc::new(FailingCache)); + + let err = component.fetch_active_validators().await.unwrap_err(); + assert_eq!(err.status_code, StatusCode::BAD_GATEWAY); + assert_eq!(err.message, "active validators lookup failed"); + } } diff --git a/crates/core/src/validatorapi/mod.rs b/crates/core/src/validatorapi/mod.rs index 8442859c..28a1d680 100644 --- a/crates/core/src/validatorapi/mod.rs +++ b/crates/core/src/validatorapi/mod.rs @@ -9,6 +9,7 @@ pub mod handler; pub mod metrics; pub mod router; pub mod types; +pub mod validator_cache; #[cfg(test)] pub mod testutils; @@ -17,3 +18,4 @@ pub use component::Component; pub use error::ApiError; pub use handler::Handler; pub use router::new_router; +pub use validator_cache::{CachedValidatorsError, CachedValidatorsProvider}; diff --git a/crates/core/src/validatorapi/validator_cache.rs b/crates/core/src/validatorapi/validator_cache.rs new file mode 100644 index 00000000..34f22b5f --- /dev/null +++ b/crates/core/src/validatorapi/validator_cache.rs @@ -0,0 +1,35 @@ +//! Cluster-wide active-validators lookup consumed by submit handlers. +//! +//! Mirrors Charon's `app/eth2wrap.CachedValidatorsProvider` interface: +//! submit handlers that have to translate a validator-client-supplied +//! `validator_index` into the cluster's DV root public key consult this +//! trait. Defined here in `pluto-core` so the validator API does not need +//! to depend on the application crate that owns the concrete per-epoch +//! cache implementation. + +use std::collections::HashMap; + +use async_trait::async_trait; +use pluto_eth2api::spec::phase0::{BLSPubKey, ValidatorIndex}; + +/// Boxed error returned by [`CachedValidatorsProvider`] methods. Kept +/// opaque so the trait does not bind callers to any single backing +/// implementation's error type. +pub type CachedValidatorsError = Box; + +/// Provides the cluster's currently active validators, indexed by +/// validator index. Mirrors Go's `eth2Cl.ActiveValidators(ctx)`, which is +/// itself backed by `app/eth2wrap`'s per-epoch validator cache; the +/// validator-API [`Component`](super::Component) calls through this trait +/// so the cache is the single source of truth across duty handlers +/// without `pluto-core` depending on the cache crate. +/// +/// Implementations may populate the underlying cache on demand — callers +/// must not assume the call is non-blocking. +#[async_trait] +pub trait CachedValidatorsProvider: Send + Sync { + /// Returns the `validator_index -> DV root BLS public key` map. + async fn active_validators( + &self, + ) -> Result, CachedValidatorsError>; +} From 6556a0953efbdb7de59750023d4b9593368ebfb5 Mon Sep 17 00:00:00 2001 From: Bohdan Ohorodnii <273991985+varex83agent@users.noreply.github.com> Date: Fri, 29 May 2026 15:53:38 +0200 Subject: [PATCH 02/11] feat(core): implement validatorapi beacon/sync committee selections handlers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Ports `BeaconCommitteeSelections` and `SyncCommitteeSelections` from `core/validatorapi/validatorapi.go` (lines 798-864 and 1072-1138). Each handler resolves every selection's validator-index to a DV root pubkey via a new `register_active_validators` hook, builds the matching `signeddata::*::new_partial` wrapper, verifies the per-share selection proof, groups the partial-signed data by slot, fans the per-slot `ParSignedDataSet` out to every subscriber under the corresponding `PrepareAggregator` / `PrepareSyncContribution` duty, then pulls each aggregated reply back out of the AggSigDB via `await_agg_sig_db` and stitches it into the response. The `BeaconCommitteeSelection` / `SyncCommitteeSelection` placeholder structs in `validatorapi/types.rs` are now aliases for the consensus `v1::BeaconCommitteeSelection` / `v1::SyncCommitteeSelection`, matching the established pattern (`AttestationData = phase0::AttestationData`) so the handler signatures carry the real validator-index / slot / selection-proof tuple Go uses. `fetch_active_validators` is bounded by `UPSTREAM_REQUEST_TIMEOUT`; each `await_agg_sig_db` lookup is bounded by `SELECTIONS_AGG_SIG_DB_TIMEOUT` (~two slots). Upstream stalls surface as 504/408; unknown validator indices surface as 400 with a generic "validator not found" message; verification failures surface as 400; subscriber / lookup errors surface as 500 with the original error attached as `source` for debug logging only. Tests (10 new): - happy path for each handler (one selection in, one aggregated out) - multi-selection input — per-slot subscriber fanout + response stitching - unknown validator-index short-circuits with 400 before the AggSigDB is touched - tampered (zero) selection proof short-circuits with 400 before the AggSigDB is touched - stalled `await_agg_sig_db` trips `SELECTIONS_AGG_SIG_DB_TIMEOUT` and returns 408 Co-Authored-By: Bohdan Ohorodnii <35969035+varex83@users.noreply.github.com> --- crates/core/src/validatorapi/component.rs | 796 +++++++++++++++++++++- crates/core/src/validatorapi/types.rs | 23 +- 2 files changed, 801 insertions(+), 18 deletions(-) diff --git a/crates/core/src/validatorapi/component.rs b/crates/core/src/validatorapi/component.rs index 29aa3af7..12239bbe 100644 --- a/crates/core/src/validatorapi/component.rs +++ b/crates/core/src/validatorapi/component.rs @@ -13,10 +13,14 @@ use pluto_eth2api::{ EthBeaconNodeApiClient, GetAttesterDutiesRequest, GetAttesterDutiesResponse, GetProposerDutiesRequest, GetProposerDutiesResponse, GetSyncCommitteeDutiesRequest, GetSyncCommitteeDutiesResponse, - spec::phase0::{BLSPubKey, Epoch, Root, ValidatorIndex}, + spec::phase0::{BLSPubKey, Epoch, Root, Slot, ValidatorIndex}, +}; +use pluto_eth2util::{ + helpers::epoch_from_slot, + signing::{self, DomainName, SigningError}, }; -use pluto_eth2util::signing::{self, DomainName, SigningError}; use tokio::time::error::Elapsed; +use tree_hash::TreeHash; use super::{ error::ApiError, @@ -36,11 +40,12 @@ use super::{ }; use crate::{ dutydb::{Error as DutyDbError, MemDB}, + signeddata, signeddata::{ SyncContribution, VersionedAggregatedAttestation, VersionedProposal as UnsignedVersionedProposal, }, - types::{Duty, ParSignedDataSet, PubKey, Signature, SignedData}, + types::{Duty, ParSignedDataSet, PubKey, Signature, SignedData, SlotNumber}, version, }; @@ -121,6 +126,14 @@ const UPSTREAM_REQUEST_TIMEOUT: Duration = Duration::from_secs(10); /// attestation duty has time to flow through the pipeline. const ATTESTATION_DATA_TIMEOUT: Duration = Duration::from_secs(24); +/// Hard deadline for each blocking `await_agg_sig_db_fn` lookup performed by +/// the selection handlers. The Go reference relies on the request context +/// timeout; here we set an explicit bound so a stalled AggSigDB cannot pin a +/// selections request forever. Sized to roughly two slots so a real +/// `PrepareAggregator` / `PrepareSyncContribution` duty has time to reach +/// the AggSigDB. +const SELECTIONS_AGG_SIG_DB_TIMEOUT: Duration = Duration::from_secs(24); + /// Validator API [`Handler`] implementation. /// /// Holds the upstream beacon-node client and the cluster's public-key / @@ -182,7 +195,6 @@ pub struct Component { /// DV root public keys. Mirrors Go's `c.eth2Cl.ActiveValidators(ctx)`, /// which is itself backed by `app/eth2wrap`'s per-epoch validator /// cache. - #[allow(dead_code, reason = "consumed by submit_* handlers in later PRs")] validator_cache: Arc, } @@ -372,7 +384,6 @@ impl Component { /// underlying error into the client-visible message. Mirrors Go's /// `c.eth2Cl.ActiveValidators(ctx)`, which is itself implemented via /// `app/eth2wrap`'s validator cache. - #[allow(dead_code, reason = "consumed by submit_* handlers in later PRs")] async fn fetch_active_validators( &self, ) -> Result, ApiError> { @@ -387,6 +398,69 @@ impl Component { .with_boxed_source(err) }) } + + /// Looks up the DV root pubkey for a selection's `validator_index`. + /// Mirrors Go's `vals[selection.ValidatorIndex]` / `core.PubKeyFromBytes` + /// pair at the top of each selection loop. Returns both representations + /// the handler needs: the `BLSPubKey` for signature verification and the + /// `core::PubKey` for use as a `ParSignedDataSet` key. + fn resolve_validator( + &self, + validator_index: ValidatorIndex, + active_validators: &HashMap, + endpoint: &'static str, + ) -> Result<(BLSPubKey, PubKey), ApiError> { + let root = active_validators.get(&validator_index).ok_or_else(|| { + // Mirrors Go's `errors.New("validator not found")` branch — the + // caller asked us to sign for a validator that is not part of + // the cluster. 400 (not 502): the failure is request-level, not + // gateway-level. + ApiError::new( + StatusCode::BAD_REQUEST, + format!("{endpoint}: validator not found"), + ) + })?; + Ok((*root, PubKey::new(*root))) + } + + /// Verifies a selection's partial signature. Bundles slot → epoch + /// resolution alongside the underlying `verify_partial_sig` call and + /// surfaces the failure as a 400 with a generic message (matching the + /// Go path where `verifyPartialSig` returns an error that propagates + /// out of the handler). + async fn verify_selection_partial_sig( + &self, + root_pubkey: &BLSPubKey, + domain: DomainName, + slot: Slot, + message_root: Root, + signature: &Signature, + endpoint: &'static str, + ) -> Result<(), ApiError> { + // Resolve the epoch first so a misconfigured upstream surfaces as + // 502 rather than as a verification failure. + let epoch = epoch_from_slot(&self.eth2_cl, slot).await.map_err(|err| { + ApiError::new( + StatusCode::BAD_GATEWAY, + format!("{endpoint}: epoch lookup failed"), + ) + .with_source(err) + })?; + + self.verify_partial_sig(root_pubkey, domain, epoch, message_root, signature) + .await + .map_err(|err| match err { + VerifyPartialSigError::UnknownPubKey => ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + format!("{endpoint}: unknown validator public key"), + ), + VerifyPartialSigError::Signing(inner) => ApiError::new( + StatusCode::BAD_REQUEST, + format!("{endpoint}: invalid partial signature"), + ) + .with_source(inner), + }) + } } /// Errors returned by [`Component::verify_partial_sig`]. @@ -627,16 +701,202 @@ impl Handler for Component { async fn beacon_committee_selections( &self, - _selections: Vec, + selections: Vec, ) -> Result>, ApiError> { - unimplemented!("beacon_committee_selections not yet ported") + // Port of `BeaconCommitteeSelections` in + // `core/validatorapi/validatorapi.go` (lines 798–864). + let active_validators = self.fetch_active_validators().await?; + + // psigs_by_slot mirrors Go's `psigsBySlot[slot]ParSignedDataSet` — + // keyed by `(slot, pubkey)` so the per-slot fanout below produces one + // `PrepareAggregator` duty per slot covering every selection from + // that slot. + let mut psigs_by_slot: HashMap = HashMap::new(); + for selection in &selections { + let (root_pubkey, core_pubkey) = self.resolve_validator( + selection.validator_index, + &active_validators, + "beacon committee selection", + )?; + + let par_sig = signeddata::BeaconCommitteeSelection::new_partial( + selection.clone(), + self.share_idx, + ); + + // The selection-proof signs the slot under + // `DOMAIN_SELECTION_PROOF`. + self.verify_selection_partial_sig( + &root_pubkey, + DomainName::SelectionProof, + selection.slot, + selection.slot.tree_hash_root().0, + &selection.selection_proof, + "beacon committee selection", + ) + .await?; + + psigs_by_slot + .entry(selection.slot) + .or_default() + .insert(core_pubkey, par_sig); + } + + // Fanout every per-slot set to every subscriber. Subscribers receive + // their own clone (the wrapper installed by `subscribe` clones the + // set before each invocation). + for (slot, set) in &psigs_by_slot { + let duty = Duty::new_prepare_aggregator_duty(SlotNumber::new(*slot)); + for sub in &self.subs { + sub(&duty, set.clone()).await.map_err(|err| { + ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "beacon committee selection subscriber failed", + ) + .with_boxed_source(err) + })?; + } + } + + // Pull every aggregated selection back out of the AggSigDB. + let await_fn = self.await_agg_sig_db_fn.as_ref().ok_or_else(|| { + ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "await_agg_sig_db hook not registered", + ) + })?; + + let mut resp: Vec = Vec::with_capacity(selections.len()); + for (slot, set) in &psigs_by_slot { + let duty = Duty::new_prepare_aggregator_duty(SlotNumber::new(*slot)); + for pk in set.inner().keys() { + let signed = tokio::time::timeout( + SELECTIONS_AGG_SIG_DB_TIMEOUT, + await_fn(duty.clone(), *pk), + ) + .await + .map_err(|_: Elapsed| { + ApiError::new( + StatusCode::REQUEST_TIMEOUT, + "aggregated beacon committee selection not available before deadline", + ) + })? + .map_err(|err| { + ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "aggregated beacon committee selection lookup failed", + ) + .with_boxed_source(err) + })?; + + let selection = downcast_beacon_committee_selection(signed.as_ref())?; + resp.push(selection.0.clone()); + } + } + + Ok(EthResponse { + data: resp, + execution_optimistic: false, + finalized: false, + dependent_root: None, + }) } async fn sync_committee_selections( &self, - _selections: Vec, + selections: Vec, ) -> Result>, ApiError> { - unimplemented!("sync_committee_selections not yet ported") + // Port of `SyncCommitteeSelections` in + // `core/validatorapi/validatorapi.go` (lines 1072–1138). + let active_validators = self.fetch_active_validators().await?; + + let mut psigs_by_slot: HashMap = HashMap::new(); + for selection in &selections { + let (root_pubkey, core_pubkey) = self.resolve_validator( + selection.validator_index, + &active_validators, + "sync committee selection", + )?; + + let par_sig = + signeddata::SyncCommitteeSelection::new_partial(selection.clone(), self.share_idx); + + // Sync committee selection proofs sign over a + // `SyncAggregatorSelectionData` root under + // `DOMAIN_SYNC_COMMITTEE_SELECTION_PROOF`. The Rust selection + // wrapper's `message_root()` already computes this exactly the + // way Go's `SyncCommitteeSelection.MessageRoot()` does — see + // `crates/eth2api/src/v1.rs`. + self.verify_selection_partial_sig( + &root_pubkey, + DomainName::SyncCommitteeSelectionProof, + selection.slot, + selection.message_root(), + &selection.selection_proof, + "sync committee selection", + ) + .await?; + + psigs_by_slot + .entry(selection.slot) + .or_default() + .insert(core_pubkey, par_sig); + } + + for (slot, set) in &psigs_by_slot { + let duty = Duty::new_prepare_sync_contribution_duty(SlotNumber::new(*slot)); + for sub in &self.subs { + sub(&duty, set.clone()).await.map_err(|err| { + ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "sync committee selection subscriber failed", + ) + .with_boxed_source(err) + })?; + } + } + + let await_fn = self.await_agg_sig_db_fn.as_ref().ok_or_else(|| { + ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "await_agg_sig_db hook not registered", + ) + })?; + + let mut resp: Vec = Vec::with_capacity(selections.len()); + for (slot, set) in &psigs_by_slot { + let duty = Duty::new_prepare_sync_contribution_duty(SlotNumber::new(*slot)); + for pk in set.inner().keys() { + let signed = tokio::time::timeout( + SELECTIONS_AGG_SIG_DB_TIMEOUT, + await_fn(duty.clone(), *pk), + ) + .await + .map_err(|_: Elapsed| { + ApiError::new( + StatusCode::REQUEST_TIMEOUT, + "aggregated sync committee selection not available before deadline", + ) + })? + .map_err(|err| { + ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "aggregated sync committee selection lookup failed", + ) + .with_boxed_source(err) + })?; + + let selection = downcast_sync_committee_selection(signed.as_ref())?; + resp.push(selection.0.clone()); + } + } + + Ok(EthResponse { + data: resp, + execution_optimistic: false, + finalized: false, + dependent_root: None, + }) } async fn validators( @@ -812,6 +1072,41 @@ fn swap_sync_committee_pubshares( Ok(()) } +/// Downcasts the aggregated signed data from the AggSigDB to a +/// `BeaconCommitteeSelection`. Mirrors Go's +/// `s.(core.BeaconCommitteeSelection)` type assertion. A mismatch indicates +/// a wiring bug — the cluster stored the wrong duty type under the +/// `PrepareAggregator` duty — so it surfaces as 500 rather than 4xx. +fn downcast_beacon_committee_selection( + signed: &dyn SignedData, +) -> Result<&signeddata::BeaconCommitteeSelection, ApiError> { + signed + .as_any() + .downcast_ref::() + .ok_or_else(|| { + ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "invalid beacon committee selection", + ) + }) +} + +/// Sync committee selections counterpart of +/// [`downcast_beacon_committee_selection`]. +fn downcast_sync_committee_selection( + signed: &dyn SignedData, +) -> Result<&signeddata::SyncCommitteeSelection, ApiError> { + signed + .as_any() + .downcast_ref::() + .ok_or_else(|| { + ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "invalid sync committee selection", + ) + }) +} + fn parse_bls_pubkey(s: &str) -> Result { let trimmed = s.strip_prefix("0x").unwrap_or(s); let bytes = hex::decode(trimmed).map_err(|err| { @@ -1667,7 +1962,6 @@ mod tests { .expect("insecure_test mode skips verification"); } - // ==================================================================== // CachedValidatorsProvider plumbing // ==================================================================== @@ -1737,4 +2031,486 @@ mod tests { assert_eq!(err.status_code, StatusCode::BAD_GATEWAY); assert_eq!(err.message, "active validators lookup failed"); } + + // ==================================================================== + // beacon_committee_selections / sync_committee_selections handlers + // ==================================================================== + + use pluto_eth2api::v1::{ + BeaconCommitteeSelection as V1BeaconCommitteeSelection, + SyncCommitteeSelection as V1SyncCommitteeSelection, + }; + + use crate::signeddata::{ + BeaconCommitteeSelection as SignedBeaconCommitteeSelection, + SyncCommitteeSelection as SignedSyncCommitteeSelection, + }; + + /// Builds a `(Component, BeaconMock)` pair backed by `BeaconMock`'s + /// default spec — which already contains `DOMAIN_SELECTION_PROOF`, + /// `DOMAIN_SYNC_COMMITTEE_SELECTION_PROOF`, and `SLOTS_PER_EPOCH`. The + /// component is *insecure* so the selections handlers can run without + /// real BLS signatures; specific tests that exercise verification opt + /// into a secure component via [`make_selections_component_secure`]. The + /// caller-supplied `active_validators` map populates the per-epoch + /// validator cache the handlers consult to translate + /// `validator_index → DV root pubkey`. + async fn make_selections_component_insecure( + active_validators: HashMap, + ) -> (Component, BeaconMock) { + let mock = BeaconMock::builder() + .genesis_time(DateTime::from_timestamp(0, 0).unwrap()) + .genesis_validators_root([0; 32]) + .build() + .await + .unwrap(); + let cancel = CancellationToken::new(); + let (deadliner, _deadliner_rx) = + DeadlinerTask::start(cancel.clone(), "selections-tests", FarFutureCalculator); + let (_evict_tx, evict_rx) = mpsc::channel(1); + let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); + let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url(mock.uri()).unwrap()); + let component = Component::new_insecure( + eth2_cl, + dutydb, + 1, + TestValidatorCache::arc(active_validators), + ); + (component, mock) + } + + /// Like [`make_selections_component_insecure`] but with `insecure_test` + /// disabled. The caller supplies the `pub_share_by_pubkey` map so + /// `verify_partial_sig` can resolve a verify-share for each DV root, and + /// the `active_validators` map populates the validator-cache the + /// selections handlers consult. + async fn make_selections_component_secure( + map: HashMap, + active_validators: HashMap, + ) -> (Component, BeaconMock) { + let mock = BeaconMock::builder() + .genesis_time(DateTime::from_timestamp(0, 0).unwrap()) + .genesis_validators_root([0; 32]) + .build() + .await + .unwrap(); + let cancel = CancellationToken::new(); + let (deadliner, _deadliner_rx) = DeadlinerTask::start( + cancel.clone(), + "selections-secure-tests", + FarFutureCalculator, + ); + let (_evict_tx, evict_rx) = mpsc::channel(1); + let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); + let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url(mock.uri()).unwrap()); + let component = Component::new( + eth2_cl, + dutydb, + 1, + map, + false, + TestValidatorCache::arc(active_validators), + ); + (component, mock) + } + + /// Happy-path beacon committee selections: one selection in, one + /// aggregated selection out. Mirrors the per-validator-index flow in + /// Go's `BeaconCommitteeSelections` at `validatorapi.go:798-864`. + #[tokio::test] + async fn beacon_committee_selections_happy_path() { + const SLOT: Slot = 12; + const VAL_IDX: ValidatorIndex = 5; + let dv_root = dv_pubkey(0xA1); + + let (mut component, _mock) = + make_selections_component_insecure(HashMap::from([(VAL_IDX, dv_root)])).await; + + // Returned aggregated selection — the byte pattern shows the + // response actually flowed through `await_agg_sig_db`. + let agg_selection = V1BeaconCommitteeSelection { + slot: SLOT, + validator_index: VAL_IDX, + selection_proof: [0xAB; 96], + }; + let agg_clone = agg_selection.clone(); + component.register_await_agg_sig_db(move |_duty, _pk| { + let agg = agg_clone.clone(); + async move { + Ok::, CallbackError>(Box::new( + SignedBeaconCommitteeSelection::new(agg), + )) + } + }); + + let input = V1BeaconCommitteeSelection { + slot: SLOT, + validator_index: VAL_IDX, + selection_proof: [0x77; 96], + }; + let resp = component + .beacon_committee_selections(vec![input]) + .await + .expect("happy path"); + + assert_eq!(resp.data.len(), 1); + assert_eq!(resp.data[0], agg_selection); + } + + /// Multi-selection input is fanned out via the subscriber once per slot + /// covered, and every aggregated reply is stitched into the response. + #[tokio::test] + async fn beacon_committee_selections_multi_selection_fanout_and_stitching() { + const SLOT_A: Slot = 10; + const SLOT_B: Slot = 11; + const VAL_IDX_A: ValidatorIndex = 1; + const VAL_IDX_B: ValidatorIndex = 2; + let dv_root_a = dv_pubkey(0xB1); + let dv_root_b = dv_pubkey(0xB2); + + let (mut component, _mock) = make_selections_component_insecure(HashMap::from([ + (VAL_IDX_A, dv_root_a), + (VAL_IDX_B, dv_root_b), + ])) + .await; + + // Track subscriber invocations: one per distinct slot in the input. + let observed_slots: Arc>> = Arc::new(Mutex::new(Vec::new())); + { + let observed_slots = Arc::clone(&observed_slots); + component.subscribe(move |duty, _set| { + let observed_slots = Arc::clone(&observed_slots); + async move { + observed_slots.lock().unwrap().push(duty.slot.inner()); + Ok(()) + } + }); + } + + // AggSigDB returns the slot+validator-index in the response so we + // can verify each `(slot, pk)` pair was awaited exactly once. + component.register_await_agg_sig_db(move |duty, pk| { + let slot = duty.slot.inner(); + let pk_bytes = pk.as_ref(); + // Recover the validator index from the pubkey: byte 0 is 0xB1 + // for VAL_IDX_A, 0xB2 for VAL_IDX_B. + let val_idx = match pk_bytes[0] { + 0xB1 => VAL_IDX_A, + 0xB2 => VAL_IDX_B, + _ => 999, + }; + async move { + Ok::, CallbackError>(Box::new( + SignedBeaconCommitteeSelection::new(V1BeaconCommitteeSelection { + slot, + validator_index: val_idx, + selection_proof: [0xCD; 96], + }), + )) + } + }); + + let input = vec![ + V1BeaconCommitteeSelection { + slot: SLOT_A, + validator_index: VAL_IDX_A, + selection_proof: [0x11; 96], + }, + V1BeaconCommitteeSelection { + slot: SLOT_B, + validator_index: VAL_IDX_B, + selection_proof: [0x22; 96], + }, + ]; + let resp = component + .beacon_committee_selections(input) + .await + .expect("multi-selection"); + + // Both slots were fanned out to the subscriber once each. + let mut slots = observed_slots.lock().unwrap().clone(); + slots.sort(); + assert_eq!(slots, vec![SLOT_A, SLOT_B]); + + // Both aggregated selections present in the response — iteration + // order over the HashMap is non-deterministic so we sort. + assert_eq!(resp.data.len(), 2); + let mut returned_slots: Vec = resp.data.iter().map(|s| s.slot).collect(); + returned_slots.sort(); + assert_eq!(returned_slots, vec![SLOT_A, SLOT_B]); + let mut returned_indices: Vec = resp.data.iter().map(|s| s.validator_index).collect(); + returned_indices.sort(); + assert_eq!(returned_indices, vec![VAL_IDX_A, VAL_IDX_B]); + } + + /// A selection whose validator index is not part of the cluster's + /// active set fails the lookup short-circuit with `400 Bad Request`, + /// mirroring Go's `errors.New("validator not found")` branch. + #[tokio::test] + async fn beacon_committee_selections_rejects_unknown_validator_index() { + let (mut component, _mock) = make_selections_component_insecure(HashMap::new()).await; + + // `await_agg_sig_db` must NOT be reached for an unknown validator. + component.register_await_agg_sig_db(|_duty, _pk| async { + panic!("await_agg_sig_db must not be called when validator index is unknown"); + }); + + let err = component + .beacon_committee_selections(vec![V1BeaconCommitteeSelection { + slot: 1, + validator_index: 999, + selection_proof: [0xEE; 96], + }]) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::BAD_REQUEST); + assert!(err.message.contains("validator not found")); + } + + /// A tampered selection proof fails `verify_partial_sig` and the + /// handler short-circuits with `400 Bad Request` — the remaining + /// selections in the batch are not fanned out and the AggSigDB await is + /// never reached. + #[tokio::test] + async fn beacon_committee_selections_verification_failure_short_circuits() { + // Wire a secure component (insecure_test = false) and register a + // public-share map so `verify_partial_sig` runs the real BLS check + // against the zero signature. + let dv_root = dv_pubkey(0xC1); + let pub_share = [0x55_u8; 48]; + let map = HashMap::from([(dv_root, pub_share)]); + + let (mut component, _mock) = + make_selections_component_secure(map, HashMap::from([(1u64, dv_root)])).await; + + component.register_await_agg_sig_db(|_duty, _pk| async { + panic!("await_agg_sig_db must not be called after verification failure"); + }); + + // Zero signature is rejected by `signing::verify` before BLS runs + // (returns SigningError::ZeroSignature). + let err = component + .beacon_committee_selections(vec![V1BeaconCommitteeSelection { + slot: 1, + validator_index: 1, + selection_proof: [0; 96], + }]) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::BAD_REQUEST); + } + + /// A stalled `await_agg_sig_db` hook trips + /// `SELECTIONS_AGG_SIG_DB_TIMEOUT` and the handler returns 408. + #[tokio::test(start_paused = true)] + async fn beacon_committee_selections_await_agg_sig_db_times_out() { + let dv_root = dv_pubkey(0xD1); + let (mut component, _mock) = + make_selections_component_insecure(HashMap::from([(1u64, dv_root)])).await; + + // Closure that never completes. + component.register_await_agg_sig_db(|_duty, _pk| async { + std::future::pending::, CallbackError>>().await + }); + + let err = component + .beacon_committee_selections(vec![V1BeaconCommitteeSelection { + slot: 1, + validator_index: 1, + selection_proof: [0xDD; 96], + }]) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::REQUEST_TIMEOUT); + } + + /// Happy-path sync committee selections. + #[tokio::test] + async fn sync_committee_selections_happy_path() { + const SLOT: Slot = 13; + const VAL_IDX: ValidatorIndex = 7; + const SUBCOMM: u64 = 3; + let dv_root = dv_pubkey(0xE1); + + let (mut component, _mock) = + make_selections_component_insecure(HashMap::from([(VAL_IDX, dv_root)])).await; + + let agg_selection = V1SyncCommitteeSelection { + slot: SLOT, + validator_index: VAL_IDX, + subcommittee_index: SUBCOMM, + selection_proof: [0xCC; 96], + }; + let agg_clone = agg_selection.clone(); + component.register_await_agg_sig_db(move |_duty, _pk| { + let agg = agg_clone.clone(); + async move { + Ok::, CallbackError>(Box::new( + SignedSyncCommitteeSelection::new(agg), + )) + } + }); + + let resp = component + .sync_committee_selections(vec![V1SyncCommitteeSelection { + slot: SLOT, + validator_index: VAL_IDX, + subcommittee_index: SUBCOMM, + selection_proof: [0x99; 96], + }]) + .await + .expect("happy path"); + assert_eq!(resp.data.len(), 1); + assert_eq!(resp.data[0], agg_selection); + } + + /// Multi-selection sync committee selections produce one subscriber + /// invocation per distinct slot and stitch every aggregated reply into + /// the response. + #[tokio::test] + async fn sync_committee_selections_multi_selection_fanout_and_stitching() { + const SLOT_A: Slot = 20; + const SLOT_B: Slot = 21; + const VAL_IDX_A: ValidatorIndex = 1; + const VAL_IDX_B: ValidatorIndex = 2; + let dv_root_a = dv_pubkey(0xF1); + let dv_root_b = dv_pubkey(0xF2); + + let (mut component, _mock) = make_selections_component_insecure(HashMap::from([ + (VAL_IDX_A, dv_root_a), + (VAL_IDX_B, dv_root_b), + ])) + .await; + + let observed_slots: Arc>> = Arc::new(Mutex::new(Vec::new())); + { + let observed_slots = Arc::clone(&observed_slots); + component.subscribe(move |duty, _set| { + let observed_slots = Arc::clone(&observed_slots); + async move { + observed_slots.lock().unwrap().push(duty.slot.inner()); + Ok(()) + } + }); + } + + component.register_await_agg_sig_db(move |duty, pk| { + let slot = duty.slot.inner(); + let pk_bytes = pk.as_ref(); + let val_idx = match pk_bytes[0] { + 0xF1 => VAL_IDX_A, + 0xF2 => VAL_IDX_B, + _ => 999, + }; + async move { + Ok::, CallbackError>(Box::new( + SignedSyncCommitteeSelection::new(V1SyncCommitteeSelection { + slot, + validator_index: val_idx, + subcommittee_index: 0, + selection_proof: [0xDE; 96], + }), + )) + } + }); + + let resp = component + .sync_committee_selections(vec![ + V1SyncCommitteeSelection { + slot: SLOT_A, + validator_index: VAL_IDX_A, + subcommittee_index: 0, + selection_proof: [0x11; 96], + }, + V1SyncCommitteeSelection { + slot: SLOT_B, + validator_index: VAL_IDX_B, + subcommittee_index: 1, + selection_proof: [0x22; 96], + }, + ]) + .await + .expect("multi-selection"); + + let mut slots = observed_slots.lock().unwrap().clone(); + slots.sort(); + assert_eq!(slots, vec![SLOT_A, SLOT_B]); + + assert_eq!(resp.data.len(), 2); + let mut returned_slots: Vec = resp.data.iter().map(|s| s.slot).collect(); + returned_slots.sort(); + assert_eq!(returned_slots, vec![SLOT_A, SLOT_B]); + } + + /// Sync committee selection with an unknown validator index returns + /// 400 without touching the AggSigDB. + #[tokio::test] + async fn sync_committee_selections_rejects_unknown_validator_index() { + let (mut component, _mock) = make_selections_component_insecure(HashMap::new()).await; + + component.register_await_agg_sig_db(|_duty, _pk| async { + panic!("await_agg_sig_db must not be called when validator index is unknown"); + }); + + let err = component + .sync_committee_selections(vec![V1SyncCommitteeSelection { + slot: 1, + validator_index: 999, + subcommittee_index: 0, + selection_proof: [0xEE; 96], + }]) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::BAD_REQUEST); + assert!(err.message.contains("validator not found")); + } + + /// Sync committee selection verification failure short-circuits. + #[tokio::test] + async fn sync_committee_selections_verification_failure_short_circuits() { + let dv_root = dv_pubkey(0xC2); + let pub_share = [0x66_u8; 48]; + let map = HashMap::from([(dv_root, pub_share)]); + + let (mut component, _mock) = + make_selections_component_secure(map, HashMap::from([(1u64, dv_root)])).await; + + component.register_await_agg_sig_db(|_duty, _pk| async { + panic!("await_agg_sig_db must not be called after verification failure"); + }); + + let err = component + .sync_committee_selections(vec![V1SyncCommitteeSelection { + slot: 1, + validator_index: 1, + subcommittee_index: 0, + selection_proof: [0; 96], + }]) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::BAD_REQUEST); + } + + /// Sync committee selection times out on a stalled AggSigDB. + #[tokio::test(start_paused = true)] + async fn sync_committee_selections_await_agg_sig_db_times_out() { + let dv_root = dv_pubkey(0xD2); + let (mut component, _mock) = + make_selections_component_insecure(HashMap::from([(1u64, dv_root)])).await; + + component.register_await_agg_sig_db(|_duty, _pk| async { + std::future::pending::, CallbackError>>().await + }); + + let err = component + .sync_committee_selections(vec![V1SyncCommitteeSelection { + slot: 1, + validator_index: 1, + subcommittee_index: 0, + selection_proof: [0xDD; 96], + }]) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::REQUEST_TIMEOUT); + } } diff --git a/crates/core/src/validatorapi/types.rs b/crates/core/src/validatorapi/types.rs index 8e18456a..7e203667 100644 --- a/crates/core/src/validatorapi/types.rs +++ b/crates/core/src/validatorapi/types.rs @@ -22,8 +22,23 @@ pub use pluto_eth2api::{ GetVersionResponseResponse as NodeVersionResponse, GetVersionResponseResponseData as NodeVersionData, spec::phase0::{self, Epoch, Root, Slot, ValidatorIndex}, + v1::{ + BeaconCommitteeSelection as V1BeaconCommitteeSelection, + SyncCommitteeSelection as V1SyncCommitteeSelection, + }, }; +/// Beacon-committee selection payload. Aliases the consensus-spec +/// `v1::BeaconCommitteeSelection` so the handler can operate on the same +/// validator-index / slot / selection-proof tuple Go uses at +/// `core/validatorapi/validatorapi.go:798`. +pub type BeaconCommitteeSelection = V1BeaconCommitteeSelection; + +/// Sync-committee selection payload. Aliases the consensus-spec +/// `v1::SyncCommitteeSelection`, matching Go's +/// `core/validatorapi/validatorapi.go:1072` input shape. +pub type SyncCommitteeSelection = V1SyncCommitteeSelection; + /// Attestation data alias for the consensus-spec phase0 type. pub type AttestationData = phase0::AttestationData; @@ -183,14 +198,6 @@ pub struct SyncCommitteeContribution {} #[derive(Debug, Clone)] pub struct SignedContributionAndProof {} -/// Beacon-committee selection payload. Placeholder. -#[derive(Debug, Clone)] -pub struct BeaconCommitteeSelection {} - -/// Sync-committee selection payload. Placeholder. -#[derive(Debug, Clone)] -pub struct SyncCommitteeSelection {} - /// Validator-index request body for the `attester_duties` and /// `sync_committee_duties` endpoints. /// From 40918720e4ad46319b63aa6f1ca344f6e3dc753c Mon Sep 17 00:00:00 2001 From: Bohdan Ohorodnii <273991985+varex83agent@users.noreply.github.com> Date: Wed, 3 Jun 2026 22:23:50 +0200 Subject: [PATCH 03/11] feat(core): wire validatorapi selection endpoints to Component MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The Handler trait methods Component::{beacon,sync}_committee_selections were reachable only from the Handler trait — the router still had todo!() stubs at the corresponding axum entry points, so any real request to POST /eth/v1/validator/{beacon,sync}_committee_selections would panic the request task. Wires both endpoints into AppState::handler, mirroring Charon's beaconCommitteeSelectionsJSON / syncCommitteeSelectionsJSON wire shape (${data: [...]}) via two small response wrappers in types.rs. Adds SELECTIONS_BODY_LIMIT (64 KiB) and a selections_post helper so the new POST routes are bounded before deserialisation, defending against the request-amplification surface (one BLS verification per selection) once the routes are reachable. Extends TestHandler with selection response stubs and adds router tests for: response shape, sync-variant wrapping, and the body-limit layer rejecting oversized POSTs. --- crates/core/src/validatorapi/router.rs | 177 ++++++++++++++++++++-- crates/core/src/validatorapi/testutils.rs | 32 +++- crates/core/src/validatorapi/types.rs | 18 +++ 3 files changed, 215 insertions(+), 12 deletions(-) diff --git a/crates/core/src/validatorapi/router.rs b/crates/core/src/validatorapi/router.rs index a32ab9b9..89032607 100644 --- a/crates/core/src/validatorapi/router.rs +++ b/crates/core/src/validatorapi/router.rs @@ -24,13 +24,22 @@ use serde::Deserialize; /// well above any plausible workload. const DUTIES_BODY_LIMIT: usize = 64 * 1024; +/// Cap on the `POST /eth/v1/validator/{beacon,sync}_committee_selections` +/// request bodies. Each selection is ~300 bytes of JSON (slot, validator index, +/// 96-byte BLS proof in hex); 64 KiB allows >200 entries per request, far more +/// than a realistic cluster size while bounding the per-request CPU cost of +/// the BLS verifications and AggSigDB awaits the handler performs. +const SELECTIONS_BODY_LIMIT: usize = 64 * 1024; + use super::{ error::ApiError, handler::Handler, types::{ AttestationDataOpts, AttestationDataResponse, AttesterDutiesOpts, AttesterDutiesResponse, - CommitteeIndex, NodeVersionResponse, ProposerDutiesOpts, ProposerDutiesResponse, - SyncCommitteeDutiesOpts, SyncCommitteeDutiesResponse, ValIndexes, + BeaconCommitteeSelection, BeaconCommitteeSelectionsResponse, CommitteeIndex, + NodeVersionResponse, ProposerDutiesOpts, ProposerDutiesResponse, + SyncCommitteeDutiesOpts, SyncCommitteeDutiesResponse, SyncCommitteeSelection, + SyncCommitteeSelectionsResponse, ValIndexes, }, }; @@ -106,7 +115,7 @@ pub fn new_router(handler: Arc, builder_enabled: bool) -> Router { .route("/proposer_config", get(respond_404)) .route( "/eth/v1/validator/beacon_committee_selections", - post(beacon_committee_selections), + selections_post(beacon_committee_selections), ) .route("/eth/v1/validator/aggregate_attestation", get(respond_404)) .route( @@ -136,7 +145,7 @@ pub fn new_router(handler: Arc, builder_enabled: bool) -> Router { ) .route( "/eth/v1/validator/sync_committee_selections", - post(sync_committee_selections), + selections_post(sync_committee_selections), ) .route("/eth/v1/node/version", get(node_version)) .fallback(proxy_handler) @@ -249,6 +258,18 @@ async fn enforce_json_content_type(mut req: Request, next: Next) -> Result(handler: H) -> MethodRouter +where + H: axum::handler::Handler, + T: 'static, + S: Clone + Send + Sync + 'static, +{ + post(handler).route_layer(DefaultBodyLimit::max(SELECTIONS_BODY_LIMIT)) +} + /// Renders an axum query-extractor rejection as Pluto's standard /// [`ApiError`] body shape, so all 4xx responses from this router share the /// same `{ "code", "message" }` schema. @@ -310,8 +331,18 @@ async fn submit_exit() { todo!("vapi: submit_exit"); } -async fn beacon_committee_selections() { - todo!("vapi: beacon_committee_selections"); +async fn beacon_committee_selections( + State(state): State>, + Json(selections): Json>, +) -> Result, ApiError> { + let response = state + .handler + .beacon_committee_selections(selections) + .await?; + + Ok(Json(BeaconCommitteeSelectionsResponse { + data: response.data, + })) } async fn aggregate_attestation() { @@ -338,8 +369,15 @@ async fn submit_proposal_preparations() { todo!("vapi: submit_proposal_preparations"); } -async fn sync_committee_selections() { - todo!("vapi: sync_committee_selections"); +async fn sync_committee_selections( + State(state): State>, + Json(selections): Json>, +) -> Result, ApiError> { + let response = state.handler.sync_committee_selections(selections).await?; + + Ok(Json(SyncCommitteeSelectionsResponse { + data: response.data, + })) } async fn node_version( @@ -366,8 +404,9 @@ mod tests { use crate::validatorapi::{ testutils::TestHandler, types::{ - AttestationDataResponse, AttesterDutiesResponse, AttesterDuty, ProposerDutiesResponse, - ProposerDuty, SyncCommitteeDutiesResponse, SyncCommitteeDuty, ValIndexes, + AttestationDataResponse, AttesterDutiesResponse, AttesterDuty, + BeaconCommitteeSelection, EthResponse, ProposerDutiesResponse, ProposerDuty, + SyncCommitteeDutiesResponse, SyncCommitteeDuty, SyncCommitteeSelection, ValIndexes, }, }; @@ -730,4 +769,122 @@ mod tests { let bad = serde_json::from_str::("[-1]"); assert!(bad.is_err()); } + + /// Verifies the router wraps the `Handler::beacon_committee_selections` + /// payload into the `{ "data": [...] }` shape Charon's wire format uses + /// (`beaconCommitteeSelectionsJSON` in `core/validatorapi/router.go`), + /// dropping the `execution_optimistic` / `finalized` / `dependent_root` + /// metadata that the trait method carries internally. + #[tokio::test] + async fn beacon_committee_selections_wraps_handler_value() { + let selection = BeaconCommitteeSelection { + slot: 10, + validator_index: 5, + selection_proof: [0xAA; 96], + }; + let handler = + TestHandler::default().with_beacon_committee_selections(EthResponse { + data: vec![selection], + execution_optimistic: false, + finalized: false, + dependent_root: None, + }); + let state = Arc::new(AppState { + handler: Arc::new(handler), + builder_enabled: false, + }); + + let Json(body) = beacon_committee_selections(State(state), Json(vec![])) + .await + .unwrap(); + + let json = serde_json::to_value(&body).unwrap(); + assert!(json.get("execution_optimistic").is_none()); + assert!(json.get("finalized").is_none()); + assert!(json.get("dependent_root").is_none()); + assert_eq!(json["data"][0]["slot"], "10"); + assert_eq!(json["data"][0]["validator_index"], "5"); + } + + /// Counterpart of [`beacon_committee_selections_wraps_handler_value`] for + /// the sync-committee variant. + #[tokio::test] + async fn sync_committee_selections_wraps_handler_value() { + let selection = SyncCommitteeSelection { + slot: 20, + validator_index: 7, + subcommittee_index: 2, + selection_proof: [0xBB; 96], + }; + let handler = + TestHandler::default().with_sync_committee_selections(EthResponse { + data: vec![selection], + execution_optimistic: false, + finalized: false, + dependent_root: None, + }); + let state = Arc::new(AppState { + handler: Arc::new(handler), + builder_enabled: false, + }); + + let Json(body) = sync_committee_selections(State(state), Json(vec![])) + .await + .unwrap(); + + let json = serde_json::to_value(&body).unwrap(); + assert!(json.get("execution_optimistic").is_none()); + assert_eq!(json["data"][0]["slot"], "20"); + assert_eq!(json["data"][0]["validator_index"], "7"); + assert_eq!(json["data"][0]["subcommittee_index"], "2"); + } + + /// Verifies the body-limit layer on the selection POST routes rejects + /// oversized bodies before any BLS verification work happens. + #[tokio::test] + async fn beacon_committee_selections_rejects_oversized_body() { + use axum::{ + body::Body, + http::{Method, Request}, + }; + use tower::ServiceExt; + + let handler = TestHandler::default(); + let app = new_router(Arc::new(handler), false); + + let big = vec![b'0'; SELECTIONS_BODY_LIMIT * 2]; + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/validator/beacon_committee_selections") + .header("content-type", "application/json") + .header("content-length", big.len()) + .body(Body::from(big)) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::PAYLOAD_TOO_LARGE); + } + + /// Counterpart of [`beacon_committee_selections_rejects_oversized_body`]. + #[tokio::test] + async fn sync_committee_selections_rejects_oversized_body() { + use axum::{ + body::Body, + http::{Method, Request}, + }; + use tower::ServiceExt; + + let handler = TestHandler::default(); + let app = new_router(Arc::new(handler), false); + + let big = vec![b'0'; SELECTIONS_BODY_LIMIT * 2]; + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/validator/sync_committee_selections") + .header("content-type", "application/json") + .header("content-length", big.len()) + .body(Body::from(big)) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::PAYLOAD_TOO_LARGE); + } } diff --git a/crates/core/src/validatorapi/testutils.rs b/crates/core/src/validatorapi/testutils.rs index 45980fe7..cae049d7 100644 --- a/crates/core/src/validatorapi/testutils.rs +++ b/crates/core/src/validatorapi/testutils.rs @@ -34,6 +34,10 @@ pub struct TestHandler { pub sync_committee_duties_response: Option, /// Value returned by [`Handler::attestation_data`]. pub attestation_data_response: Option, + /// Value returned by [`Handler::beacon_committee_selections`]. + pub beacon_committee_selections_response: Option>>, + /// Value returned by [`Handler::sync_committee_selections`]. + pub sync_committee_selections_response: Option>>, } impl TestHandler { @@ -68,6 +72,24 @@ impl TestHandler { self.attestation_data_response = Some(response); self } + + /// Sets the response returned by [`Handler::beacon_committee_selections`]. + pub fn with_beacon_committee_selections( + mut self, + response: EthResponse>, + ) -> Self { + self.beacon_committee_selections_response = Some(response); + self + } + + /// Sets the response returned by [`Handler::sync_committee_selections`]. + pub fn with_sync_committee_selections( + mut self, + response: EthResponse>, + ) -> Self { + self.sync_committee_selections_response = Some(response); + self + } } #[async_trait] @@ -163,14 +185,20 @@ impl Handler for TestHandler { &self, _selections: Vec, ) -> Result>, ApiError> { - unimplemented!("beacon_committee_selections not stubbed in TestHandler") + Ok(self + .beacon_committee_selections_response + .clone() + .expect("beacon_committee_selections not stubbed in TestHandler")) } async fn sync_committee_selections( &self, _selections: Vec, ) -> Result>, ApiError> { - unimplemented!("sync_committee_selections not stubbed in TestHandler") + Ok(self + .sync_committee_selections_response + .clone() + .expect("sync_committee_selections not stubbed in TestHandler")) } async fn validators( diff --git a/crates/core/src/validatorapi/types.rs b/crates/core/src/validatorapi/types.rs index 7e203667..8113a210 100644 --- a/crates/core/src/validatorapi/types.rs +++ b/crates/core/src/validatorapi/types.rs @@ -154,6 +154,24 @@ pub struct AttestationDataResponse { pub data: AttestationData, } +/// Response envelope for the `beacon_committee_selections` endpoint. +/// Matches Charon's `beaconCommitteeSelectionsJSON` wire shape — a `data` +/// array of aggregated selection proofs. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct BeaconCommitteeSelectionsResponse { + /// Aggregated beacon-committee selection proofs. + pub data: Vec, +} + +/// Response envelope for the `sync_committee_selections` endpoint. +/// Matches Charon's `syncCommitteeSelectionsJSON` wire shape — a `data` +/// array of aggregated selection proofs. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SyncCommitteeSelectionsResponse { + /// Aggregated sync-committee selection proofs. + pub data: Vec, +} + /// Validator payload. Placeholder. #[derive(Debug, Clone)] pub struct Validator {} From ec7b065dd0b1ae4e0c261661b60a36a05ae0b677 Mon Sep 17 00:00:00 2001 From: Bohdan Ohorodnii <273991985+varex83agent@users.noreply.github.com> Date: Wed, 3 Jun 2026 22:27:31 +0200 Subject: [PATCH 04/11] fix(core): bound the whole selections fan-out + AggSigDB phase The previous SELECTIONS_AGG_SIG_DB_TIMEOUT was applied per AggSigDB await, so a batch of N selections could legitimately spend N*24s in the lookup phase before failing; the subscriber fan-out had no deadline at all, letting a stalled subscriber pin the handler. Both selection handlers now anchor a single Instant deadline at the start of the fan-out phase and use tokio::time::timeout_at for every subscriber call and every await_agg_sig_db_fn call. The renamed SELECTIONS_PHASE_TIMEOUT is the total budget for that phase rather than a per-await bound. Each error path in the fan-out and AggSigDB-await loops now emits a tracing::warn with the slot and the underlying source error so an operator can correlate a 408/500 response with a debug-pipeline log without leaking either into the client-visible message. Adds six tests: * subscriber timeout (beacon, sync) * empty selections input is a 200 no-op (beacon, sync) * AggSigDB returning the wrong concrete SignedData type produces 500 via the downcast helper (beacon, sync) --- crates/core/src/validatorapi/component.rs | 335 ++++++++++++++++++---- 1 file changed, 279 insertions(+), 56 deletions(-) diff --git a/crates/core/src/validatorapi/component.rs b/crates/core/src/validatorapi/component.rs index 12239bbe..87369832 100644 --- a/crates/core/src/validatorapi/component.rs +++ b/crates/core/src/validatorapi/component.rs @@ -126,13 +126,17 @@ const UPSTREAM_REQUEST_TIMEOUT: Duration = Duration::from_secs(10); /// attestation duty has time to flow through the pipeline. const ATTESTATION_DATA_TIMEOUT: Duration = Duration::from_secs(24); -/// Hard deadline for each blocking `await_agg_sig_db_fn` lookup performed by -/// the selection handlers. The Go reference relies on the request context -/// timeout; here we set an explicit bound so a stalled AggSigDB cannot pin a -/// selections request forever. Sized to roughly two slots so a real +/// Hard deadline for the *whole* fan-out + AggSigDB phase of the selection +/// handlers — the per-slot subscriber calls plus every blocking +/// `await_agg_sig_db_fn` lookup share a single budget anchored at this +/// duration after the phase begins. The Go reference relies on the request +/// context timeout; here we set an explicit bound so neither a stalled +/// subscriber nor a stalled AggSigDB can pin a selections request forever, +/// and the worst-case wall time scales with the duration rather than with +/// the per-selection count. Sized to roughly two slots so a real /// `PrepareAggregator` / `PrepareSyncContribution` duty has time to reach /// the AggSigDB. -const SELECTIONS_AGG_SIG_DB_TIMEOUT: Duration = Duration::from_secs(24); +const SELECTIONS_PHASE_TIMEOUT: Duration = Duration::from_secs(24); /// Validator API [`Handler`] implementation. /// @@ -742,19 +746,41 @@ impl Handler for Component { .insert(core_pubkey, par_sig); } + // Bound the entire fan-out + AggSigDB phase with a single deadline + // anchored here so total wall time scales with + // `SELECTIONS_PHASE_TIMEOUT`, not with the number of selections. + let deadline = tokio::time::Instant::now() + SELECTIONS_PHASE_TIMEOUT; + // Fanout every per-slot set to every subscriber. Subscribers receive // their own clone (the wrapper installed by `subscribe` clones the // set before each invocation). for (slot, set) in &psigs_by_slot { let duty = Duty::new_prepare_aggregator_duty(SlotNumber::new(*slot)); for sub in &self.subs { - sub(&duty, set.clone()).await.map_err(|err| { - ApiError::new( - StatusCode::INTERNAL_SERVER_ERROR, - "beacon committee selection subscriber failed", - ) - .with_boxed_source(err) - })?; + tokio::time::timeout_at(deadline, sub(&duty, set.clone())) + .await + .map_err(|_: Elapsed| { + tracing::warn!( + slot = *slot, + "beacon_committee_selections: subscriber timed out" + ); + ApiError::new( + StatusCode::REQUEST_TIMEOUT, + "beacon committee selection subscriber timed out", + ) + })? + .map_err(|err| { + tracing::warn!( + slot = *slot, + error = %err, + "beacon_committee_selections: subscriber failed" + ); + ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "beacon committee selection subscriber failed", + ) + .with_boxed_source(err) + })?; } } @@ -770,24 +796,30 @@ impl Handler for Component { for (slot, set) in &psigs_by_slot { let duty = Duty::new_prepare_aggregator_duty(SlotNumber::new(*slot)); for pk in set.inner().keys() { - let signed = tokio::time::timeout( - SELECTIONS_AGG_SIG_DB_TIMEOUT, - await_fn(duty.clone(), *pk), - ) - .await - .map_err(|_: Elapsed| { - ApiError::new( - StatusCode::REQUEST_TIMEOUT, - "aggregated beacon committee selection not available before deadline", - ) - })? - .map_err(|err| { - ApiError::new( - StatusCode::INTERNAL_SERVER_ERROR, - "aggregated beacon committee selection lookup failed", - ) - .with_boxed_source(err) - })?; + let signed = tokio::time::timeout_at(deadline, await_fn(duty.clone(), *pk)) + .await + .map_err(|_: Elapsed| { + tracing::warn!( + slot = *slot, + "beacon_committee_selections: aggsigdb await timed out" + ); + ApiError::new( + StatusCode::REQUEST_TIMEOUT, + "aggregated beacon committee selection not available before deadline", + ) + })? + .map_err(|err| { + tracing::warn!( + slot = *slot, + error = %err, + "beacon_committee_selections: aggsigdb lookup failed" + ); + ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "aggregated beacon committee selection lookup failed", + ) + .with_boxed_source(err) + })?; let selection = downcast_beacon_committee_selection(signed.as_ref())?; resp.push(selection.0.clone()); @@ -843,16 +875,38 @@ impl Handler for Component { .insert(core_pubkey, par_sig); } + // See `beacon_committee_selections` — bound the whole fan-out + + // AggSigDB phase against a single deadline so the per-selection + // count does not multiply the wall-time budget. + let deadline = tokio::time::Instant::now() + SELECTIONS_PHASE_TIMEOUT; + for (slot, set) in &psigs_by_slot { let duty = Duty::new_prepare_sync_contribution_duty(SlotNumber::new(*slot)); for sub in &self.subs { - sub(&duty, set.clone()).await.map_err(|err| { - ApiError::new( - StatusCode::INTERNAL_SERVER_ERROR, - "sync committee selection subscriber failed", - ) - .with_boxed_source(err) - })?; + tokio::time::timeout_at(deadline, sub(&duty, set.clone())) + .await + .map_err(|_: Elapsed| { + tracing::warn!( + slot = *slot, + "sync_committee_selections: subscriber timed out" + ); + ApiError::new( + StatusCode::REQUEST_TIMEOUT, + "sync committee selection subscriber timed out", + ) + })? + .map_err(|err| { + tracing::warn!( + slot = *slot, + error = %err, + "sync_committee_selections: subscriber failed" + ); + ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "sync committee selection subscriber failed", + ) + .with_boxed_source(err) + })?; } } @@ -867,24 +921,30 @@ impl Handler for Component { for (slot, set) in &psigs_by_slot { let duty = Duty::new_prepare_sync_contribution_duty(SlotNumber::new(*slot)); for pk in set.inner().keys() { - let signed = tokio::time::timeout( - SELECTIONS_AGG_SIG_DB_TIMEOUT, - await_fn(duty.clone(), *pk), - ) - .await - .map_err(|_: Elapsed| { - ApiError::new( - StatusCode::REQUEST_TIMEOUT, - "aggregated sync committee selection not available before deadline", - ) - })? - .map_err(|err| { - ApiError::new( - StatusCode::INTERNAL_SERVER_ERROR, - "aggregated sync committee selection lookup failed", - ) - .with_boxed_source(err) - })?; + let signed = tokio::time::timeout_at(deadline, await_fn(duty.clone(), *pk)) + .await + .map_err(|_: Elapsed| { + tracing::warn!( + slot = *slot, + "sync_committee_selections: aggsigdb await timed out" + ); + ApiError::new( + StatusCode::REQUEST_TIMEOUT, + "aggregated sync committee selection not available before deadline", + ) + })? + .map_err(|err| { + tracing::warn!( + slot = *slot, + error = %err, + "sync_committee_selections: aggsigdb lookup failed" + ); + ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "aggregated sync committee selection lookup failed", + ) + .with_boxed_source(err) + })?; let selection = downcast_sync_committee_selection(signed.as_ref())?; resp.push(selection.0.clone()); @@ -2301,7 +2361,7 @@ mod tests { } /// A stalled `await_agg_sig_db` hook trips - /// `SELECTIONS_AGG_SIG_DB_TIMEOUT` and the handler returns 408. + /// [`SELECTIONS_PHASE_TIMEOUT`] and the handler returns 408. #[tokio::test(start_paused = true)] async fn beacon_committee_selections_await_agg_sig_db_times_out() { let dv_root = dv_pubkey(0xD1); @@ -2513,4 +2573,167 @@ mod tests { .unwrap_err(); assert_eq!(err.status_code, StatusCode::REQUEST_TIMEOUT); } + + /// A stalled subscriber must not pin the handler indefinitely — the + /// per-request deadline applies to subscriber fan-out as well as the + /// AggSigDB await. The handler returns 408. + #[tokio::test(start_paused = true)] + async fn beacon_committee_selections_subscriber_timeout() { + let dv_root = dv_pubkey(0xF1); + let (mut component, _mock) = + make_selections_component_insecure(HashMap::from([(1u64, dv_root)])).await; + + // Subscriber that never completes. + component.subscribe(|_duty, _set| async { + std::future::pending::>().await + }); + // `await_agg_sig_db` must NOT be reached if the subscriber blocks. + component.register_await_agg_sig_db(|_duty, _pk| async { + panic!("await_agg_sig_db must not be reached when the subscriber stalls"); + }); + + let err = component + .beacon_committee_selections(vec![V1BeaconCommitteeSelection { + slot: 1, + validator_index: 1, + selection_proof: [0xAA; 96], + }]) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::REQUEST_TIMEOUT); + } + + /// Counterpart of [`beacon_committee_selections_subscriber_timeout`]. + #[tokio::test(start_paused = true)] + async fn sync_committee_selections_subscriber_timeout() { + let dv_root = dv_pubkey(0xF2); + let (mut component, _mock) = + make_selections_component_insecure(HashMap::from([(1u64, dv_root)])).await; + + component.subscribe(|_duty, _set| async { + std::future::pending::>().await + }); + component.register_await_agg_sig_db(|_duty, _pk| async { + panic!("await_agg_sig_db must not be reached when the subscriber stalls"); + }); + + let err = component + .sync_committee_selections(vec![V1SyncCommitteeSelection { + slot: 1, + validator_index: 1, + subcommittee_index: 0, + selection_proof: [0xBB; 96], + }]) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::REQUEST_TIMEOUT); + } + + /// An empty selections array is a well-defined no-op: the handler runs + /// the active-validators lookup, finds nothing to fan out, never queries + /// the AggSigDB, and returns an empty `data` array. Mirrors Go's + /// behaviour where `wrapResponse(nil)` returns `data: []`. + #[tokio::test] + async fn beacon_committee_selections_empty_input_returns_empty_data() { + let (mut component, _mock) = make_selections_component_insecure(HashMap::new()).await; + + // Subscriber and AggSigDB must NOT be touched for an empty input. + component.subscribe(|_duty, _set| async { + panic!("subscriber must not run for empty input"); + }); + component.register_await_agg_sig_db(|_duty, _pk| async { + panic!("await_agg_sig_db must not be reached for empty input"); + }); + + let resp = component + .beacon_committee_selections(vec![]) + .await + .expect("empty input is a no-op success"); + assert!(resp.data.is_empty()); + } + + /// Counterpart of + /// [`beacon_committee_selections_empty_input_returns_empty_data`]. + #[tokio::test] + async fn sync_committee_selections_empty_input_returns_empty_data() { + let (mut component, _mock) = make_selections_component_insecure(HashMap::new()).await; + + component.subscribe(|_duty, _set| async { + panic!("subscriber must not run for empty input"); + }); + component.register_await_agg_sig_db(|_duty, _pk| async { + panic!("await_agg_sig_db must not be reached for empty input"); + }); + + let resp = component + .sync_committee_selections(vec![]) + .await + .expect("empty input is a no-op success"); + assert!(resp.data.is_empty()); + } + + /// If the AggSigDB ever returns the wrong concrete `SignedData` type + /// under a `PrepareAggregator` duty (a wiring bug), the handler must + /// surface `500 Internal Server Error` rather than panic or return a + /// silently-wrong response. + #[tokio::test] + async fn beacon_committee_selections_rejects_aggsigdb_type_mismatch() { + let dv_root = dv_pubkey(0xA1); + let (mut component, _mock) = + make_selections_component_insecure(HashMap::from([(1u64, dv_root)])).await; + + // Wrong type — returns a `SyncCommitteeSelection` under a + // `PrepareAggregator` duty, which `downcast_beacon_committee_selection` + // cannot satisfy. + component.register_await_agg_sig_db(|_duty, _pk| async { + Ok::, CallbackError>(Box::new(SignedSyncCommitteeSelection::new( + V1SyncCommitteeSelection { + slot: 1, + validator_index: 1, + subcommittee_index: 0, + selection_proof: [0xCC; 96], + }, + ))) + }); + + let err = component + .beacon_committee_selections(vec![V1BeaconCommitteeSelection { + slot: 1, + validator_index: 1, + selection_proof: [0x00; 96], + }]) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::INTERNAL_SERVER_ERROR); + } + + /// Counterpart of + /// [`beacon_committee_selections_rejects_aggsigdb_type_mismatch`]. + #[tokio::test] + async fn sync_committee_selections_rejects_aggsigdb_type_mismatch() { + let dv_root = dv_pubkey(0xA2); + let (mut component, _mock) = + make_selections_component_insecure(HashMap::from([(1u64, dv_root)])).await; + + component.register_await_agg_sig_db(|_duty, _pk| async { + Ok::, CallbackError>(Box::new(SignedBeaconCommitteeSelection::new( + V1BeaconCommitteeSelection { + slot: 1, + validator_index: 1, + selection_proof: [0xDD; 96], + }, + ))) + }); + + let err = component + .sync_committee_selections(vec![V1SyncCommitteeSelection { + slot: 1, + validator_index: 1, + subcommittee_index: 0, + selection_proof: [0x00; 96], + }]) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::INTERNAL_SERVER_ERROR); + } } From 0cc23bd27792a16fd12022638c8153f5b87cc228 Mon Sep 17 00:00:00 2001 From: Bohdan Ohorodnii <273991985+varex83agent@users.noreply.github.com> Date: Wed, 3 Jun 2026 22:27:53 +0200 Subject: [PATCH 05/11] style: apply rustfmt to validatorapi/router.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Cleanup after the selection-route wiring change — rustfmt collapses the BeaconCommitteeSelectionsResponse import block onto one line and trims the let bindings in the new tests to match the file's existing formatting. --- crates/core/src/validatorapi/router.rs | 32 ++++++++++++-------------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/crates/core/src/validatorapi/router.rs b/crates/core/src/validatorapi/router.rs index 89032607..fde0f8cd 100644 --- a/crates/core/src/validatorapi/router.rs +++ b/crates/core/src/validatorapi/router.rs @@ -37,9 +37,9 @@ use super::{ types::{ AttestationDataOpts, AttestationDataResponse, AttesterDutiesOpts, AttesterDutiesResponse, BeaconCommitteeSelection, BeaconCommitteeSelectionsResponse, CommitteeIndex, - NodeVersionResponse, ProposerDutiesOpts, ProposerDutiesResponse, - SyncCommitteeDutiesOpts, SyncCommitteeDutiesResponse, SyncCommitteeSelection, - SyncCommitteeSelectionsResponse, ValIndexes, + NodeVersionResponse, ProposerDutiesOpts, ProposerDutiesResponse, SyncCommitteeDutiesOpts, + SyncCommitteeDutiesResponse, SyncCommitteeSelection, SyncCommitteeSelectionsResponse, + ValIndexes, }, }; @@ -782,13 +782,12 @@ mod tests { validator_index: 5, selection_proof: [0xAA; 96], }; - let handler = - TestHandler::default().with_beacon_committee_selections(EthResponse { - data: vec![selection], - execution_optimistic: false, - finalized: false, - dependent_root: None, - }); + let handler = TestHandler::default().with_beacon_committee_selections(EthResponse { + data: vec![selection], + execution_optimistic: false, + finalized: false, + dependent_root: None, + }); let state = Arc::new(AppState { handler: Arc::new(handler), builder_enabled: false, @@ -816,13 +815,12 @@ mod tests { subcommittee_index: 2, selection_proof: [0xBB; 96], }; - let handler = - TestHandler::default().with_sync_committee_selections(EthResponse { - data: vec![selection], - execution_optimistic: false, - finalized: false, - dependent_root: None, - }); + let handler = TestHandler::default().with_sync_committee_selections(EthResponse { + data: vec![selection], + execution_optimistic: false, + finalized: false, + dependent_root: None, + }); let state = Arc::new(AppState { handler: Arc::new(handler), builder_enabled: false, From b8fe7b045e92650a90c6bb4ad530fe6904ccc5bd Mon Sep 17 00:00:00 2001 From: Bohdan Ohorodnii <273991985+varex83agent@users.noreply.github.com> Date: Thu, 4 Jun 2026 00:08:21 +0200 Subject: [PATCH 06/11] fix(core): use checked_add for selections phase deadline clippy::arithmetic_side_effects (workspace-wide deny) flags `Instant::now() + Duration` even though overflow is unreachable in practice. Switches to `checked_add` with an .expect() carrying the invariant, matching the pattern already used elsewhere in the crate (qbft/consensus timer code). --- crates/core/src/validatorapi/component.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/crates/core/src/validatorapi/component.rs b/crates/core/src/validatorapi/component.rs index 87369832..bea387be 100644 --- a/crates/core/src/validatorapi/component.rs +++ b/crates/core/src/validatorapi/component.rs @@ -749,7 +749,9 @@ impl Handler for Component { // Bound the entire fan-out + AggSigDB phase with a single deadline // anchored here so total wall time scales with // `SELECTIONS_PHASE_TIMEOUT`, not with the number of selections. - let deadline = tokio::time::Instant::now() + SELECTIONS_PHASE_TIMEOUT; + let deadline = tokio::time::Instant::now() + .checked_add(SELECTIONS_PHASE_TIMEOUT) + .expect("Instant + 24s does not overflow on a real clock"); // Fanout every per-slot set to every subscriber. Subscribers receive // their own clone (the wrapper installed by `subscribe` clones the @@ -878,7 +880,9 @@ impl Handler for Component { // See `beacon_committee_selections` — bound the whole fan-out + // AggSigDB phase against a single deadline so the per-selection // count does not multiply the wall-time budget. - let deadline = tokio::time::Instant::now() + SELECTIONS_PHASE_TIMEOUT; + let deadline = tokio::time::Instant::now() + .checked_add(SELECTIONS_PHASE_TIMEOUT) + .expect("Instant + 24s does not overflow on a real clock"); for (slot, set) in &psigs_by_slot { let duty = Duty::new_prepare_sync_contribution_duty(SlotNumber::new(*slot)); From 0f97c79bfb3f648cd145dcb27a5cf9c863a9f1b3 Mon Sep 17 00:00:00 2001 From: Bohdan Ohorodnii <273991985+varex83agent@users.noreply.github.com> Date: Thu, 4 Jun 2026 00:39:48 +0200 Subject: [PATCH 07/11] fix(core): normalise malformed-body 4xx to {code,message} envelope MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The selection endpoints (and the duties endpoints on this branch) extracted the body as bare `Json<...>`, so malformed JSON, wrong element types, and missing content-type produced axum's default plain-text 400 / 422 / 415 — out of line with the router's other 4xx responses which already use the `{code, message}` envelope via `query_rejection_to_api_error`. Mirrors the fix already on main for the duties endpoints (b5eb0ca) which this branch predates. Adds a `json_rejection_to_api_error` helper that normalises every parse failure to a uniform 400 (matching Charon's `unmarshal`) while preserving the 413 path so the body-size cap still surfaces correctly. Both selection routes and both duties routes route their `Json<...>` rejection through it. Also: * Reorders router.rs so all `use` items come before the `const` declarations (style — the SELECTIONS_BODY_LIMIT addition had split the `use` blocks). * Collapses the duplicate `duties_post` / `selections_post` helpers into a single generic `bounded_post(handler, limit)`. * Tightens the `SELECTIONS_BODY_LIMIT` doc-comment per-entry size estimate (~210-250 B, ~250-300 entries per request). * Adds two router tests asserting the envelope shape for a malformed body — one per selection endpoint. --- crates/core/src/validatorapi/component.rs | 4 +- crates/core/src/validatorapi/router.rs | 125 +++++++++++++++------- 2 files changed, 88 insertions(+), 41 deletions(-) diff --git a/crates/core/src/validatorapi/component.rs b/crates/core/src/validatorapi/component.rs index bea387be..bead488d 100644 --- a/crates/core/src/validatorapi/component.rs +++ b/crates/core/src/validatorapi/component.rs @@ -759,7 +759,7 @@ impl Handler for Component { for (slot, set) in &psigs_by_slot { let duty = Duty::new_prepare_aggregator_duty(SlotNumber::new(*slot)); for sub in &self.subs { - tokio::time::timeout_at(deadline, sub(&duty, set.clone())) + tokio::time::timeout_at(deadline, sub(&duty, set)) .await .map_err(|_: Elapsed| { tracing::warn!( @@ -887,7 +887,7 @@ impl Handler for Component { for (slot, set) in &psigs_by_slot { let duty = Duty::new_prepare_sync_contribution_duty(SlotNumber::new(*slot)); for sub in &self.subs { - tokio::time::timeout_at(deadline, sub(&duty, set.clone())) + tokio::time::timeout_at(deadline, sub(&duty, set)) .await .map_err(|_: Elapsed| { tracing::warn!( diff --git a/crates/core/src/validatorapi/router.rs b/crates/core/src/validatorapi/router.rs index fde0f8cd..138080af 100644 --- a/crates/core/src/validatorapi/router.rs +++ b/crates/core/src/validatorapi/router.rs @@ -18,19 +18,6 @@ use axum::{ }; use serde::Deserialize; -/// Cap on the `POST /eth/v1/validator/duties/{attester,sync}/{epoch}` request -/// bodies. A realistic cluster ships at most a few thousand validator indices; -/// 64 KiB still allows ~10k indices in either numeric or string encoding, -/// well above any plausible workload. -const DUTIES_BODY_LIMIT: usize = 64 * 1024; - -/// Cap on the `POST /eth/v1/validator/{beacon,sync}_committee_selections` -/// request bodies. Each selection is ~300 bytes of JSON (slot, validator index, -/// 96-byte BLS proof in hex); 64 KiB allows >200 entries per request, far more -/// than a realistic cluster size while bounding the per-request CPU cost of -/// the BLS verifications and AggSigDB awaits the handler performs. -const SELECTIONS_BODY_LIMIT: usize = 64 * 1024; - use super::{ error::ApiError, handler::Handler, @@ -43,6 +30,20 @@ use super::{ }, }; +/// Cap on the `POST /eth/v1/validator/duties/{attester,sync}/{epoch}` request +/// bodies. A realistic cluster ships at most a few thousand validator indices; +/// 64 KiB still allows ~10k indices in either numeric or string encoding, +/// well above any plausible workload. +const DUTIES_BODY_LIMIT: usize = 64 * 1024; + +/// Cap on the `POST /eth/v1/validator/{beacon,sync}_committee_selections` +/// request bodies. Each selection is ~210-250 bytes of JSON (slot, validator +/// index, optional subcommittee index, 96-byte BLS proof in `0x` hex), so +/// 64 KiB admits ~250-300 entries — far more than a realistic cluster while +/// bounding the per-request CPU cost of the BLS verifications and AggSigDB +/// awaits the handler performs. +const SELECTIONS_BODY_LIMIT: usize = 64 * 1024; + /// Query parameters for `GET /eth/v1/validator/attestation_data`. #[derive(Debug, Clone, Deserialize)] struct AttestationDataQuery { @@ -75,7 +76,7 @@ pub fn new_router(handler: Arc, builder_enabled: bool) -> Router { Router::new() .route( "/eth/v1/validator/duties/attester/{epoch}", - duties_post(attester_duties), + bounded_post(attester_duties, DUTIES_BODY_LIMIT), ) .route( "/eth/v1/validator/duties/proposer/{epoch}", @@ -83,7 +84,7 @@ pub fn new_router(handler: Arc, builder_enabled: bool) -> Router { ) .route( "/eth/v1/validator/duties/sync/{epoch}", - duties_post(sync_committee_duties), + bounded_post(sync_committee_duties, DUTIES_BODY_LIMIT), ) .route("/eth/v1/validator/attestation_data", get(attestation_data)) .route("/eth/v1/beacon/pool/attestations", post(respond_404)) @@ -115,7 +116,7 @@ pub fn new_router(handler: Arc, builder_enabled: bool) -> Router { .route("/proposer_config", get(respond_404)) .route( "/eth/v1/validator/beacon_committee_selections", - selections_post(beacon_committee_selections), + bounded_post(beacon_committee_selections, SELECTIONS_BODY_LIMIT), ) .route("/eth/v1/validator/aggregate_attestation", get(respond_404)) .route( @@ -145,7 +146,7 @@ pub fn new_router(handler: Arc, builder_enabled: bool) -> Router { ) .route( "/eth/v1/validator/sync_committee_selections", - selections_post(sync_committee_selections), + bounded_post(sync_committee_selections, SELECTIONS_BODY_LIMIT), ) .route("/eth/v1/node/version", get(node_version)) .fallback(proxy_handler) @@ -214,18 +215,17 @@ async fn attestation_data( Ok(Json(response)) } -/// Wraps a `POST /eth/v1/validator/duties/*` handler with a body-size cap -/// and the Charon-parity content-type policy. The cap is local to these -/// two routes so unrelated POST handlers (e.g. `submit_attestations`) keep -/// axum's default 2 MiB. -fn duties_post(handler: H) -> MethodRouter +/// Wraps a `POST` handler with a body-size cap and the Charon-parity +/// content-type policy. The cap is local to the route so unrelated POST +/// handlers (e.g. `submit_attestations`) keep axum's default 2 MiB. +fn bounded_post(handler: H, body_limit: usize) -> MethodRouter where H: axum::handler::Handler, T: 'static, S: Clone + Send + Sync + 'static, { post(handler) - .route_layer(DefaultBodyLimit::max(DUTIES_BODY_LIMIT)) + .route_layer(DefaultBodyLimit::max(body_limit)) .route_layer(middleware::from_fn(enforce_json_content_type)) } @@ -258,18 +258,6 @@ async fn enforce_json_content_type(mut req: Request, next: Next) -> Result(handler: H) -> MethodRouter -where - H: axum::handler::Handler, - T: 'static, - S: Clone + Send + Sync + 'static, -{ - post(handler).route_layer(DefaultBodyLimit::max(SELECTIONS_BODY_LIMIT)) -} - /// Renders an axum query-extractor rejection as Pluto's standard /// [`ApiError`] body shape, so all 4xx responses from this router share the /// same `{ "code", "message" }` schema. @@ -333,8 +321,9 @@ async fn submit_exit() { async fn beacon_committee_selections( State(state): State>, - Json(selections): Json>, + selections: Result>, JsonRejection>, ) -> Result, ApiError> { + let Json(selections) = selections.map_err(json_rejection_to_api_error)?; let response = state .handler .beacon_committee_selections(selections) @@ -371,8 +360,9 @@ async fn submit_proposal_preparations() { async fn sync_committee_selections( State(state): State>, - Json(selections): Json>, + selections: Result>, JsonRejection>, ) -> Result, ApiError> { + let Json(selections) = selections.map_err(json_rejection_to_api_error)?; let response = state.handler.sync_committee_selections(selections).await?; Ok(Json(SyncCommitteeSelectionsResponse { @@ -793,7 +783,7 @@ mod tests { builder_enabled: false, }); - let Json(body) = beacon_committee_selections(State(state), Json(vec![])) + let Json(body) = beacon_committee_selections(State(state), Ok(Json(vec![]))) .await .unwrap(); @@ -826,7 +816,7 @@ mod tests { builder_enabled: false, }); - let Json(body) = sync_committee_selections(State(state), Json(vec![])) + let Json(body) = sync_committee_selections(State(state), Ok(Json(vec![]))) .await .unwrap(); @@ -885,4 +875,61 @@ mod tests { let resp = app.oneshot(req).await.unwrap(); assert_eq!(resp.status(), StatusCode::PAYLOAD_TOO_LARGE); } + + /// Malformed JSON on the selection POST routes is normalised into the + /// router's standard `{ code, message }` envelope rather than axum's + /// default plain-text 400 / 422 / 415 — matching Charon's `unmarshal` + /// which surfaces every body unmarshal failure as a uniform `400`. The + /// same plumbing covers the duties endpoints. + #[tokio::test] + async fn beacon_committee_selections_returns_api_error_shape_on_malformed_body() { + use axum::{ + body::{Body, to_bytes}, + http::{Method, Request}, + }; + use tower::ServiceExt; + + let handler = TestHandler::default(); + let app = new_router(Arc::new(handler), false); + + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/validator/beacon_committee_selections") + .header("content-type", "application/json") + .body(Body::from(r#"{ "not": "an array" }"#)) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + let body = to_bytes(resp.into_body(), 64 * 1024).await.unwrap(); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(json["code"], 400); + assert!(json["message"].is_string()); + } + + /// Counterpart of + /// [`beacon_committee_selections_returns_api_error_shape_on_malformed_body`]. + #[tokio::test] + async fn sync_committee_selections_returns_api_error_shape_on_malformed_body() { + use axum::{ + body::{Body, to_bytes}, + http::{Method, Request}, + }; + use tower::ServiceExt; + + let handler = TestHandler::default(); + let app = new_router(Arc::new(handler), false); + + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/validator/sync_committee_selections") + .header("content-type", "application/json") + .body(Body::from("not-json-at-all")) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + let body = to_bytes(resp.into_body(), 64 * 1024).await.unwrap(); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(json["code"], 400); + assert!(json["message"].is_string()); + } } From 8741a97fcb5641203560f59c2f0d6e5d452e4b4c Mon Sep 17 00:00:00 2001 From: Bohdan Ohorodnii <273991985+varex83agent@users.noreply.github.com> Date: Thu, 4 Jun 2026 00:42:44 +0200 Subject: [PATCH 08/11] refactor(core): tighten selections handlers + helper dedupe MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Extract a `selections_deadline()` helper so the `Instant::checked_add(...).expect(...)` pattern lives in one place and the panic message tracks the constant rather than embedding a hard-coded duration string. * Switch `beacon_committee_selections` to call `selection.message_root()` instead of inlining `selection.slot.tree_hash_root().0` — matches the sync handler and centralises the SSZ selection-proof root in `crates/eth2api/src/v1.rs`. The `tree_hash::TreeHash` import is no longer needed. * Add `tracing::error!` to the two "hook not registered" branches in `fetch_active_validators` and the new `await_agg_sig_db_fn` check inside each selection handler. A handler running with a half-wired `Component` is exactly the kind of operational red flag the surrounding logs are designed to surface. * Bump the subscriber-`Err` and aggsigdb-`Err` log calls in the selection fan-out / await loops from `tracing::warn!` to `tracing::error!`. Timeouts stay at `warn` because they are transient peer-side issues; non-transient wiring failures belong at `error` so production alerting picks them up. * Destructure-by-copy in both per-slot loops (`for (&slot, set) in &psigs_by_slot`) so the `*slot` deref noise in eight `tracing` field assignments and four `SlotNumber::new(*slot)` calls collapses to plain `slot`. * `TestHandler::{beacon,sync}_committee_selections` switch from `.clone().expect(...)` to `match .as_ref()` so unstubbed call-sites panic with `unimplemented!` (matching every other stubbed method in the file) and the clone only happens on the `Some` path. --- crates/core/src/validatorapi/component.rs | 84 +++++++++++++---------- crates/core/src/validatorapi/testutils.rs | 16 ++--- 2 files changed, 54 insertions(+), 46 deletions(-) diff --git a/crates/core/src/validatorapi/component.rs b/crates/core/src/validatorapi/component.rs index bead488d..8e7a3834 100644 --- a/crates/core/src/validatorapi/component.rs +++ b/crates/core/src/validatorapi/component.rs @@ -20,7 +20,6 @@ use pluto_eth2util::{ signing::{self, DomainName, SigningError}, }; use tokio::time::error::Elapsed; -use tree_hash::TreeHash; use super::{ error::ApiError, @@ -138,6 +137,18 @@ const ATTESTATION_DATA_TIMEOUT: Duration = Duration::from_secs(24); /// the AggSigDB. const SELECTIONS_PHASE_TIMEOUT: Duration = Duration::from_secs(24); +/// Returns the absolute deadline for the selections fan-out + AggSigDB +/// phase, `SELECTIONS_PHASE_TIMEOUT` in the future. Factored out so both +/// selection handlers anchor the deadline identically and so the +/// `clippy::arithmetic_side_effects`-driven `checked_add(...).expect(...)` +/// pattern lives in one place (and tracks the constant rather than a +/// hard-coded string). +fn selections_deadline() -> tokio::time::Instant { + tokio::time::Instant::now() + .checked_add(SELECTIONS_PHASE_TIMEOUT) + .expect("Instant + SELECTIONS_PHASE_TIMEOUT does not overflow on a real clock") +} + /// Validator API [`Handler`] implementation. /// /// Holds the upstream beacon-node client and the cluster's public-key / @@ -729,12 +740,16 @@ impl Handler for Component { ); // The selection-proof signs the slot under - // `DOMAIN_SELECTION_PROOF`. + // `DOMAIN_SELECTION_PROOF`. `BeaconCommitteeSelection::message_root()` + // centralises this SSZ root computation (see + // `crates/eth2api/src/v1.rs`) — call it through the helper so the + // beacon and sync handlers share the same single source of truth + // for the per-variant signing root. self.verify_selection_partial_sig( &root_pubkey, DomainName::SelectionProof, selection.slot, - selection.slot.tree_hash_root().0, + selection.message_root(), &selection.selection_proof, "beacon committee selection", ) @@ -749,31 +764,26 @@ impl Handler for Component { // Bound the entire fan-out + AggSigDB phase with a single deadline // anchored here so total wall time scales with // `SELECTIONS_PHASE_TIMEOUT`, not with the number of selections. - let deadline = tokio::time::Instant::now() - .checked_add(SELECTIONS_PHASE_TIMEOUT) - .expect("Instant + 24s does not overflow on a real clock"); + let deadline = selections_deadline(); // Fanout every per-slot set to every subscriber. Subscribers receive // their own clone (the wrapper installed by `subscribe` clones the // set before each invocation). - for (slot, set) in &psigs_by_slot { - let duty = Duty::new_prepare_aggregator_duty(SlotNumber::new(*slot)); + for (&slot, set) in &psigs_by_slot { + let duty = Duty::new_prepare_aggregator_duty(SlotNumber::new(slot)); for sub in &self.subs { tokio::time::timeout_at(deadline, sub(&duty, set)) .await .map_err(|_: Elapsed| { - tracing::warn!( - slot = *slot, - "beacon_committee_selections: subscriber timed out" - ); + tracing::warn!(slot, "beacon_committee_selections: subscriber timed out"); ApiError::new( StatusCode::REQUEST_TIMEOUT, "beacon committee selection subscriber timed out", ) })? .map_err(|err| { - tracing::warn!( - slot = *slot, + tracing::error!( + slot, error = %err, "beacon_committee_selections: subscriber failed" ); @@ -788,6 +798,9 @@ impl Handler for Component { // Pull every aggregated selection back out of the AggSigDB. let await_fn = self.await_agg_sig_db_fn.as_ref().ok_or_else(|| { + tracing::error!( + "beacon_committee_selections: await_agg_sig_db hook not registered — Component is half-wired" + ); ApiError::new( StatusCode::INTERNAL_SERVER_ERROR, "await_agg_sig_db hook not registered", @@ -795,14 +808,14 @@ impl Handler for Component { })?; let mut resp: Vec = Vec::with_capacity(selections.len()); - for (slot, set) in &psigs_by_slot { - let duty = Duty::new_prepare_aggregator_duty(SlotNumber::new(*slot)); + for (&slot, set) in &psigs_by_slot { + let duty = Duty::new_prepare_aggregator_duty(SlotNumber::new(slot)); for pk in set.inner().keys() { let signed = tokio::time::timeout_at(deadline, await_fn(duty.clone(), *pk)) .await .map_err(|_: Elapsed| { tracing::warn!( - slot = *slot, + slot, "beacon_committee_selections: aggsigdb await timed out" ); ApiError::new( @@ -811,8 +824,8 @@ impl Handler for Component { ) })? .map_err(|err| { - tracing::warn!( - slot = *slot, + tracing::error!( + slot, error = %err, "beacon_committee_selections: aggsigdb lookup failed" ); @@ -880,28 +893,23 @@ impl Handler for Component { // See `beacon_committee_selections` — bound the whole fan-out + // AggSigDB phase against a single deadline so the per-selection // count does not multiply the wall-time budget. - let deadline = tokio::time::Instant::now() - .checked_add(SELECTIONS_PHASE_TIMEOUT) - .expect("Instant + 24s does not overflow on a real clock"); + let deadline = selections_deadline(); - for (slot, set) in &psigs_by_slot { - let duty = Duty::new_prepare_sync_contribution_duty(SlotNumber::new(*slot)); + for (&slot, set) in &psigs_by_slot { + let duty = Duty::new_prepare_sync_contribution_duty(SlotNumber::new(slot)); for sub in &self.subs { tokio::time::timeout_at(deadline, sub(&duty, set)) .await .map_err(|_: Elapsed| { - tracing::warn!( - slot = *slot, - "sync_committee_selections: subscriber timed out" - ); + tracing::warn!(slot, "sync_committee_selections: subscriber timed out"); ApiError::new( StatusCode::REQUEST_TIMEOUT, "sync committee selection subscriber timed out", ) })? .map_err(|err| { - tracing::warn!( - slot = *slot, + tracing::error!( + slot, error = %err, "sync_committee_selections: subscriber failed" ); @@ -915,6 +923,9 @@ impl Handler for Component { } let await_fn = self.await_agg_sig_db_fn.as_ref().ok_or_else(|| { + tracing::error!( + "sync_committee_selections: await_agg_sig_db hook not registered — Component is half-wired" + ); ApiError::new( StatusCode::INTERNAL_SERVER_ERROR, "await_agg_sig_db hook not registered", @@ -922,24 +933,21 @@ impl Handler for Component { })?; let mut resp: Vec = Vec::with_capacity(selections.len()); - for (slot, set) in &psigs_by_slot { - let duty = Duty::new_prepare_sync_contribution_duty(SlotNumber::new(*slot)); + for (&slot, set) in &psigs_by_slot { + let duty = Duty::new_prepare_sync_contribution_duty(SlotNumber::new(slot)); for pk in set.inner().keys() { let signed = tokio::time::timeout_at(deadline, await_fn(duty.clone(), *pk)) .await .map_err(|_: Elapsed| { - tracing::warn!( - slot = *slot, - "sync_committee_selections: aggsigdb await timed out" - ); + tracing::warn!(slot, "sync_committee_selections: aggsigdb await timed out"); ApiError::new( StatusCode::REQUEST_TIMEOUT, "aggregated sync committee selection not available before deadline", ) })? .map_err(|err| { - tracing::warn!( - slot = *slot, + tracing::error!( + slot, error = %err, "sync_committee_selections: aggsigdb lookup failed" ); diff --git a/crates/core/src/validatorapi/testutils.rs b/crates/core/src/validatorapi/testutils.rs index cae049d7..0ed2551f 100644 --- a/crates/core/src/validatorapi/testutils.rs +++ b/crates/core/src/validatorapi/testutils.rs @@ -185,20 +185,20 @@ impl Handler for TestHandler { &self, _selections: Vec, ) -> Result>, ApiError> { - Ok(self - .beacon_committee_selections_response - .clone() - .expect("beacon_committee_selections not stubbed in TestHandler")) + match self.beacon_committee_selections_response.as_ref() { + Some(r) => Ok(r.clone()), + None => unimplemented!("beacon_committee_selections not stubbed in TestHandler"), + } } async fn sync_committee_selections( &self, _selections: Vec, ) -> Result>, ApiError> { - Ok(self - .sync_committee_selections_response - .clone() - .expect("sync_committee_selections not stubbed in TestHandler")) + match self.sync_committee_selections_response.as_ref() { + Some(r) => Ok(r.clone()), + None => unimplemented!("sync_committee_selections not stubbed in TestHandler"), + } } async fn validators( From 127df9276a72de68de218c6f42f3659637697b7c Mon Sep 17 00:00:00 2001 From: Bohdan Ohorodnii <273991985+varex83agent@users.noreply.github.com> Date: Thu, 4 Jun 2026 11:52:44 +0200 Subject: [PATCH 09/11] style(core): finish iter2 cleanups in validatorapi MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Reorder `component.rs` imports back to the workspace `std` → external → `super`/`crate` grouping. Iter2's helper changes had left the `super::` block sitting between `std` and the external crates, drifting from every other file in `validatorapi/` and the sibling core modules. * Apply the iter2 `match self.X.as_ref() { Some(r) => Ok(r.clone()), None => unimplemented!(...) }` stub shape to the remaining four `Option`-based `TestHandler` methods (attester_duties, proposer_duties, sync_committee_duties, attestation_data) so the whole file uses one style instead of two. * Add `attester_duties_returns_api_error_shape_on_malformed_body` — the duties endpoints share the same `json_rejection_to_api_error` plumbing as the selection routes, and the iter2 envelope tests only locked the selection side. This test would now fail if a future refactor accidentally re-introduced bare `Json` extraction on a duties handler. The selection envelope test docstring is updated to point at the new duties counterpart. --- crates/core/src/validatorapi/router.rs | 34 ++++++++++++++++++++++- crates/core/src/validatorapi/testutils.rs | 32 ++++++++++----------- 2 files changed, 49 insertions(+), 17 deletions(-) diff --git a/crates/core/src/validatorapi/router.rs b/crates/core/src/validatorapi/router.rs index 138080af..90b2b499 100644 --- a/crates/core/src/validatorapi/router.rs +++ b/crates/core/src/validatorapi/router.rs @@ -880,7 +880,9 @@ mod tests { /// router's standard `{ code, message }` envelope rather than axum's /// default plain-text 400 / 422 / 415 — matching Charon's `unmarshal` /// which surfaces every body unmarshal failure as a uniform `400`. The - /// same plumbing covers the duties endpoints. + /// same plumbing covers the duties endpoints; see + /// [`attester_duties_returns_api_error_shape_on_malformed_body`] for the + /// duties variant. #[tokio::test] async fn beacon_committee_selections_returns_api_error_shape_on_malformed_body() { use axum::{ @@ -932,4 +934,34 @@ mod tests { assert_eq!(json["code"], 400); assert!(json["message"].is_string()); } + + /// Duties POST endpoints share the same `json_rejection_to_api_error` + /// plumbing as the selection routes — this test locks the envelope + /// contract on the duties side so a future refactor that re-introduces + /// bare `Json` extraction is caught by a failing test rather + /// than only by manual review. + #[tokio::test] + async fn attester_duties_returns_api_error_shape_on_malformed_body() { + use axum::{ + body::{Body, to_bytes}, + http::{Method, Request}, + }; + use tower::ServiceExt; + + let handler = TestHandler::default(); + let app = new_router(Arc::new(handler), false); + + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/validator/duties/attester/42") + .header("content-type", "application/json") + .body(Body::from(r#"{ "not": "an array" }"#)) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + let body = to_bytes(resp.into_body(), 64 * 1024).await.unwrap(); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(json["code"], 400); + assert!(json["message"].is_string()); + } } diff --git a/crates/core/src/validatorapi/testutils.rs b/crates/core/src/validatorapi/testutils.rs index 0ed2551f..16b78e2c 100644 --- a/crates/core/src/validatorapi/testutils.rs +++ b/crates/core/src/validatorapi/testutils.rs @@ -106,40 +106,40 @@ impl Handler for TestHandler { &self, _opts: AttesterDutiesOpts, ) -> Result { - Ok(self - .attester_duties_response - .clone() - .expect("attester_duties not stubbed in TestHandler")) + match self.attester_duties_response.as_ref() { + Some(r) => Ok(r.clone()), + None => unimplemented!("attester_duties not stubbed in TestHandler"), + } } async fn proposer_duties( &self, _opts: ProposerDutiesOpts, ) -> Result { - Ok(self - .proposer_duties_response - .clone() - .expect("proposer_duties not stubbed in TestHandler")) + match self.proposer_duties_response.as_ref() { + Some(r) => Ok(r.clone()), + None => unimplemented!("proposer_duties not stubbed in TestHandler"), + } } async fn sync_committee_duties( &self, _opts: SyncCommitteeDutiesOpts, ) -> Result { - Ok(self - .sync_committee_duties_response - .clone() - .expect("sync_committee_duties not stubbed in TestHandler")) + match self.sync_committee_duties_response.as_ref() { + Some(r) => Ok(r.clone()), + None => unimplemented!("sync_committee_duties not stubbed in TestHandler"), + } } async fn attestation_data( &self, _opts: AttestationDataOpts, ) -> Result { - Ok(self - .attestation_data_response - .clone() - .expect("attestation_data not stubbed in TestHandler")) + match self.attestation_data_response.as_ref() { + Some(r) => Ok(r.clone()), + None => unimplemented!("attestation_data not stubbed in TestHandler"), + } } async fn submit_attestations( From f9c968c5b5ca7891ae2712ac2e563a4e88538475 Mon Sep 17 00:00:00 2001 From: Bohdan Ohorodnii <273991985+varex83agent@users.noreply.github.com> Date: Thu, 11 Jun 2026 20:07:29 +0200 Subject: [PATCH 10/11] feat(core): wire eth2api CachedValidatorsProvider into validatorapi Make eth2api's existing CachedValidatorsProvider trait async (#[async_trait]) and implement it for ValidatorCache, add ActiveValidators::new so consumers can build instances, and add the async-trait dependency. Wire a validator_cache: Arc into the validatorapi Component (constructors + fetch_active_validators), reusing the shared trait rather than a validatorapi-local duplicate. fetch_active_validators mirrors Go's eth2Cl.ActiveValidators. Add a TestValidatorCache test double and update existing validatorapi tests for the new constructor arity. This is the base the validatorapi submit-handler PRs (selections, exit/ registrations, sync-submit) stack on. Co-Authored-By: Bohdan Ohorodnii <35969035+varex83@users.noreply.github.com> --- Cargo.lock | 1 + crates/core/src/validatorapi/component.rs | 87 +++++++++++++++++++++-- crates/eth2api/Cargo.toml | 1 + crates/eth2api/src/valcache.rs | 31 +++++++- 4 files changed, 110 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index be2ddc88..abd7cd39 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5707,6 +5707,7 @@ version = "1.7.1" dependencies = [ "alloy", "anyhow", + "async-trait", "bon", "chrono", "ethereum_ssz", diff --git a/crates/core/src/validatorapi/component.rs b/crates/core/src/validatorapi/component.rs index f9f7729e..60a3c4e3 100644 --- a/crates/core/src/validatorapi/component.rs +++ b/crates/core/src/validatorapi/component.rs @@ -14,6 +14,7 @@ use pluto_eth2api::{ GetProposerDutiesRequest, GetProposerDutiesResponse, GetSyncCommitteeDutiesRequest, GetSyncCommitteeDutiesResponse, spec::phase0::{BLSPubKey, Epoch, Root, ValidatorIndex}, + valcache::{ActiveValidators, CachedValidatorsProvider}, versioned::{DataVersion, SignedBlindedProposalBlock, SignedProposalBlock}, }; use pluto_eth2util::signing::{self, DomainName, SigningError}; @@ -140,6 +141,12 @@ const PROPOSAL_TIMEOUT: Duration = Duration::from_secs(24); pub struct Component { /// Upstream beacon-node API client. eth2_cl: Arc, + /// Per-epoch active-validators cache. Submit handlers consult this to + /// translate a validator-client-supplied `validator_index` into the + /// cluster's DV root public key. Mirrors Go's `eth2Cl.ActiveValidators`, + /// which is itself backed by the beacon-node validator cache. + #[allow(dead_code, reason = "consumed by submit_* handlers in later PRs")] + validator_cache: Arc, /// In-memory DutyDB used to await consensus output (e.g. attestation /// data) produced by the rest of the pipeline. dutydb: Arc, @@ -196,6 +203,7 @@ impl Component { share_idx: u64, pub_share_by_pubkey: HashMap, builder_enabled: bool, + validator_cache: Arc, ) -> Self { Self { eth2_cl, @@ -203,6 +211,7 @@ impl Component { share_idx, pub_share_by_pubkey, builder_enabled, + validator_cache, insecure_test: false, subs: Vec::new(), await_proposal_fn: None, @@ -223,6 +232,7 @@ impl Component { eth2_cl: Arc, dutydb: Arc, share_idx: u64, + validator_cache: Arc, ) -> Self { Self { eth2_cl, @@ -230,6 +240,7 @@ impl Component { share_idx, pub_share_by_pubkey: HashMap::new(), builder_enabled: false, + validator_cache, insecure_test: true, subs: Vec::new(), await_proposal_fn: None, @@ -241,6 +252,25 @@ impl Component { } } + /// Returns the cluster's active validators (`validator_index -> DV root + /// public key`) from the registered [`CachedValidatorsProvider`], + /// bounded by [`UPSTREAM_REQUEST_TIMEOUT`]. Mirrors Go's + /// `c.eth2Cl.ActiveValidators(ctx)`, which is itself implemented via the + /// beacon-node validator cache. + #[allow(dead_code, reason = "consumed by submit_* handlers in later PRs")] + async fn fetch_active_validators(&self) -> Result { + tokio::time::timeout( + UPSTREAM_REQUEST_TIMEOUT, + self.validator_cache.active_validators(), + ) + .await + .map_err(|_: Elapsed| upstream_timeout("active validators"))? + .map_err(|err| { + ApiError::new(StatusCode::BAD_GATEWAY, "active validators lookup failed") + .with_source(err) + }) + } + /// Appends a subscriber that is invoked by submit endpoints once a /// partial-signed-data set has been validated. The registered closure /// receives its own clone of the set, so subscribers can mutate without @@ -1321,6 +1351,39 @@ mod tests { types::{Duty, DutyDefinition, DutyType, PubKey, SlotNumber}, validatorapi::types::AttestationDataOpts, }; + use pluto_eth2api::valcache::{CompleteValidators, ValidatorCacheError}; + + /// In-memory [`CachedValidatorsProvider`] for tests. Holds a fixed + /// `validator_index -> DV root pubkey` map. `complete_validators` is not + /// consumed by the validator API, so it returns an empty set. + #[derive(Default)] + pub(super) struct TestValidatorCache(HashMap); + + impl TestValidatorCache { + /// An empty cache as an `Arc`. + pub(super) fn empty() -> Arc { + Arc::new(Self::default()) + } + + /// A cache pre-populated with `validators`. + #[allow(dead_code, reason = "consumed by submit_* handler tests in later PRs")] + pub(super) fn arc( + validators: HashMap, + ) -> Arc { + Arc::new(Self(validators)) + } + } + + #[async_trait] + impl CachedValidatorsProvider for TestValidatorCache { + async fn active_validators(&self) -> Result { + Ok(ActiveValidators::new(self.0.clone())) + } + + async fn complete_validators(&self) -> Result { + Ok(CompleteValidators::default()) + } + } /// Schedules every duty with a deadline at `MAX_UTC`, so duties are /// `Scheduled` but never naturally expire. @@ -1345,7 +1408,8 @@ mod tests { let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url("http://127.0.0.1:0").unwrap()); - let component = Component::new_insecure(eth2_cl, Arc::clone(&dutydb), 1); + let component = + Component::new_insecure(eth2_cl, Arc::clone(&dutydb), 1, TestValidatorCache::empty()); (component, dutydb) } @@ -1587,7 +1651,8 @@ mod tests { let dutydb = Arc::new(MemDB::new(deadliner, trim_rx, &cancel)); let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url("http://127.0.0.1:0").unwrap()); - let component = Component::new_insecure(eth2_cl, Arc::clone(&dutydb), 1); + let component = + Component::new_insecure(eth2_cl, Arc::clone(&dutydb), 1, TestValidatorCache::empty()); // Start an await before any data is stored. let waiter = { @@ -1773,7 +1838,7 @@ mod tests { let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url("http://127.0.0.1:0").unwrap()); - Component::new(eth2_cl, dutydb, 1, map, false) + Component::new(eth2_cl, dutydb, 1, map, false, TestValidatorCache::empty()) } /// `Subscribe` invokes every registered subscriber, each receiving its @@ -2004,7 +2069,7 @@ mod tests { let (_evict_tx, evict_rx) = mpsc::channel(1); let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url(mock.uri()).unwrap()); - let component = Component::new(eth2_cl, dutydb, 1, map, false); + let component = Component::new(eth2_cl, dutydb, 1, map, false, TestValidatorCache::empty()); (component, mock) } @@ -2087,7 +2152,7 @@ mod tests { let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url("http://127.0.0.1:0").unwrap()); - let component = Component::new_insecure(eth2_cl, dutydb, 1); + let component = Component::new_insecure(eth2_cl, dutydb, 1, TestValidatorCache::empty()); component .verify_partial_sig( @@ -2163,7 +2228,8 @@ mod tests { let (_evict_tx, evict_rx) = mpsc::channel(1); let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url(mock.uri()).unwrap()); - let component = Component::new_insecure(eth2_cl, Arc::clone(&dutydb), 1); + let component = + Component::new_insecure(eth2_cl, Arc::clone(&dutydb), 1, TestValidatorCache::empty()); (component, mock) } @@ -2812,7 +2878,14 @@ mod tests { let (_evict_tx, evict_rx) = mpsc::channel(1); let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url(mock.uri()).unwrap()); - let mut component = Component::new(eth2_cl, Arc::clone(&dutydb), 1, HashMap::new(), false); + let mut component = Component::new( + eth2_cl, + Arc::clone(&dutydb), + 1, + HashMap::new(), + false, + TestValidatorCache::empty(), + ); let (consensus, signed) = matched_phase0_proposals(33, 5); diff --git a/crates/eth2api/Cargo.toml b/crates/eth2api/Cargo.toml index 99b0a3c9..fbc5c298 100644 --- a/crates/eth2api/Cargo.toml +++ b/crates/eth2api/Cargo.toml @@ -13,6 +13,7 @@ ignored = ["bon", "http", "oas3-gen-support", "regex", "reqwest", "validator"] [dependencies] anyhow.workspace = true +async-trait.workspace = true bon.workspace = true http.workspace = true oas3-gen-support.workspace = true diff --git a/crates/eth2api/src/valcache.rs b/crates/eth2api/src/valcache.rs index ea52ef6d..6e086680 100644 --- a/crates/eth2api/src/valcache.rs +++ b/crates/eth2api/src/valcache.rs @@ -4,6 +4,7 @@ use crate::{ PostStateValidatorsRequestPath, PostStateValidatorsResponse, ValidatorRequestBody, spec::phase0::{BLSPubKey as PubKey, ValidatorIndex}, }; +use async_trait::async_trait; use std::{collections::HashMap, sync::Arc}; use tokio::sync::RwLock; @@ -42,6 +43,13 @@ impl std::ops::Deref for CompleteValidators { } impl ActiveValidators { + /// Builds an [`ActiveValidators`] from a `validator_index -> pubkey` map. + /// Lets consumers outside this crate (e.g. test doubles of + /// [`CachedValidatorsProvider`]) construct populated instances. + pub fn new(validators: HashMap) -> Self { + Self(validators) + } + /// An [`Iterator`] of active validator indices. pub fn indices(&self) -> impl Iterator + '_ { self.0.keys().copied() @@ -55,12 +63,29 @@ impl ActiveValidators { /// A provider of cached validator information for the current epoch, /// including both active validators and complete validator data. -pub trait CachedValidatorsProvider { +/// +/// Async so implementations may populate the underlying cache on demand — +/// callers must not assume the call is non-blocking. Consumed via +/// `Arc` (e.g. by the validator API), so the +/// trait is object-safe and `Send + Sync`. +#[async_trait] +pub trait CachedValidatorsProvider: Send + Sync { /// Get the cached active validators. - fn active_validators(&self) -> Result; + async fn active_validators(&self) -> Result; /// Get all the cached validators. - fn complete_validators(&self) -> Result; + async fn complete_validators(&self) -> Result; +} + +#[async_trait] +impl CachedValidatorsProvider for ValidatorCache { + async fn active_validators(&self) -> Result { + Ok(self.get_by_head().await?.0) + } + + async fn complete_validators(&self) -> Result { + Ok(self.get_by_head().await?.1) + } } /// A cache for active validators. From 860004f7fc06eec28072fef29936fd633bca1009 Mon Sep 17 00:00:00 2001 From: Bohdan Ohorodnii <273991985+varex83agent@users.noreply.github.com> Date: Fri, 12 Jun 2026 16:10:51 +0200 Subject: [PATCH 11/11] docs(core): drop Go/Charon cross-reference comments from selections handlers Remove "port of"/"mirrors Go's"/"matches Charon" annotations and similar AI-flavored narration from the validatorapi selections code and tests, keeping the behavioural descriptions. Co-authored-by: varex83 --- crates/core/src/validatorapi/component.rs | 64 ++++++++--------------- crates/core/src/validatorapi/router.rs | 49 ++++++++--------- crates/core/src/validatorapi/types.rs | 17 +++--- 3 files changed, 51 insertions(+), 79 deletions(-) diff --git a/crates/core/src/validatorapi/component.rs b/crates/core/src/validatorapi/component.rs index 7a9d88e8..8540d5b4 100644 --- a/crates/core/src/validatorapi/component.rs +++ b/crates/core/src/validatorapi/component.rs @@ -132,11 +132,10 @@ const ATTESTATION_DATA_TIMEOUT: Duration = Duration::from_secs(24); /// Hard deadline for the *whole* fan-out + AggSigDB phase of the selection /// handlers — the per-slot subscriber calls plus every blocking /// `await_agg_sig_db_fn` lookup share a single budget anchored at this -/// duration after the phase begins. The Go reference relies on the request -/// context timeout; here we set an explicit bound so neither a stalled -/// subscriber nor a stalled AggSigDB can pin a selections request forever, -/// and the worst-case wall time scales with the duration rather than with -/// the per-selection count. Sized to roughly two slots so a real +/// duration after the phase begins. An explicit bound ensures neither a +/// stalled subscriber nor a stalled AggSigDB can pin a selections request +/// forever, and the worst-case wall time scales with the duration rather than +/// with the per-selection count. Sized to roughly two slots so a real /// `PrepareAggregator` / `PrepareSyncContribution` duty has time to reach /// the AggSigDB. const SELECTIONS_PHASE_TIMEOUT: Duration = Duration::from_secs(24); @@ -173,8 +172,7 @@ pub struct Component { eth2_cl: Arc, /// Per-epoch active-validators cache. Submit handlers consult this to /// translate a validator-client-supplied `validator_index` into the - /// cluster's DV root public key. Mirrors Go's `eth2Cl.ActiveValidators`, - /// which is itself backed by the beacon-node validator cache. + /// cluster's DV root public key. Backed by the beacon-node validator cache. #[allow(dead_code, reason = "consumed by submit_* handlers in later PRs")] validator_cache: Arc, /// In-memory DutyDB used to await consensus output (e.g. attestation @@ -284,9 +282,7 @@ impl Component { /// Returns the cluster's active validators (`validator_index -> DV root /// public key`) from the registered [`CachedValidatorsProvider`], - /// bounded by [`UPSTREAM_REQUEST_TIMEOUT`]. Mirrors Go's - /// `c.eth2Cl.ActiveValidators(ctx)`, which is itself implemented via the - /// beacon-node validator cache. + /// bounded by [`UPSTREAM_REQUEST_TIMEOUT`]. #[allow(dead_code, reason = "consumed by submit_* handlers in later PRs")] async fn fetch_active_validators(&self) -> Result { tokio::time::timeout( @@ -489,10 +485,9 @@ impl Component { } /// Looks up the DV root pubkey for a selection's `validator_index`. - /// Mirrors Go's `vals[selection.ValidatorIndex]` / `core.PubKeyFromBytes` - /// pair at the top of each selection loop. Returns both representations - /// the handler needs: the `BLSPubKey` for signature verification and the - /// `core::PubKey` for use as a `ParSignedDataSet` key. + /// Returns both representations the handler needs: the `BLSPubKey` for + /// signature verification and the `core::PubKey` for use as a + /// `ParSignedDataSet` key. fn resolve_validator( &self, validator_index: ValidatorIndex, @@ -500,8 +495,7 @@ impl Component { endpoint: &'static str, ) -> Result<(BLSPubKey, PubKey), ApiError> { let root = active_validators.get(&validator_index).ok_or_else(|| { - // Mirrors Go's `errors.New("validator not found")` branch — the - // caller asked us to sign for a validator that is not part of + // The caller asked us to sign for a validator that is not part of // the cluster. 400 (not 502): the failure is request-level, not // gateway-level. ApiError::new( @@ -514,9 +508,7 @@ impl Component { /// Verifies a selection's partial signature. Bundles slot → epoch /// resolution alongside the underlying `verify_partial_sig` call and - /// surfaces the failure as a 400 with a generic message (matching the - /// Go path where `verifyPartialSig` returns an error that propagates - /// out of the handler). + /// surfaces the failure as a 400 with a generic message. async fn verify_selection_partial_sig( &self, root_pubkey: &BLSPubKey, @@ -958,13 +950,10 @@ impl Handler for Component { &self, selections: Vec, ) -> Result>, ApiError> { - // Port of `BeaconCommitteeSelections` in - // `core/validatorapi/validatorapi.go` (lines 798–864). let active_validators = self.fetch_active_validators().await?; - // psigs_by_slot mirrors Go's `psigsBySlot[slot]ParSignedDataSet` — - // keyed by `(slot, pubkey)` so the per-slot fanout below produces one - // `PrepareAggregator` duty per slot covering every selection from + // psigs_by_slot is keyed by slot so the per-slot fanout below produces + // one `PrepareAggregator` duty per slot covering every selection from // that slot. let mut psigs_by_slot: HashMap = HashMap::new(); for selection in &selections { @@ -1094,8 +1083,6 @@ impl Handler for Component { &self, selections: Vec, ) -> Result>, ApiError> { - // Port of `SyncCommitteeSelections` in - // `core/validatorapi/validatorapi.go` (lines 1072–1138). let active_validators = self.fetch_active_validators().await?; let mut psigs_by_slot: HashMap = HashMap::new(); @@ -1111,10 +1098,8 @@ impl Handler for Component { // Sync committee selection proofs sign over a // `SyncAggregatorSelectionData` root under - // `DOMAIN_SYNC_COMMITTEE_SELECTION_PROOF`. The Rust selection - // wrapper's `message_root()` already computes this exactly the - // way Go's `SyncCommitteeSelection.MessageRoot()` does — see - // `crates/eth2api/src/v1.rs`. + // `DOMAIN_SYNC_COMMITTEE_SELECTION_PROOF`. The selection wrapper's + // `message_root()` computes this — see `crates/eth2api/src/v1.rs`. self.verify_selection_partial_sig( &root_pubkey, DomainName::SyncCommitteeSelectionProof, @@ -1401,10 +1386,9 @@ fn swap_sync_committee_pubshares( } /// Downcasts the aggregated signed data from the AggSigDB to a -/// `BeaconCommitteeSelection`. Mirrors Go's -/// `s.(core.BeaconCommitteeSelection)` type assertion. A mismatch indicates -/// a wiring bug — the cluster stored the wrong duty type under the -/// `PrepareAggregator` duty — so it surfaces as 500 rather than 4xx. +/// `BeaconCommitteeSelection`. A mismatch indicates a wiring bug — the cluster +/// stored the wrong duty type under the `PrepareAggregator` duty — so it +/// surfaces as 500 rather than 4xx. fn downcast_beacon_committee_selection( signed: &dyn SignedData, ) -> Result<&signeddata::BeaconCommitteeSelection, ApiError> { @@ -2541,8 +2525,7 @@ mod tests { // ==================================================================== /// `fetch_active_validators` returns whatever the registered - /// `CachedValidatorsProvider` yields, untouched. Mirrors Go's - /// `c.eth2Cl.ActiveValidators(ctx)` return shape. + /// `CachedValidatorsProvider` yields, untouched. #[tokio::test] async fn fetch_active_validators_returns_cache_contents() { let cancel = CancellationToken::new(); @@ -2693,8 +2676,7 @@ mod tests { } /// Happy-path beacon committee selections: one selection in, one - /// aggregated selection out. Mirrors the per-validator-index flow in - /// Go's `BeaconCommitteeSelections` at `validatorapi.go:798-864`. + /// aggregated selection out. #[tokio::test] async fn beacon_committee_selections_happy_path() { const SLOT: Slot = 12; @@ -2822,8 +2804,7 @@ mod tests { } /// A selection whose validator index is not part of the cluster's - /// active set fails the lookup short-circuit with `400 Bad Request`, - /// mirroring Go's `errors.New("validator not found")` branch. + /// active set fails the lookup short-circuit with `400 Bad Request`. #[tokio::test] async fn beacon_committee_selections_rejects_unknown_validator_index() { let (mut component, _mock) = make_selections_component_insecure(HashMap::new()).await; @@ -3149,8 +3130,7 @@ mod tests { /// An empty selections array is a well-defined no-op: the handler runs /// the active-validators lookup, finds nothing to fan out, never queries - /// the AggSigDB, and returns an empty `data` array. Mirrors Go's - /// behaviour where `wrapResponse(nil)` returns `data: []`. + /// the AggSigDB, and returns an empty `data` array. #[tokio::test] async fn beacon_committee_selections_empty_input_returns_empty_data() { let (mut component, _mock) = make_selections_component_insecure(HashMap::new()).await; diff --git a/crates/core/src/validatorapi/router.rs b/crates/core/src/validatorapi/router.rs index 90b2b499..a7a72528 100644 --- a/crates/core/src/validatorapi/router.rs +++ b/crates/core/src/validatorapi/router.rs @@ -215,9 +215,9 @@ async fn attestation_data( Ok(Json(response)) } -/// Wraps a `POST` handler with a body-size cap and the Charon-parity -/// content-type policy. The cap is local to the route so unrelated POST -/// handlers (e.g. `submit_attestations`) keep axum's default 2 MiB. +/// Wraps a `POST` handler with a body-size cap and the JSON content-type +/// policy. The cap is local to the route so unrelated POST handlers (e.g. +/// `submit_attestations`) keep axum's default 2 MiB. fn bounded_post(handler: H, body_limit: usize) -> MethodRouter where H: axum::handler::Handler, @@ -229,14 +229,14 @@ where .route_layer(middleware::from_fn(enforce_json_content_type)) } -/// Matches Charon's content-type handling at `core/validatorapi/router.go:365`: -/// a missing `Content-Type` is treated as `application/json`; an unrecognized -/// content type is rejected with `415 Unsupported Media Type`. SSZ is not -/// supported yet — when it lands, this is the right seam to extend. +/// Content-type handling: a missing `Content-Type` is treated as +/// `application/json`; an unrecognized content type is rejected with `415 +/// Unsupported Media Type`. SSZ is not supported yet — when it lands, this is +/// the right seam to extend. /// /// Without this layer, axum's `Json` extractor would reject a missing header -/// with `MissingJsonContentType`, which our envelope normalises to `400` — -/// diverging from Charon, which lets VCs that don't set the header through. +/// with `MissingJsonContentType`, which our envelope normalises to `400`. We +/// instead let VCs that don't set the header through. async fn enforce_json_content_type(mut req: Request, next: Next) -> Result { match req.headers().get(header::CONTENT_TYPE) { None => { @@ -271,8 +271,8 @@ fn query_rejection_to_api_error(rejection: QueryRejection) -> ApiError { /// instead of axum's default plain-text response. /// /// Genuine parse failures — malformed JSON (`400`) and wrong element type -/// (`422`) — are normalised to a uniform `400`, matching Charon's `unmarshal`, -/// which returns `400` for all body unmarshal failures. Content-Type rejections +/// (`422`) — are normalised to a uniform `400` for all body unmarshal +/// failures. Content-Type rejections /// no longer reach this function: [`enforce_json_content_type`] intercepts /// them upstream so missing/JSON requests pass through and non-JSON requests /// return `415`. The body-size-limit rejection from [`DefaultBodyLimit`] @@ -633,8 +633,7 @@ mod tests { /// A malformed duties body emits the same `{ code, message }` envelope and /// uniform 400 as the rest of the router, rather than axum's default /// plain-text rejection (which would be 400 for a syntax error but 422 for - /// a type error). Mirrors Charon's `unmarshal`, which returns 400 for every - /// body parse failure. + /// a type error). #[tokio::test] async fn attester_duties_returns_api_error_shape_on_bad_body() { use axum::{ @@ -661,11 +660,10 @@ mod tests { assert!(json["message"].is_string()); } - /// Charon-parity: a duties request that omits `Content-Type` is - /// treated as `application/json` rather than rejected — the + /// A duties request that omits `Content-Type` is treated as + /// `application/json` rather than rejected — the /// `enforce_json_content_type` middleware injects the header before - /// the `Json` extractor sees the request. See `core/validatorapi/ - /// router.go:365` (`if contentHeader == "" || ...`). + /// the `Json` extractor sees the request. #[tokio::test] async fn attester_duties_accepts_missing_content_type() { use axum::{ @@ -691,9 +689,9 @@ mod tests { assert_eq!(resp.status(), StatusCode::OK); } - /// Charon-parity: a duties request with a non-JSON `Content-Type` - /// returns `415 Unsupported Media Type`, not the `400` that the - /// generic body-parse normaliser would produce. + /// A duties request with a non-JSON `Content-Type` returns `415 + /// Unsupported Media Type`, not the `400` that the generic body-parse + /// normaliser would produce. #[tokio::test] async fn attester_duties_rejects_non_json_content_type() { use axum::{ @@ -761,10 +759,9 @@ mod tests { } /// Verifies the router wraps the `Handler::beacon_committee_selections` - /// payload into the `{ "data": [...] }` shape Charon's wire format uses - /// (`beaconCommitteeSelectionsJSON` in `core/validatorapi/router.go`), - /// dropping the `execution_optimistic` / `finalized` / `dependent_root` - /// metadata that the trait method carries internally. + /// payload into the `{ "data": [...] }` wire shape, dropping the + /// `execution_optimistic` / `finalized` / `dependent_root` metadata that + /// the trait method carries internally. #[tokio::test] async fn beacon_committee_selections_wraps_handler_value() { let selection = BeaconCommitteeSelection { @@ -878,8 +875,8 @@ mod tests { /// Malformed JSON on the selection POST routes is normalised into the /// router's standard `{ code, message }` envelope rather than axum's - /// default plain-text 400 / 422 / 415 — matching Charon's `unmarshal` - /// which surfaces every body unmarshal failure as a uniform `400`. The + /// default plain-text 400 / 422 / 415 — every body unmarshal failure + /// surfaces as a uniform `400`. The /// same plumbing covers the duties endpoints; see /// [`attester_duties_returns_api_error_shape_on_malformed_body`] for the /// duties variant. diff --git a/crates/core/src/validatorapi/types.rs b/crates/core/src/validatorapi/types.rs index 4f208e8a..d35c844d 100644 --- a/crates/core/src/validatorapi/types.rs +++ b/crates/core/src/validatorapi/types.rs @@ -29,14 +29,12 @@ pub use pluto_eth2api::{ }; /// Beacon-committee selection payload. Aliases the consensus-spec -/// `v1::BeaconCommitteeSelection` so the handler can operate on the same -/// validator-index / slot / selection-proof tuple Go uses at -/// `core/validatorapi/validatorapi.go:798`. +/// `v1::BeaconCommitteeSelection` so the handler can operate on the +/// validator-index / slot / selection-proof tuple directly. pub type BeaconCommitteeSelection = V1BeaconCommitteeSelection; /// Sync-committee selection payload. Aliases the consensus-spec -/// `v1::SyncCommitteeSelection`, matching Go's -/// `core/validatorapi/validatorapi.go:1072` input shape. +/// `v1::SyncCommitteeSelection`. pub type SyncCommitteeSelection = V1SyncCommitteeSelection; /// Attestation data alias for the consensus-spec phase0 type. @@ -154,8 +152,7 @@ pub struct AttestationDataResponse { pub data: AttestationData, } -/// Response envelope for the `beacon_committee_selections` endpoint. -/// Matches Charon's `beaconCommitteeSelectionsJSON` wire shape — a `data` +/// Response envelope for the `beacon_committee_selections` endpoint — a `data` /// array of aggregated selection proofs. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct BeaconCommitteeSelectionsResponse { @@ -163,8 +160,7 @@ pub struct BeaconCommitteeSelectionsResponse { pub data: Vec, } -/// Response envelope for the `sync_committee_selections` endpoint. -/// Matches Charon's `syncCommitteeSelectionsJSON` wire shape — a `data` +/// Response envelope for the `sync_committee_selections` endpoint — a `data` /// array of aggregated selection proofs. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct SyncCommitteeSelectionsResponse { @@ -183,8 +179,7 @@ pub use crate::signeddata::VersionedProposal; pub use crate::signeddata::VersionedSignedProposal; /// Versioned signed blinded proposal payload — alias of the eth2api versioned -/// wrapper, the same shape consumed by Go's -/// `SubmitBlindedProposalOpts.Proposal`. +/// wrapper. pub use pluto_eth2api::versioned::VersionedSignedBlindedProposal; /// Versioned attestation payload. Placeholder.