diff --git a/crates/core/src/validatorapi/component.rs b/crates/core/src/validatorapi/component.rs index 1f9c099e..8540d5b4 100644 --- a/crates/core/src/validatorapi/component.rs +++ b/crates/core/src/validatorapi/component.rs @@ -12,11 +12,14 @@ use pluto_eth2api::{ EthBeaconNodeApiClient, GetAttesterDutiesRequest, GetAttesterDutiesResponse, GetProposerDutiesRequest, GetProposerDutiesResponse, GetSyncCommitteeDutiesRequest, GetSyncCommitteeDutiesResponse, - spec::phase0::{BLSPubKey, Epoch, Root, ValidatorIndex}, + spec::phase0::{BLSPubKey, Epoch, Root, Slot, ValidatorIndex}, valcache::{ActiveValidators, CachedValidatorsProvider}, versioned::{DataVersion, SignedBlindedProposalBlock, SignedProposalBlock}, }; -use pluto_eth2util::signing::{self, DomainName, SigningError}; +use pluto_eth2util::{ + helpers::epoch_from_slot, + signing::{self, DomainName, SigningError}, +}; use tokio::time::error::Elapsed; use tracing::{debug, instrument}; @@ -37,6 +40,7 @@ use super::{ }; use crate::{ dutydb::{Error as DutyDbError, MemDB}, + signeddata, signeddata::{ SignedDataError, SignedRandao, SyncContribution, VersionedAggregatedAttestation, VersionedProposal as UnsignedVersionedProposal, @@ -125,6 +129,29 @@ const UPSTREAM_REQUEST_TIMEOUT: Duration = Duration::from_secs(10); /// attestation duty has time to flow through the pipeline. const ATTESTATION_DATA_TIMEOUT: Duration = Duration::from_secs(24); +/// Hard deadline for the *whole* fan-out + AggSigDB phase of the selection +/// handlers — the per-slot subscriber calls plus every blocking +/// `await_agg_sig_db_fn` lookup share a single budget anchored at this +/// duration after the phase begins. An explicit bound ensures neither a +/// stalled subscriber nor a stalled AggSigDB can pin a selections request +/// forever, and the worst-case wall time scales with the duration rather than +/// with the per-selection count. Sized to roughly two slots so a real +/// `PrepareAggregator` / `PrepareSyncContribution` duty has time to reach +/// the AggSigDB. +const SELECTIONS_PHASE_TIMEOUT: Duration = Duration::from_secs(24); + +/// Returns the absolute deadline for the selections fan-out + AggSigDB +/// phase, `SELECTIONS_PHASE_TIMEOUT` in the future. Factored out so both +/// selection handlers anchor the deadline identically and so the +/// `clippy::arithmetic_side_effects`-driven `checked_add(...).expect(...)` +/// pattern lives in one place (and tracks the constant rather than a +/// hard-coded string). +fn selections_deadline() -> tokio::time::Instant { + tokio::time::Instant::now() + .checked_add(SELECTIONS_PHASE_TIMEOUT) + .expect("Instant + SELECTIONS_PHASE_TIMEOUT does not overflow on a real clock") +} + /// Hard deadline for the whole `proposal` / `submit_proposal` / /// `submit_blinded_proposal` handler body. Bounds every leg — proposer /// pubkey lookup, `epoch_from_slot`, partial-sig verification (which itself @@ -145,8 +172,7 @@ pub struct Component { eth2_cl: Arc, /// Per-epoch active-validators cache. Submit handlers consult this to /// translate a validator-client-supplied `validator_index` into the - /// cluster's DV root public key. Mirrors Go's `eth2Cl.ActiveValidators`, - /// which is itself backed by the beacon-node validator cache. + /// cluster's DV root public key. Backed by the beacon-node validator cache. #[allow(dead_code, reason = "consumed by submit_* handlers in later PRs")] validator_cache: Arc, /// In-memory DutyDB used to await consensus output (e.g. attestation @@ -256,9 +282,7 @@ impl Component { /// Returns the cluster's active validators (`validator_index -> DV root /// public key`) from the registered [`CachedValidatorsProvider`], - /// bounded by [`UPSTREAM_REQUEST_TIMEOUT`]. Mirrors Go's - /// `c.eth2Cl.ActiveValidators(ctx)`, which is itself implemented via the - /// beacon-node validator cache. + /// bounded by [`UPSTREAM_REQUEST_TIMEOUT`]. #[allow(dead_code, reason = "consumed by submit_* handlers in later PRs")] async fn fetch_active_validators(&self) -> Result { tokio::time::timeout( @@ -459,6 +483,65 @@ impl Component { Ok(()) } + + /// Looks up the DV root pubkey for a selection's `validator_index`. + /// Returns both representations the handler needs: the `BLSPubKey` for + /// signature verification and the `core::PubKey` for use as a + /// `ParSignedDataSet` key. + fn resolve_validator( + &self, + validator_index: ValidatorIndex, + active_validators: &HashMap, + endpoint: &'static str, + ) -> Result<(BLSPubKey, PubKey), ApiError> { + let root = active_validators.get(&validator_index).ok_or_else(|| { + // The caller asked us to sign for a validator that is not part of + // the cluster. 400 (not 502): the failure is request-level, not + // gateway-level. + ApiError::new( + StatusCode::BAD_REQUEST, + format!("{endpoint}: validator not found"), + ) + })?; + Ok((*root, PubKey::new(*root))) + } + + /// Verifies a selection's partial signature. Bundles slot → epoch + /// resolution alongside the underlying `verify_partial_sig` call and + /// surfaces the failure as a 400 with a generic message. + async fn verify_selection_partial_sig( + &self, + root_pubkey: &BLSPubKey, + domain: DomainName, + slot: Slot, + message_root: Root, + signature: &Signature, + endpoint: &'static str, + ) -> Result<(), ApiError> { + // Resolve the epoch first so a misconfigured upstream surfaces as + // 502 rather than as a verification failure. + let epoch = epoch_from_slot(&self.eth2_cl, slot).await.map_err(|err| { + ApiError::new( + StatusCode::BAD_GATEWAY, + format!("{endpoint}: epoch lookup failed"), + ) + .with_source(err) + })?; + + self.verify_partial_sig(root_pubkey, domain, epoch, message_root, signature) + .await + .map_err(|err| match err { + VerifyPartialSigError::UnknownPubKey => ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + format!("{endpoint}: unknown validator public key"), + ), + VerifyPartialSigError::Signing(inner) => ApiError::new( + StatusCode::BAD_REQUEST, + format!("{endpoint}: invalid partial signature"), + ) + .with_source(inner), + }) + } } /// Errors returned by [`Component::verify_partial_sig`]. @@ -865,17 +948,253 @@ impl Handler for Component { #[instrument(skip_all)] async fn beacon_committee_selections( &self, - _selections: Vec, + selections: Vec, ) -> Result>, ApiError> { - unimplemented!("beacon_committee_selections not yet ported") + let active_validators = self.fetch_active_validators().await?; + + // psigs_by_slot is keyed by slot so the per-slot fanout below produces + // one `PrepareAggregator` duty per slot covering every selection from + // that slot. + let mut psigs_by_slot: HashMap = HashMap::new(); + for selection in &selections { + let (root_pubkey, core_pubkey) = self.resolve_validator( + selection.validator_index, + &active_validators, + "beacon committee selection", + )?; + + let par_sig = signeddata::BeaconCommitteeSelection::new_partial( + selection.clone(), + self.share_idx, + ); + + // The selection-proof signs the slot under + // `DOMAIN_SELECTION_PROOF`. `BeaconCommitteeSelection::message_root()` + // centralises this SSZ root computation (see + // `crates/eth2api/src/v1.rs`) — call it through the helper so the + // beacon and sync handlers share the same single source of truth + // for the per-variant signing root. + self.verify_selection_partial_sig( + &root_pubkey, + DomainName::SelectionProof, + selection.slot, + selection.message_root(), + &selection.selection_proof, + "beacon committee selection", + ) + .await?; + + psigs_by_slot + .entry(selection.slot) + .or_default() + .insert(core_pubkey, par_sig); + } + + // Bound the entire fan-out + AggSigDB phase with a single deadline + // anchored here so total wall time scales with + // `SELECTIONS_PHASE_TIMEOUT`, not with the number of selections. + let deadline = selections_deadline(); + + // Fanout every per-slot set to every subscriber. Subscribers receive + // their own clone (the wrapper installed by `subscribe` clones the + // set before each invocation). + for (&slot, set) in &psigs_by_slot { + let duty = Duty::new_prepare_aggregator_duty(SlotNumber::new(slot)); + for sub in &self.subs { + tokio::time::timeout_at(deadline, sub(&duty, set)) + .await + .map_err(|_: Elapsed| { + tracing::warn!(slot, "beacon_committee_selections: subscriber timed out"); + ApiError::new( + StatusCode::REQUEST_TIMEOUT, + "beacon committee selection subscriber timed out", + ) + })? + .map_err(|err| { + tracing::error!( + slot, + error = %err, + "beacon_committee_selections: subscriber failed" + ); + ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "beacon committee selection subscriber failed", + ) + .with_boxed_source(err) + })?; + } + } + + // Pull every aggregated selection back out of the AggSigDB. + let await_fn = self.await_agg_sig_db_fn.as_ref().ok_or_else(|| { + tracing::error!( + "beacon_committee_selections: await_agg_sig_db hook not registered — Component is half-wired" + ); + ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "await_agg_sig_db hook not registered", + ) + })?; + + let mut resp: Vec = Vec::with_capacity(selections.len()); + for (&slot, set) in &psigs_by_slot { + let duty = Duty::new_prepare_aggregator_duty(SlotNumber::new(slot)); + for pk in set.inner().keys() { + let signed = tokio::time::timeout_at(deadline, await_fn(duty.clone(), *pk)) + .await + .map_err(|_: Elapsed| { + tracing::warn!( + slot, + "beacon_committee_selections: aggsigdb await timed out" + ); + ApiError::new( + StatusCode::REQUEST_TIMEOUT, + "aggregated beacon committee selection not available before deadline", + ) + })? + .map_err(|err| { + tracing::error!( + slot, + error = %err, + "beacon_committee_selections: aggsigdb lookup failed" + ); + ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "aggregated beacon committee selection lookup failed", + ) + .with_boxed_source(err) + })?; + + let selection = downcast_beacon_committee_selection(signed.as_ref())?; + resp.push(selection.0.clone()); + } + } + + Ok(EthResponse { + data: resp, + execution_optimistic: false, + finalized: false, + dependent_root: None, + }) } #[instrument(skip_all)] async fn sync_committee_selections( &self, - _selections: Vec, + selections: Vec, ) -> Result>, ApiError> { - unimplemented!("sync_committee_selections not yet ported") + let active_validators = self.fetch_active_validators().await?; + + let mut psigs_by_slot: HashMap = HashMap::new(); + for selection in &selections { + let (root_pubkey, core_pubkey) = self.resolve_validator( + selection.validator_index, + &active_validators, + "sync committee selection", + )?; + + let par_sig = + signeddata::SyncCommitteeSelection::new_partial(selection.clone(), self.share_idx); + + // Sync committee selection proofs sign over a + // `SyncAggregatorSelectionData` root under + // `DOMAIN_SYNC_COMMITTEE_SELECTION_PROOF`. The selection wrapper's + // `message_root()` computes this — see `crates/eth2api/src/v1.rs`. + self.verify_selection_partial_sig( + &root_pubkey, + DomainName::SyncCommitteeSelectionProof, + selection.slot, + selection.message_root(), + &selection.selection_proof, + "sync committee selection", + ) + .await?; + + psigs_by_slot + .entry(selection.slot) + .or_default() + .insert(core_pubkey, par_sig); + } + + // See `beacon_committee_selections` — bound the whole fan-out + + // AggSigDB phase against a single deadline so the per-selection + // count does not multiply the wall-time budget. + let deadline = selections_deadline(); + + for (&slot, set) in &psigs_by_slot { + let duty = Duty::new_prepare_sync_contribution_duty(SlotNumber::new(slot)); + for sub in &self.subs { + tokio::time::timeout_at(deadline, sub(&duty, set)) + .await + .map_err(|_: Elapsed| { + tracing::warn!(slot, "sync_committee_selections: subscriber timed out"); + ApiError::new( + StatusCode::REQUEST_TIMEOUT, + "sync committee selection subscriber timed out", + ) + })? + .map_err(|err| { + tracing::error!( + slot, + error = %err, + "sync_committee_selections: subscriber failed" + ); + ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "sync committee selection subscriber failed", + ) + .with_boxed_source(err) + })?; + } + } + + let await_fn = self.await_agg_sig_db_fn.as_ref().ok_or_else(|| { + tracing::error!( + "sync_committee_selections: await_agg_sig_db hook not registered — Component is half-wired" + ); + ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "await_agg_sig_db hook not registered", + ) + })?; + + let mut resp: Vec = Vec::with_capacity(selections.len()); + for (&slot, set) in &psigs_by_slot { + let duty = Duty::new_prepare_sync_contribution_duty(SlotNumber::new(slot)); + for pk in set.inner().keys() { + let signed = tokio::time::timeout_at(deadline, await_fn(duty.clone(), *pk)) + .await + .map_err(|_: Elapsed| { + tracing::warn!(slot, "sync_committee_selections: aggsigdb await timed out"); + ApiError::new( + StatusCode::REQUEST_TIMEOUT, + "aggregated sync committee selection not available before deadline", + ) + })? + .map_err(|err| { + tracing::error!( + slot, + error = %err, + "sync_committee_selections: aggsigdb lookup failed" + ); + ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "aggregated sync committee selection lookup failed", + ) + .with_boxed_source(err) + })?; + + let selection = downcast_sync_committee_selection(signed.as_ref())?; + resp.push(selection.0.clone()); + } + } + + Ok(EthResponse { + data: resp, + execution_optimistic: false, + finalized: false, + dependent_root: None, + }) } #[instrument(skip_all)] @@ -1066,6 +1385,40 @@ fn swap_sync_committee_pubshares( Ok(()) } +/// Downcasts the aggregated signed data from the AggSigDB to a +/// `BeaconCommitteeSelection`. A mismatch indicates a wiring bug — the cluster +/// stored the wrong duty type under the `PrepareAggregator` duty — so it +/// surfaces as 500 rather than 4xx. +fn downcast_beacon_committee_selection( + signed: &dyn SignedData, +) -> Result<&signeddata::BeaconCommitteeSelection, ApiError> { + signed + .as_any() + .downcast_ref::() + .ok_or_else(|| { + ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "invalid beacon committee selection", + ) + }) +} + +/// Sync committee selections counterpart of +/// [`downcast_beacon_committee_selection`]. +fn downcast_sync_committee_selection( + signed: &dyn SignedData, +) -> Result<&signeddata::SyncCommitteeSelection, ApiError> { + signed + .as_any() + .downcast_ref::() + .ok_or_else(|| { + ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "invalid sync committee selection", + ) + }) +} + fn parse_bls_pubkey(s: &str) -> Result { let trimmed = s.strip_prefix("0x").unwrap_or(s); let bytes = hex::decode(trimmed).map_err(|err| { @@ -2168,6 +2521,720 @@ mod tests { .expect("insecure_test mode skips verification"); } + // CachedValidatorsProvider plumbing + // ==================================================================== + + /// `fetch_active_validators` returns whatever the registered + /// `CachedValidatorsProvider` yields, untouched. + #[tokio::test] + async fn fetch_active_validators_returns_cache_contents() { + let cancel = CancellationToken::new(); + let (deadliner, _deadliner_rx) = DeadlinerTask::start( + cancel.clone(), + "validatorapi-validator-cache-tests", + FarFutureCalculator, + ); + let (_evict_tx, evict_rx) = mpsc::channel(1); + let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); + let eth2_cl = + Arc::new(EthBeaconNodeApiClient::with_base_url("http://127.0.0.1:0").unwrap()); + + let expected = HashMap::from([(1u64, dv_pubkey(0xA1)), (7u64, dv_pubkey(0xA7))]); + let component = Component::new_insecure( + eth2_cl, + dutydb, + 1, + TestValidatorCache::arc(expected.clone()), + ); + + let got = component + .fetch_active_validators() + .await + .expect("test cache always succeeds"); + assert_eq!(*got, expected); + } + + /// A provider that surfaces a transport-style error is mapped to a 502 + /// without leaking the underlying error into the client-visible + /// message. + #[tokio::test] + async fn fetch_active_validators_maps_provider_error_to_502() { + struct FailingCache; + + #[async_trait] + impl CachedValidatorsProvider for FailingCache { + async fn active_validators(&self) -> Result { + Err(ValidatorCacheError::EthBeaconNodeApiClientError( + pluto_eth2api::EthBeaconNodeApiClientError::UnexpectedResponse, + )) + } + + async fn complete_validators(&self) -> Result { + Err(ValidatorCacheError::EthBeaconNodeApiClientError( + pluto_eth2api::EthBeaconNodeApiClientError::UnexpectedResponse, + )) + } + } + + let cancel = CancellationToken::new(); + let (deadliner, _deadliner_rx) = DeadlinerTask::start( + cancel.clone(), + "validatorapi-validator-cache-fail-tests", + FarFutureCalculator, + ); + let (_evict_tx, evict_rx) = mpsc::channel(1); + let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); + let eth2_cl = + Arc::new(EthBeaconNodeApiClient::with_base_url("http://127.0.0.1:0").unwrap()); + let component = Component::new_insecure(eth2_cl, dutydb, 1, Arc::new(FailingCache)); + + let err = component.fetch_active_validators().await.unwrap_err(); + assert_eq!(err.status_code, StatusCode::BAD_GATEWAY); + assert_eq!(err.message, "active validators lookup failed"); + } + + // ==================================================================== + // beacon_committee_selections / sync_committee_selections handlers + // ==================================================================== + + use pluto_eth2api::v1::{ + BeaconCommitteeSelection as V1BeaconCommitteeSelection, + SyncCommitteeSelection as V1SyncCommitteeSelection, + }; + + use crate::signeddata::{ + BeaconCommitteeSelection as SignedBeaconCommitteeSelection, + SyncCommitteeSelection as SignedSyncCommitteeSelection, + }; + + /// Builds a `(Component, BeaconMock)` pair backed by `BeaconMock`'s + /// default spec — which already contains `DOMAIN_SELECTION_PROOF`, + /// `DOMAIN_SYNC_COMMITTEE_SELECTION_PROOF`, and `SLOTS_PER_EPOCH`. The + /// component is *insecure* so the selections handlers can run without + /// real BLS signatures; specific tests that exercise verification opt + /// into a secure component via [`make_selections_component_secure`]. The + /// caller-supplied `active_validators` map populates the per-epoch + /// validator cache the handlers consult to translate + /// `validator_index → DV root pubkey`. + async fn make_selections_component_insecure( + active_validators: HashMap, + ) -> (Component, BeaconMock) { + let mock = BeaconMock::builder() + .genesis_time(DateTime::from_timestamp(0, 0).unwrap()) + .genesis_validators_root([0; 32]) + .build() + .await + .unwrap(); + let cancel = CancellationToken::new(); + let (deadliner, _deadliner_rx) = + DeadlinerTask::start(cancel.clone(), "selections-tests", FarFutureCalculator); + let (_evict_tx, evict_rx) = mpsc::channel(1); + let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); + let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url(mock.uri()).unwrap()); + let component = Component::new_insecure( + eth2_cl, + dutydb, + 1, + TestValidatorCache::arc(active_validators), + ); + (component, mock) + } + + /// Like [`make_selections_component_insecure`] but with `insecure_test` + /// disabled. The caller supplies the `pub_share_by_pubkey` map so + /// `verify_partial_sig` can resolve a verify-share for each DV root, and + /// the `active_validators` map populates the validator-cache the + /// selections handlers consult. + async fn make_selections_component_secure( + map: HashMap, + active_validators: HashMap, + ) -> (Component, BeaconMock) { + let mock = BeaconMock::builder() + .genesis_time(DateTime::from_timestamp(0, 0).unwrap()) + .genesis_validators_root([0; 32]) + .build() + .await + .unwrap(); + let cancel = CancellationToken::new(); + let (deadliner, _deadliner_rx) = DeadlinerTask::start( + cancel.clone(), + "selections-secure-tests", + FarFutureCalculator, + ); + let (_evict_tx, evict_rx) = mpsc::channel(1); + let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); + let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url(mock.uri()).unwrap()); + let component = Component::new( + eth2_cl, + dutydb, + 1, + map, + false, + TestValidatorCache::arc(active_validators), + ); + (component, mock) + } + + /// Happy-path beacon committee selections: one selection in, one + /// aggregated selection out. + #[tokio::test] + async fn beacon_committee_selections_happy_path() { + const SLOT: Slot = 12; + const VAL_IDX: ValidatorIndex = 5; + let dv_root = dv_pubkey(0xA1); + + let (mut component, _mock) = + make_selections_component_insecure(HashMap::from([(VAL_IDX, dv_root)])).await; + + // Returned aggregated selection — the byte pattern shows the + // response actually flowed through `await_agg_sig_db`. + let agg_selection = V1BeaconCommitteeSelection { + slot: SLOT, + validator_index: VAL_IDX, + selection_proof: [0xAB; 96], + }; + let agg_clone = agg_selection.clone(); + component.register_await_agg_sig_db(move |_duty, _pk| { + let agg = agg_clone.clone(); + async move { + Ok::, CallbackError>(Box::new( + SignedBeaconCommitteeSelection::new(agg), + )) + } + }); + + let input = V1BeaconCommitteeSelection { + slot: SLOT, + validator_index: VAL_IDX, + selection_proof: [0x77; 96], + }; + let resp = component + .beacon_committee_selections(vec![input]) + .await + .expect("happy path"); + + assert_eq!(resp.data.len(), 1); + assert_eq!(resp.data[0], agg_selection); + } + + /// Multi-selection input is fanned out via the subscriber once per slot + /// covered, and every aggregated reply is stitched into the response. + #[tokio::test] + async fn beacon_committee_selections_multi_selection_fanout_and_stitching() { + const SLOT_A: Slot = 10; + const SLOT_B: Slot = 11; + const VAL_IDX_A: ValidatorIndex = 1; + const VAL_IDX_B: ValidatorIndex = 2; + let dv_root_a = dv_pubkey(0xB1); + let dv_root_b = dv_pubkey(0xB2); + + let (mut component, _mock) = make_selections_component_insecure(HashMap::from([ + (VAL_IDX_A, dv_root_a), + (VAL_IDX_B, dv_root_b), + ])) + .await; + + // Track subscriber invocations: one per distinct slot in the input. + let observed_slots: Arc>> = Arc::new(Mutex::new(Vec::new())); + { + let observed_slots = Arc::clone(&observed_slots); + component.subscribe(move |duty, _set| { + let observed_slots = Arc::clone(&observed_slots); + async move { + observed_slots.lock().unwrap().push(duty.slot.inner()); + Ok(()) + } + }); + } + + // AggSigDB returns the slot+validator-index in the response so we + // can verify each `(slot, pk)` pair was awaited exactly once. + component.register_await_agg_sig_db(move |duty, pk| { + let slot = duty.slot.inner(); + let pk_bytes = pk.as_ref(); + // Recover the validator index from the pubkey: byte 0 is 0xB1 + // for VAL_IDX_A, 0xB2 for VAL_IDX_B. + let val_idx = match pk_bytes[0] { + 0xB1 => VAL_IDX_A, + 0xB2 => VAL_IDX_B, + _ => 999, + }; + async move { + Ok::, CallbackError>(Box::new( + SignedBeaconCommitteeSelection::new(V1BeaconCommitteeSelection { + slot, + validator_index: val_idx, + selection_proof: [0xCD; 96], + }), + )) + } + }); + + let input = vec![ + V1BeaconCommitteeSelection { + slot: SLOT_A, + validator_index: VAL_IDX_A, + selection_proof: [0x11; 96], + }, + V1BeaconCommitteeSelection { + slot: SLOT_B, + validator_index: VAL_IDX_B, + selection_proof: [0x22; 96], + }, + ]; + let resp = component + .beacon_committee_selections(input) + .await + .expect("multi-selection"); + + // Both slots were fanned out to the subscriber once each. + let mut slots = observed_slots.lock().unwrap().clone(); + slots.sort(); + assert_eq!(slots, vec![SLOT_A, SLOT_B]); + + // Both aggregated selections present in the response — iteration + // order over the HashMap is non-deterministic so we sort. + assert_eq!(resp.data.len(), 2); + let mut returned_slots: Vec = resp.data.iter().map(|s| s.slot).collect(); + returned_slots.sort(); + assert_eq!(returned_slots, vec![SLOT_A, SLOT_B]); + let mut returned_indices: Vec = resp.data.iter().map(|s| s.validator_index).collect(); + returned_indices.sort(); + assert_eq!(returned_indices, vec![VAL_IDX_A, VAL_IDX_B]); + } + + /// A selection whose validator index is not part of the cluster's + /// active set fails the lookup short-circuit with `400 Bad Request`. + #[tokio::test] + async fn beacon_committee_selections_rejects_unknown_validator_index() { + let (mut component, _mock) = make_selections_component_insecure(HashMap::new()).await; + + // `await_agg_sig_db` must NOT be reached for an unknown validator. + component.register_await_agg_sig_db(|_duty, _pk| async { + panic!("await_agg_sig_db must not be called when validator index is unknown"); + }); + + let err = component + .beacon_committee_selections(vec![V1BeaconCommitteeSelection { + slot: 1, + validator_index: 999, + selection_proof: [0xEE; 96], + }]) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::BAD_REQUEST); + assert!(err.message.contains("validator not found")); + } + + /// A tampered selection proof fails `verify_partial_sig` and the + /// handler short-circuits with `400 Bad Request` — the remaining + /// selections in the batch are not fanned out and the AggSigDB await is + /// never reached. + #[tokio::test] + async fn beacon_committee_selections_verification_failure_short_circuits() { + // Wire a secure component (insecure_test = false) and register a + // public-share map so `verify_partial_sig` runs the real BLS check + // against the zero signature. + let dv_root = dv_pubkey(0xC1); + let pub_share = [0x55_u8; 48]; + let map = HashMap::from([(dv_root, pub_share)]); + + let (mut component, _mock) = + make_selections_component_secure(map, HashMap::from([(1u64, dv_root)])).await; + + component.register_await_agg_sig_db(|_duty, _pk| async { + panic!("await_agg_sig_db must not be called after verification failure"); + }); + + // Zero signature is rejected by `signing::verify` before BLS runs + // (returns SigningError::ZeroSignature). + let err = component + .beacon_committee_selections(vec![V1BeaconCommitteeSelection { + slot: 1, + validator_index: 1, + selection_proof: [0; 96], + }]) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::BAD_REQUEST); + } + + /// A stalled `await_agg_sig_db` hook trips + /// [`SELECTIONS_PHASE_TIMEOUT`] and the handler returns 408. + #[tokio::test(start_paused = true)] + async fn beacon_committee_selections_await_agg_sig_db_times_out() { + let dv_root = dv_pubkey(0xD1); + let (mut component, _mock) = + make_selections_component_insecure(HashMap::from([(1u64, dv_root)])).await; + + // Closure that never completes. + component.register_await_agg_sig_db(|_duty, _pk| async { + std::future::pending::, CallbackError>>().await + }); + + let err = component + .beacon_committee_selections(vec![V1BeaconCommitteeSelection { + slot: 1, + validator_index: 1, + selection_proof: [0xDD; 96], + }]) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::REQUEST_TIMEOUT); + } + + /// Happy-path sync committee selections. + #[tokio::test] + async fn sync_committee_selections_happy_path() { + const SLOT: Slot = 13; + const VAL_IDX: ValidatorIndex = 7; + const SUBCOMM: u64 = 3; + let dv_root = dv_pubkey(0xE1); + + let (mut component, _mock) = + make_selections_component_insecure(HashMap::from([(VAL_IDX, dv_root)])).await; + + let agg_selection = V1SyncCommitteeSelection { + slot: SLOT, + validator_index: VAL_IDX, + subcommittee_index: SUBCOMM, + selection_proof: [0xCC; 96], + }; + let agg_clone = agg_selection.clone(); + component.register_await_agg_sig_db(move |_duty, _pk| { + let agg = agg_clone.clone(); + async move { + Ok::, CallbackError>(Box::new( + SignedSyncCommitteeSelection::new(agg), + )) + } + }); + + let resp = component + .sync_committee_selections(vec![V1SyncCommitteeSelection { + slot: SLOT, + validator_index: VAL_IDX, + subcommittee_index: SUBCOMM, + selection_proof: [0x99; 96], + }]) + .await + .expect("happy path"); + assert_eq!(resp.data.len(), 1); + assert_eq!(resp.data[0], agg_selection); + } + + /// Multi-selection sync committee selections produce one subscriber + /// invocation per distinct slot and stitch every aggregated reply into + /// the response. + #[tokio::test] + async fn sync_committee_selections_multi_selection_fanout_and_stitching() { + const SLOT_A: Slot = 20; + const SLOT_B: Slot = 21; + const VAL_IDX_A: ValidatorIndex = 1; + const VAL_IDX_B: ValidatorIndex = 2; + let dv_root_a = dv_pubkey(0xF1); + let dv_root_b = dv_pubkey(0xF2); + + let (mut component, _mock) = make_selections_component_insecure(HashMap::from([ + (VAL_IDX_A, dv_root_a), + (VAL_IDX_B, dv_root_b), + ])) + .await; + + let observed_slots: Arc>> = Arc::new(Mutex::new(Vec::new())); + { + let observed_slots = Arc::clone(&observed_slots); + component.subscribe(move |duty, _set| { + let observed_slots = Arc::clone(&observed_slots); + async move { + observed_slots.lock().unwrap().push(duty.slot.inner()); + Ok(()) + } + }); + } + + component.register_await_agg_sig_db(move |duty, pk| { + let slot = duty.slot.inner(); + let pk_bytes = pk.as_ref(); + let val_idx = match pk_bytes[0] { + 0xF1 => VAL_IDX_A, + 0xF2 => VAL_IDX_B, + _ => 999, + }; + async move { + Ok::, CallbackError>(Box::new( + SignedSyncCommitteeSelection::new(V1SyncCommitteeSelection { + slot, + validator_index: val_idx, + subcommittee_index: 0, + selection_proof: [0xDE; 96], + }), + )) + } + }); + + let resp = component + .sync_committee_selections(vec![ + V1SyncCommitteeSelection { + slot: SLOT_A, + validator_index: VAL_IDX_A, + subcommittee_index: 0, + selection_proof: [0x11; 96], + }, + V1SyncCommitteeSelection { + slot: SLOT_B, + validator_index: VAL_IDX_B, + subcommittee_index: 1, + selection_proof: [0x22; 96], + }, + ]) + .await + .expect("multi-selection"); + + let mut slots = observed_slots.lock().unwrap().clone(); + slots.sort(); + assert_eq!(slots, vec![SLOT_A, SLOT_B]); + + assert_eq!(resp.data.len(), 2); + let mut returned_slots: Vec = resp.data.iter().map(|s| s.slot).collect(); + returned_slots.sort(); + assert_eq!(returned_slots, vec![SLOT_A, SLOT_B]); + } + + /// Sync committee selection with an unknown validator index returns + /// 400 without touching the AggSigDB. + #[tokio::test] + async fn sync_committee_selections_rejects_unknown_validator_index() { + let (mut component, _mock) = make_selections_component_insecure(HashMap::new()).await; + + component.register_await_agg_sig_db(|_duty, _pk| async { + panic!("await_agg_sig_db must not be called when validator index is unknown"); + }); + + let err = component + .sync_committee_selections(vec![V1SyncCommitteeSelection { + slot: 1, + validator_index: 999, + subcommittee_index: 0, + selection_proof: [0xEE; 96], + }]) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::BAD_REQUEST); + assert!(err.message.contains("validator not found")); + } + + /// Sync committee selection verification failure short-circuits. + #[tokio::test] + async fn sync_committee_selections_verification_failure_short_circuits() { + let dv_root = dv_pubkey(0xC2); + let pub_share = [0x66_u8; 48]; + let map = HashMap::from([(dv_root, pub_share)]); + + let (mut component, _mock) = + make_selections_component_secure(map, HashMap::from([(1u64, dv_root)])).await; + + component.register_await_agg_sig_db(|_duty, _pk| async { + panic!("await_agg_sig_db must not be called after verification failure"); + }); + + let err = component + .sync_committee_selections(vec![V1SyncCommitteeSelection { + slot: 1, + validator_index: 1, + subcommittee_index: 0, + selection_proof: [0; 96], + }]) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::BAD_REQUEST); + } + + /// Sync committee selection times out on a stalled AggSigDB. + #[tokio::test(start_paused = true)] + async fn sync_committee_selections_await_agg_sig_db_times_out() { + let dv_root = dv_pubkey(0xD2); + let (mut component, _mock) = + make_selections_component_insecure(HashMap::from([(1u64, dv_root)])).await; + + component.register_await_agg_sig_db(|_duty, _pk| async { + std::future::pending::, CallbackError>>().await + }); + + let err = component + .sync_committee_selections(vec![V1SyncCommitteeSelection { + slot: 1, + validator_index: 1, + subcommittee_index: 0, + selection_proof: [0xDD; 96], + }]) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::REQUEST_TIMEOUT); + } + + /// A stalled subscriber must not pin the handler indefinitely — the + /// per-request deadline applies to subscriber fan-out as well as the + /// AggSigDB await. The handler returns 408. + #[tokio::test(start_paused = true)] + async fn beacon_committee_selections_subscriber_timeout() { + let dv_root = dv_pubkey(0xF1); + let (mut component, _mock) = + make_selections_component_insecure(HashMap::from([(1u64, dv_root)])).await; + + // Subscriber that never completes. + component.subscribe(|_duty, _set| async { + std::future::pending::>().await + }); + // `await_agg_sig_db` must NOT be reached if the subscriber blocks. + component.register_await_agg_sig_db(|_duty, _pk| async { + panic!("await_agg_sig_db must not be reached when the subscriber stalls"); + }); + + let err = component + .beacon_committee_selections(vec![V1BeaconCommitteeSelection { + slot: 1, + validator_index: 1, + selection_proof: [0xAA; 96], + }]) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::REQUEST_TIMEOUT); + } + + /// Counterpart of [`beacon_committee_selections_subscriber_timeout`]. + #[tokio::test(start_paused = true)] + async fn sync_committee_selections_subscriber_timeout() { + let dv_root = dv_pubkey(0xF2); + let (mut component, _mock) = + make_selections_component_insecure(HashMap::from([(1u64, dv_root)])).await; + + component.subscribe(|_duty, _set| async { + std::future::pending::>().await + }); + component.register_await_agg_sig_db(|_duty, _pk| async { + panic!("await_agg_sig_db must not be reached when the subscriber stalls"); + }); + + let err = component + .sync_committee_selections(vec![V1SyncCommitteeSelection { + slot: 1, + validator_index: 1, + subcommittee_index: 0, + selection_proof: [0xBB; 96], + }]) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::REQUEST_TIMEOUT); + } + + /// An empty selections array is a well-defined no-op: the handler runs + /// the active-validators lookup, finds nothing to fan out, never queries + /// the AggSigDB, and returns an empty `data` array. + #[tokio::test] + async fn beacon_committee_selections_empty_input_returns_empty_data() { + let (mut component, _mock) = make_selections_component_insecure(HashMap::new()).await; + + // Subscriber and AggSigDB must NOT be touched for an empty input. + component.subscribe(|_duty, _set| async { + panic!("subscriber must not run for empty input"); + }); + component.register_await_agg_sig_db(|_duty, _pk| async { + panic!("await_agg_sig_db must not be reached for empty input"); + }); + + let resp = component + .beacon_committee_selections(vec![]) + .await + .expect("empty input is a no-op success"); + assert!(resp.data.is_empty()); + } + + /// Counterpart of + /// [`beacon_committee_selections_empty_input_returns_empty_data`]. + #[tokio::test] + async fn sync_committee_selections_empty_input_returns_empty_data() { + let (mut component, _mock) = make_selections_component_insecure(HashMap::new()).await; + + component.subscribe(|_duty, _set| async { + panic!("subscriber must not run for empty input"); + }); + component.register_await_agg_sig_db(|_duty, _pk| async { + panic!("await_agg_sig_db must not be reached for empty input"); + }); + + let resp = component + .sync_committee_selections(vec![]) + .await + .expect("empty input is a no-op success"); + assert!(resp.data.is_empty()); + } + + /// If the AggSigDB ever returns the wrong concrete `SignedData` type + /// under a `PrepareAggregator` duty (a wiring bug), the handler must + /// surface `500 Internal Server Error` rather than panic or return a + /// silently-wrong response. + #[tokio::test] + async fn beacon_committee_selections_rejects_aggsigdb_type_mismatch() { + let dv_root = dv_pubkey(0xA1); + let (mut component, _mock) = + make_selections_component_insecure(HashMap::from([(1u64, dv_root)])).await; + + // Wrong type — returns a `SyncCommitteeSelection` under a + // `PrepareAggregator` duty, which `downcast_beacon_committee_selection` + // cannot satisfy. + component.register_await_agg_sig_db(|_duty, _pk| async { + Ok::, CallbackError>(Box::new(SignedSyncCommitteeSelection::new( + V1SyncCommitteeSelection { + slot: 1, + validator_index: 1, + subcommittee_index: 0, + selection_proof: [0xCC; 96], + }, + ))) + }); + + let err = component + .beacon_committee_selections(vec![V1BeaconCommitteeSelection { + slot: 1, + validator_index: 1, + selection_proof: [0x00; 96], + }]) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::INTERNAL_SERVER_ERROR); + } + + /// Counterpart of + /// [`beacon_committee_selections_rejects_aggsigdb_type_mismatch`]. + #[tokio::test] + async fn sync_committee_selections_rejects_aggsigdb_type_mismatch() { + let dv_root = dv_pubkey(0xA2); + let (mut component, _mock) = + make_selections_component_insecure(HashMap::from([(1u64, dv_root)])).await; + + component.register_await_agg_sig_db(|_duty, _pk| async { + Ok::, CallbackError>(Box::new(SignedBeaconCommitteeSelection::new( + V1BeaconCommitteeSelection { + slot: 1, + validator_index: 1, + selection_proof: [0xDD; 96], + }, + ))) + }); + + let err = component + .sync_committee_selections(vec![V1SyncCommitteeSelection { + slot: 1, + validator_index: 1, + subcommittee_index: 0, + selection_proof: [0x00; 96], + }]) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::INTERNAL_SERVER_ERROR); + } + // ==================================================================== // proposal / submit_proposal / submit_blinded_proposal // ==================================================================== diff --git a/crates/core/src/validatorapi/router.rs b/crates/core/src/validatorapi/router.rs index a32ab9b9..a7a72528 100644 --- a/crates/core/src/validatorapi/router.rs +++ b/crates/core/src/validatorapi/router.rs @@ -18,22 +18,32 @@ use axum::{ }; use serde::Deserialize; -/// Cap on the `POST /eth/v1/validator/duties/{attester,sync}/{epoch}` request -/// bodies. A realistic cluster ships at most a few thousand validator indices; -/// 64 KiB still allows ~10k indices in either numeric or string encoding, -/// well above any plausible workload. -const DUTIES_BODY_LIMIT: usize = 64 * 1024; - use super::{ error::ApiError, handler::Handler, types::{ AttestationDataOpts, AttestationDataResponse, AttesterDutiesOpts, AttesterDutiesResponse, - CommitteeIndex, NodeVersionResponse, ProposerDutiesOpts, ProposerDutiesResponse, - SyncCommitteeDutiesOpts, SyncCommitteeDutiesResponse, ValIndexes, + BeaconCommitteeSelection, BeaconCommitteeSelectionsResponse, CommitteeIndex, + NodeVersionResponse, ProposerDutiesOpts, ProposerDutiesResponse, SyncCommitteeDutiesOpts, + SyncCommitteeDutiesResponse, SyncCommitteeSelection, SyncCommitteeSelectionsResponse, + ValIndexes, }, }; +/// Cap on the `POST /eth/v1/validator/duties/{attester,sync}/{epoch}` request +/// bodies. A realistic cluster ships at most a few thousand validator indices; +/// 64 KiB still allows ~10k indices in either numeric or string encoding, +/// well above any plausible workload. +const DUTIES_BODY_LIMIT: usize = 64 * 1024; + +/// Cap on the `POST /eth/v1/validator/{beacon,sync}_committee_selections` +/// request bodies. Each selection is ~210-250 bytes of JSON (slot, validator +/// index, optional subcommittee index, 96-byte BLS proof in `0x` hex), so +/// 64 KiB admits ~250-300 entries — far more than a realistic cluster while +/// bounding the per-request CPU cost of the BLS verifications and AggSigDB +/// awaits the handler performs. +const SELECTIONS_BODY_LIMIT: usize = 64 * 1024; + /// Query parameters for `GET /eth/v1/validator/attestation_data`. #[derive(Debug, Clone, Deserialize)] struct AttestationDataQuery { @@ -66,7 +76,7 @@ pub fn new_router(handler: Arc, builder_enabled: bool) -> Router { Router::new() .route( "/eth/v1/validator/duties/attester/{epoch}", - duties_post(attester_duties), + bounded_post(attester_duties, DUTIES_BODY_LIMIT), ) .route( "/eth/v1/validator/duties/proposer/{epoch}", @@ -74,7 +84,7 @@ pub fn new_router(handler: Arc, builder_enabled: bool) -> Router { ) .route( "/eth/v1/validator/duties/sync/{epoch}", - duties_post(sync_committee_duties), + bounded_post(sync_committee_duties, DUTIES_BODY_LIMIT), ) .route("/eth/v1/validator/attestation_data", get(attestation_data)) .route("/eth/v1/beacon/pool/attestations", post(respond_404)) @@ -106,7 +116,7 @@ pub fn new_router(handler: Arc, builder_enabled: bool) -> Router { .route("/proposer_config", get(respond_404)) .route( "/eth/v1/validator/beacon_committee_selections", - post(beacon_committee_selections), + bounded_post(beacon_committee_selections, SELECTIONS_BODY_LIMIT), ) .route("/eth/v1/validator/aggregate_attestation", get(respond_404)) .route( @@ -136,7 +146,7 @@ pub fn new_router(handler: Arc, builder_enabled: bool) -> Router { ) .route( "/eth/v1/validator/sync_committee_selections", - post(sync_committee_selections), + bounded_post(sync_committee_selections, SELECTIONS_BODY_LIMIT), ) .route("/eth/v1/node/version", get(node_version)) .fallback(proxy_handler) @@ -205,29 +215,28 @@ async fn attestation_data( Ok(Json(response)) } -/// Wraps a `POST /eth/v1/validator/duties/*` handler with a body-size cap -/// and the Charon-parity content-type policy. The cap is local to these -/// two routes so unrelated POST handlers (e.g. `submit_attestations`) keep -/// axum's default 2 MiB. -fn duties_post(handler: H) -> MethodRouter +/// Wraps a `POST` handler with a body-size cap and the JSON content-type +/// policy. The cap is local to the route so unrelated POST handlers (e.g. +/// `submit_attestations`) keep axum's default 2 MiB. +fn bounded_post(handler: H, body_limit: usize) -> MethodRouter where H: axum::handler::Handler, T: 'static, S: Clone + Send + Sync + 'static, { post(handler) - .route_layer(DefaultBodyLimit::max(DUTIES_BODY_LIMIT)) + .route_layer(DefaultBodyLimit::max(body_limit)) .route_layer(middleware::from_fn(enforce_json_content_type)) } -/// Matches Charon's content-type handling at `core/validatorapi/router.go:365`: -/// a missing `Content-Type` is treated as `application/json`; an unrecognized -/// content type is rejected with `415 Unsupported Media Type`. SSZ is not -/// supported yet — when it lands, this is the right seam to extend. +/// Content-type handling: a missing `Content-Type` is treated as +/// `application/json`; an unrecognized content type is rejected with `415 +/// Unsupported Media Type`. SSZ is not supported yet — when it lands, this is +/// the right seam to extend. /// /// Without this layer, axum's `Json` extractor would reject a missing header -/// with `MissingJsonContentType`, which our envelope normalises to `400` — -/// diverging from Charon, which lets VCs that don't set the header through. +/// with `MissingJsonContentType`, which our envelope normalises to `400`. We +/// instead let VCs that don't set the header through. async fn enforce_json_content_type(mut req: Request, next: Next) -> Result { match req.headers().get(header::CONTENT_TYPE) { None => { @@ -262,8 +271,8 @@ fn query_rejection_to_api_error(rejection: QueryRejection) -> ApiError { /// instead of axum's default plain-text response. /// /// Genuine parse failures — malformed JSON (`400`) and wrong element type -/// (`422`) — are normalised to a uniform `400`, matching Charon's `unmarshal`, -/// which returns `400` for all body unmarshal failures. Content-Type rejections +/// (`422`) — are normalised to a uniform `400` for all body unmarshal +/// failures. Content-Type rejections /// no longer reach this function: [`enforce_json_content_type`] intercepts /// them upstream so missing/JSON requests pass through and non-JSON requests /// return `415`. The body-size-limit rejection from [`DefaultBodyLimit`] @@ -310,8 +319,19 @@ async fn submit_exit() { todo!("vapi: submit_exit"); } -async fn beacon_committee_selections() { - todo!("vapi: beacon_committee_selections"); +async fn beacon_committee_selections( + State(state): State>, + selections: Result>, JsonRejection>, +) -> Result, ApiError> { + let Json(selections) = selections.map_err(json_rejection_to_api_error)?; + let response = state + .handler + .beacon_committee_selections(selections) + .await?; + + Ok(Json(BeaconCommitteeSelectionsResponse { + data: response.data, + })) } async fn aggregate_attestation() { @@ -338,8 +358,16 @@ async fn submit_proposal_preparations() { todo!("vapi: submit_proposal_preparations"); } -async fn sync_committee_selections() { - todo!("vapi: sync_committee_selections"); +async fn sync_committee_selections( + State(state): State>, + selections: Result>, JsonRejection>, +) -> Result, ApiError> { + let Json(selections) = selections.map_err(json_rejection_to_api_error)?; + let response = state.handler.sync_committee_selections(selections).await?; + + Ok(Json(SyncCommitteeSelectionsResponse { + data: response.data, + })) } async fn node_version( @@ -366,8 +394,9 @@ mod tests { use crate::validatorapi::{ testutils::TestHandler, types::{ - AttestationDataResponse, AttesterDutiesResponse, AttesterDuty, ProposerDutiesResponse, - ProposerDuty, SyncCommitteeDutiesResponse, SyncCommitteeDuty, ValIndexes, + AttestationDataResponse, AttesterDutiesResponse, AttesterDuty, + BeaconCommitteeSelection, EthResponse, ProposerDutiesResponse, ProposerDuty, + SyncCommitteeDutiesResponse, SyncCommitteeDuty, SyncCommitteeSelection, ValIndexes, }, }; @@ -604,8 +633,7 @@ mod tests { /// A malformed duties body emits the same `{ code, message }` envelope and /// uniform 400 as the rest of the router, rather than axum's default /// plain-text rejection (which would be 400 for a syntax error but 422 for - /// a type error). Mirrors Charon's `unmarshal`, which returns 400 for every - /// body parse failure. + /// a type error). #[tokio::test] async fn attester_duties_returns_api_error_shape_on_bad_body() { use axum::{ @@ -632,11 +660,10 @@ mod tests { assert!(json["message"].is_string()); } - /// Charon-parity: a duties request that omits `Content-Type` is - /// treated as `application/json` rather than rejected — the + /// A duties request that omits `Content-Type` is treated as + /// `application/json` rather than rejected — the /// `enforce_json_content_type` middleware injects the header before - /// the `Json` extractor sees the request. See `core/validatorapi/ - /// router.go:365` (`if contentHeader == "" || ...`). + /// the `Json` extractor sees the request. #[tokio::test] async fn attester_duties_accepts_missing_content_type() { use axum::{ @@ -662,9 +689,9 @@ mod tests { assert_eq!(resp.status(), StatusCode::OK); } - /// Charon-parity: a duties request with a non-JSON `Content-Type` - /// returns `415 Unsupported Media Type`, not the `400` that the - /// generic body-parse normaliser would produce. + /// A duties request with a non-JSON `Content-Type` returns `415 + /// Unsupported Media Type`, not the `400` that the generic body-parse + /// normaliser would produce. #[tokio::test] async fn attester_duties_rejects_non_json_content_type() { use axum::{ @@ -730,4 +757,208 @@ mod tests { let bad = serde_json::from_str::("[-1]"); assert!(bad.is_err()); } + + /// Verifies the router wraps the `Handler::beacon_committee_selections` + /// payload into the `{ "data": [...] }` wire shape, dropping the + /// `execution_optimistic` / `finalized` / `dependent_root` metadata that + /// the trait method carries internally. + #[tokio::test] + async fn beacon_committee_selections_wraps_handler_value() { + let selection = BeaconCommitteeSelection { + slot: 10, + validator_index: 5, + selection_proof: [0xAA; 96], + }; + let handler = TestHandler::default().with_beacon_committee_selections(EthResponse { + data: vec![selection], + execution_optimistic: false, + finalized: false, + dependent_root: None, + }); + let state = Arc::new(AppState { + handler: Arc::new(handler), + builder_enabled: false, + }); + + let Json(body) = beacon_committee_selections(State(state), Ok(Json(vec![]))) + .await + .unwrap(); + + let json = serde_json::to_value(&body).unwrap(); + assert!(json.get("execution_optimistic").is_none()); + assert!(json.get("finalized").is_none()); + assert!(json.get("dependent_root").is_none()); + assert_eq!(json["data"][0]["slot"], "10"); + assert_eq!(json["data"][0]["validator_index"], "5"); + } + + /// Counterpart of [`beacon_committee_selections_wraps_handler_value`] for + /// the sync-committee variant. + #[tokio::test] + async fn sync_committee_selections_wraps_handler_value() { + let selection = SyncCommitteeSelection { + slot: 20, + validator_index: 7, + subcommittee_index: 2, + selection_proof: [0xBB; 96], + }; + let handler = TestHandler::default().with_sync_committee_selections(EthResponse { + data: vec![selection], + execution_optimistic: false, + finalized: false, + dependent_root: None, + }); + let state = Arc::new(AppState { + handler: Arc::new(handler), + builder_enabled: false, + }); + + let Json(body) = sync_committee_selections(State(state), Ok(Json(vec![]))) + .await + .unwrap(); + + let json = serde_json::to_value(&body).unwrap(); + assert!(json.get("execution_optimistic").is_none()); + assert_eq!(json["data"][0]["slot"], "20"); + assert_eq!(json["data"][0]["validator_index"], "7"); + assert_eq!(json["data"][0]["subcommittee_index"], "2"); + } + + /// Verifies the body-limit layer on the selection POST routes rejects + /// oversized bodies before any BLS verification work happens. + #[tokio::test] + async fn beacon_committee_selections_rejects_oversized_body() { + use axum::{ + body::Body, + http::{Method, Request}, + }; + use tower::ServiceExt; + + let handler = TestHandler::default(); + let app = new_router(Arc::new(handler), false); + + let big = vec![b'0'; SELECTIONS_BODY_LIMIT * 2]; + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/validator/beacon_committee_selections") + .header("content-type", "application/json") + .header("content-length", big.len()) + .body(Body::from(big)) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::PAYLOAD_TOO_LARGE); + } + + /// Counterpart of [`beacon_committee_selections_rejects_oversized_body`]. + #[tokio::test] + async fn sync_committee_selections_rejects_oversized_body() { + use axum::{ + body::Body, + http::{Method, Request}, + }; + use tower::ServiceExt; + + let handler = TestHandler::default(); + let app = new_router(Arc::new(handler), false); + + let big = vec![b'0'; SELECTIONS_BODY_LIMIT * 2]; + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/validator/sync_committee_selections") + .header("content-type", "application/json") + .header("content-length", big.len()) + .body(Body::from(big)) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::PAYLOAD_TOO_LARGE); + } + + /// Malformed JSON on the selection POST routes is normalised into the + /// router's standard `{ code, message }` envelope rather than axum's + /// default plain-text 400 / 422 / 415 — every body unmarshal failure + /// surfaces as a uniform `400`. The + /// same plumbing covers the duties endpoints; see + /// [`attester_duties_returns_api_error_shape_on_malformed_body`] for the + /// duties variant. + #[tokio::test] + async fn beacon_committee_selections_returns_api_error_shape_on_malformed_body() { + use axum::{ + body::{Body, to_bytes}, + http::{Method, Request}, + }; + use tower::ServiceExt; + + let handler = TestHandler::default(); + let app = new_router(Arc::new(handler), false); + + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/validator/beacon_committee_selections") + .header("content-type", "application/json") + .body(Body::from(r#"{ "not": "an array" }"#)) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + let body = to_bytes(resp.into_body(), 64 * 1024).await.unwrap(); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(json["code"], 400); + assert!(json["message"].is_string()); + } + + /// Counterpart of + /// [`beacon_committee_selections_returns_api_error_shape_on_malformed_body`]. + #[tokio::test] + async fn sync_committee_selections_returns_api_error_shape_on_malformed_body() { + use axum::{ + body::{Body, to_bytes}, + http::{Method, Request}, + }; + use tower::ServiceExt; + + let handler = TestHandler::default(); + let app = new_router(Arc::new(handler), false); + + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/validator/sync_committee_selections") + .header("content-type", "application/json") + .body(Body::from("not-json-at-all")) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + let body = to_bytes(resp.into_body(), 64 * 1024).await.unwrap(); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(json["code"], 400); + assert!(json["message"].is_string()); + } + + /// Duties POST endpoints share the same `json_rejection_to_api_error` + /// plumbing as the selection routes — this test locks the envelope + /// contract on the duties side so a future refactor that re-introduces + /// bare `Json` extraction is caught by a failing test rather + /// than only by manual review. + #[tokio::test] + async fn attester_duties_returns_api_error_shape_on_malformed_body() { + use axum::{ + body::{Body, to_bytes}, + http::{Method, Request}, + }; + use tower::ServiceExt; + + let handler = TestHandler::default(); + let app = new_router(Arc::new(handler), false); + + let req = Request::builder() + .method(Method::POST) + .uri("/eth/v1/validator/duties/attester/42") + .header("content-type", "application/json") + .body(Body::from(r#"{ "not": "an array" }"#)) + .unwrap(); + let resp = app.oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::BAD_REQUEST); + let body = to_bytes(resp.into_body(), 64 * 1024).await.unwrap(); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(json["code"], 400); + assert!(json["message"].is_string()); + } } diff --git a/crates/core/src/validatorapi/testutils.rs b/crates/core/src/validatorapi/testutils.rs index 45980fe7..16b78e2c 100644 --- a/crates/core/src/validatorapi/testutils.rs +++ b/crates/core/src/validatorapi/testutils.rs @@ -34,6 +34,10 @@ pub struct TestHandler { pub sync_committee_duties_response: Option, /// Value returned by [`Handler::attestation_data`]. pub attestation_data_response: Option, + /// Value returned by [`Handler::beacon_committee_selections`]. + pub beacon_committee_selections_response: Option>>, + /// Value returned by [`Handler::sync_committee_selections`]. + pub sync_committee_selections_response: Option>>, } impl TestHandler { @@ -68,6 +72,24 @@ impl TestHandler { self.attestation_data_response = Some(response); self } + + /// Sets the response returned by [`Handler::beacon_committee_selections`]. + pub fn with_beacon_committee_selections( + mut self, + response: EthResponse>, + ) -> Self { + self.beacon_committee_selections_response = Some(response); + self + } + + /// Sets the response returned by [`Handler::sync_committee_selections`]. + pub fn with_sync_committee_selections( + mut self, + response: EthResponse>, + ) -> Self { + self.sync_committee_selections_response = Some(response); + self + } } #[async_trait] @@ -84,40 +106,40 @@ impl Handler for TestHandler { &self, _opts: AttesterDutiesOpts, ) -> Result { - Ok(self - .attester_duties_response - .clone() - .expect("attester_duties not stubbed in TestHandler")) + match self.attester_duties_response.as_ref() { + Some(r) => Ok(r.clone()), + None => unimplemented!("attester_duties not stubbed in TestHandler"), + } } async fn proposer_duties( &self, _opts: ProposerDutiesOpts, ) -> Result { - Ok(self - .proposer_duties_response - .clone() - .expect("proposer_duties not stubbed in TestHandler")) + match self.proposer_duties_response.as_ref() { + Some(r) => Ok(r.clone()), + None => unimplemented!("proposer_duties not stubbed in TestHandler"), + } } async fn sync_committee_duties( &self, _opts: SyncCommitteeDutiesOpts, ) -> Result { - Ok(self - .sync_committee_duties_response - .clone() - .expect("sync_committee_duties not stubbed in TestHandler")) + match self.sync_committee_duties_response.as_ref() { + Some(r) => Ok(r.clone()), + None => unimplemented!("sync_committee_duties not stubbed in TestHandler"), + } } async fn attestation_data( &self, _opts: AttestationDataOpts, ) -> Result { - Ok(self - .attestation_data_response - .clone() - .expect("attestation_data not stubbed in TestHandler")) + match self.attestation_data_response.as_ref() { + Some(r) => Ok(r.clone()), + None => unimplemented!("attestation_data not stubbed in TestHandler"), + } } async fn submit_attestations( @@ -163,14 +185,20 @@ impl Handler for TestHandler { &self, _selections: Vec, ) -> Result>, ApiError> { - unimplemented!("beacon_committee_selections not stubbed in TestHandler") + match self.beacon_committee_selections_response.as_ref() { + Some(r) => Ok(r.clone()), + None => unimplemented!("beacon_committee_selections not stubbed in TestHandler"), + } } async fn sync_committee_selections( &self, _selections: Vec, ) -> Result>, ApiError> { - unimplemented!("sync_committee_selections not stubbed in TestHandler") + match self.sync_committee_selections_response.as_ref() { + Some(r) => Ok(r.clone()), + None => unimplemented!("sync_committee_selections not stubbed in TestHandler"), + } } async fn validators( diff --git a/crates/core/src/validatorapi/types.rs b/crates/core/src/validatorapi/types.rs index a3a9a68a..d35c844d 100644 --- a/crates/core/src/validatorapi/types.rs +++ b/crates/core/src/validatorapi/types.rs @@ -22,8 +22,21 @@ pub use pluto_eth2api::{ GetVersionResponseResponse as NodeVersionResponse, GetVersionResponseResponseData as NodeVersionData, spec::phase0::{self, Epoch, Root, Slot, ValidatorIndex}, + v1::{ + BeaconCommitteeSelection as V1BeaconCommitteeSelection, + SyncCommitteeSelection as V1SyncCommitteeSelection, + }, }; +/// Beacon-committee selection payload. Aliases the consensus-spec +/// `v1::BeaconCommitteeSelection` so the handler can operate on the +/// validator-index / slot / selection-proof tuple directly. +pub type BeaconCommitteeSelection = V1BeaconCommitteeSelection; + +/// Sync-committee selection payload. Aliases the consensus-spec +/// `v1::SyncCommitteeSelection`. +pub type SyncCommitteeSelection = V1SyncCommitteeSelection; + /// Attestation data alias for the consensus-spec phase0 type. pub type AttestationData = phase0::AttestationData; @@ -139,6 +152,22 @@ pub struct AttestationDataResponse { pub data: AttestationData, } +/// Response envelope for the `beacon_committee_selections` endpoint — a `data` +/// array of aggregated selection proofs. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct BeaconCommitteeSelectionsResponse { + /// Aggregated beacon-committee selection proofs. + pub data: Vec, +} + +/// Response envelope for the `sync_committee_selections` endpoint — a `data` +/// array of aggregated selection proofs. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SyncCommitteeSelectionsResponse { + /// Aggregated sync-committee selection proofs. + pub data: Vec, +} + /// Validator payload. Placeholder. #[derive(Debug, Clone)] pub struct Validator {} @@ -150,8 +179,7 @@ pub use crate::signeddata::VersionedProposal; pub use crate::signeddata::VersionedSignedProposal; /// Versioned signed blinded proposal payload — alias of the eth2api versioned -/// wrapper, the same shape consumed by Go's -/// `SubmitBlindedProposalOpts.Proposal`. +/// wrapper. pub use pluto_eth2api::versioned::VersionedSignedBlindedProposal; /// Versioned attestation payload. Placeholder. @@ -182,14 +210,6 @@ pub struct SyncCommitteeContribution {} #[derive(Debug, Clone)] pub struct SignedContributionAndProof {} -/// Beacon-committee selection payload. Placeholder. -#[derive(Debug, Clone)] -pub struct BeaconCommitteeSelection {} - -/// Sync-committee selection payload. Placeholder. -#[derive(Debug, Clone)] -pub struct SyncCommitteeSelection {} - /// Validator-index request body for the `attester_duties` and /// `sync_committee_duties` endpoints. ///