From 6c66c352b39c68913538637fe5f70ce6bf7e4c2e Mon Sep 17 00:00:00 2001 From: Bohdan Ohorodnii <273991985+varex83agent@users.noreply.github.com> Date: Mon, 8 Jun 2026 14:08:37 +0200 Subject: [PATCH 1/4] feat(core): add CachedValidatorsProvider trait for validatorapi Defines the `CachedValidatorsProvider` async trait in pluto-core's validatorapi module. Mirrors Charon's `app/eth2wrap.CachedValidatorsProvider` so the validator-API component can look up `validator_index -> DV root pubkey` (used by the selections / voluntary-exit / sync-committee submit handlers in follow-up PRs) without `pluto-core` depending on the application crate that owns the concrete per-epoch cache implementation. `Component` gains an `Arc` field threaded through both constructors as a required dependency, plus a private `fetch_active_validators` helper bounded by `UPSTREAM_REQUEST_TIMEOUT` that all submit handlers share. A `TestValidatorCache` test double lets unit tests supply the map directly without spinning up a real beacon node. The existing `crates/app/src/eth2wrap/valcache.rs` is intentionally untouched. It will impl this trait at the wiring layer (`pluto-app` already depends on `pluto-core`, so the dependency runs the right way without a new crate or any cross-crate moves). Test plan: - cargo +nightly fmt --all --check - cargo clippy -p pluto-core --all-targets --all-features -- -D warnings - cargo test -p pluto-core --all-features (430 passing; 2 new tests on `fetch_active_validators` covering the happy path and the provider-error -> 502 mapping) Co-Authored-By: Bohdan Ohorodnii <35969035+varex83@users.noreply.github.com> --- crates/core/src/validatorapi/component.rs | 150 +++++++++++++++++- crates/core/src/validatorapi/mod.rs | 2 + .../core/src/validatorapi/validator_cache.rs | 35 ++++ 3 files changed, 181 insertions(+), 6 deletions(-) create mode 100644 crates/core/src/validatorapi/validator_cache.rs diff --git a/crates/core/src/validatorapi/component.rs b/crates/core/src/validatorapi/component.rs index 969f92e7..29aa3af7 100644 --- a/crates/core/src/validatorapi/component.rs +++ b/crates/core/src/validatorapi/component.rs @@ -13,7 +13,7 @@ use pluto_eth2api::{ EthBeaconNodeApiClient, GetAttesterDutiesRequest, GetAttesterDutiesResponse, GetProposerDutiesRequest, GetProposerDutiesResponse, GetSyncCommitteeDutiesRequest, GetSyncCommitteeDutiesResponse, - spec::phase0::{BLSPubKey, Epoch, Root}, + spec::phase0::{BLSPubKey, Epoch, Root, ValidatorIndex}, }; use pluto_eth2util::signing::{self, DomainName, SigningError}; use tokio::time::error::Elapsed; @@ -32,6 +32,7 @@ use super::{ VersionedAttestation, VersionedProposal, VersionedSignedAggregateAndProof, VersionedSignedBlindedProposal, VersionedSignedProposal, }, + validator_cache::CachedValidatorsProvider, }; use crate::{ dutydb::{Error as DutyDbError, MemDB}, @@ -175,6 +176,14 @@ pub struct Component { /// Looks up the root pubkey for an `(slot, commIdx, valIdx)` triple. #[allow(dead_code, reason = "consumed by submit_attestations in later PRs")] pub_key_by_att_fn: Option, + /// Cluster's per-epoch active-validators lookup. Consumed by the + /// selections / voluntary-exit / sync-committee submit handlers to + /// translate validator-client-supplied `validator_index` values into + /// DV root public keys. Mirrors Go's `c.eth2Cl.ActiveValidators(ctx)`, + /// which is itself backed by `app/eth2wrap`'s per-epoch validator + /// cache. + #[allow(dead_code, reason = "consumed by submit_* handlers in later PRs")] + validator_cache: Arc, } impl Component { @@ -185,6 +194,7 @@ impl Component { share_idx: u64, pub_share_by_pubkey: HashMap, builder_enabled: bool, + validator_cache: Arc, ) -> Self { Self { eth2_cl, @@ -200,6 +210,7 @@ impl Component { await_agg_sig_db_fn: None, duty_def_fn: None, pub_key_by_att_fn: None, + validator_cache, } } @@ -212,6 +223,7 @@ impl Component { eth2_cl: Arc, dutydb: Arc, share_idx: u64, + validator_cache: Arc, ) -> Self { Self { eth2_cl, @@ -227,6 +239,7 @@ impl Component { await_agg_sig_db_fn: None, duty_def_fn: None, pub_key_by_att_fn: None, + validator_cache, } } @@ -352,6 +365,28 @@ impl Component { Ok(()) } + + /// Fetches the cluster's active validators through the per-epoch + /// [`CachedValidatorsProvider`], bounded by [`UPSTREAM_REQUEST_TIMEOUT`]. + /// Translates cache failures into `ApiError`s without leaking the + /// underlying error into the client-visible message. Mirrors Go's + /// `c.eth2Cl.ActiveValidators(ctx)`, which is itself implemented via + /// `app/eth2wrap`'s validator cache. + #[allow(dead_code, reason = "consumed by submit_* handlers in later PRs")] + async fn fetch_active_validators( + &self, + ) -> Result, ApiError> { + tokio::time::timeout( + UPSTREAM_REQUEST_TIMEOUT, + self.validator_cache.active_validators(), + ) + .await + .map_err(|_: Elapsed| upstream_timeout("active validators"))? + .map_err(|err| { + ApiError::new(StatusCode::BAD_GATEWAY, "active validators lookup failed") + .with_boxed_source(err) + }) + } } /// Errors returned by [`Component::verify_partial_sig`]. @@ -821,6 +856,36 @@ mod tests { validatorapi::types::AttestationDataOpts, }; + /// In-memory stand-in for the per-epoch validator cache. Tests supply + /// the `validator_index -> root pubkey` map up front instead of + /// running a real beacon-node mock through a `ValidatorCache`. + #[derive(Default)] + pub(super) struct TestValidatorCache(HashMap); + + impl TestValidatorCache { + pub(super) fn arc( + map: HashMap, + ) -> Arc { + Arc::new(Self(map)) + } + + pub(super) fn empty() -> Arc { + Self::arc(HashMap::new()) + } + } + + #[async_trait] + impl CachedValidatorsProvider for TestValidatorCache { + async fn active_validators( + &self, + ) -> Result< + HashMap, + super::super::validator_cache::CachedValidatorsError, + > { + Ok(self.0.clone()) + } + } + /// Schedules every duty with a deadline at `MAX_UTC`, so duties are /// `Scheduled` but never naturally expire. struct FarFutureCalculator; @@ -844,7 +909,8 @@ mod tests { let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url("http://127.0.0.1:0").unwrap()); - let component = Component::new_insecure(eth2_cl, Arc::clone(&dutydb), 1); + let component = + Component::new_insecure(eth2_cl, Arc::clone(&dutydb), 1, TestValidatorCache::empty()); (component, dutydb) } @@ -1086,7 +1152,8 @@ mod tests { let dutydb = Arc::new(MemDB::new(deadliner, trim_rx, &cancel)); let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url("http://127.0.0.1:0").unwrap()); - let component = Component::new_insecure(eth2_cl, Arc::clone(&dutydb), 1); + let component = + Component::new_insecure(eth2_cl, Arc::clone(&dutydb), 1, TestValidatorCache::empty()); // Start an await before any data is stored. let waiter = { @@ -1272,7 +1339,7 @@ mod tests { let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url("http://127.0.0.1:0").unwrap()); - Component::new(eth2_cl, dutydb, 1, map, false) + Component::new(eth2_cl, dutydb, 1, map, false, TestValidatorCache::empty()) } /// `Subscribe` invokes every registered subscriber, each receiving its @@ -1503,7 +1570,7 @@ mod tests { let (_evict_tx, evict_rx) = mpsc::channel(1); let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url(mock.uri()).unwrap()); - let component = Component::new(eth2_cl, dutydb, 1, map, false); + let component = Component::new(eth2_cl, dutydb, 1, map, false, TestValidatorCache::empty()); (component, mock) } @@ -1586,7 +1653,7 @@ mod tests { let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url("http://127.0.0.1:0").unwrap()); - let component = Component::new_insecure(eth2_cl, dutydb, 1); + let component = Component::new_insecure(eth2_cl, dutydb, 1, TestValidatorCache::empty()); component .verify_partial_sig( @@ -1599,4 +1666,75 @@ mod tests { .await .expect("insecure_test mode skips verification"); } + + // ==================================================================== + // CachedValidatorsProvider plumbing + // ==================================================================== + + /// `fetch_active_validators` returns whatever the registered + /// `CachedValidatorsProvider` yields, untouched. Mirrors Go's + /// `c.eth2Cl.ActiveValidators(ctx)` return shape. + #[tokio::test] + async fn fetch_active_validators_returns_cache_contents() { + let cancel = CancellationToken::new(); + let (deadliner, _deadliner_rx) = DeadlinerTask::start( + cancel.clone(), + "validatorapi-validator-cache-tests", + FarFutureCalculator, + ); + let (_evict_tx, evict_rx) = mpsc::channel(1); + let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); + let eth2_cl = + Arc::new(EthBeaconNodeApiClient::with_base_url("http://127.0.0.1:0").unwrap()); + + let expected = HashMap::from([(1u64, dv_pubkey(0xA1)), (7u64, dv_pubkey(0xA7))]); + let component = Component::new_insecure( + eth2_cl, + dutydb, + 1, + TestValidatorCache::arc(expected.clone()), + ); + + let got = component + .fetch_active_validators() + .await + .expect("test cache always succeeds"); + assert_eq!(got, expected); + } + + /// A provider that surfaces a transport-style error is mapped to a 502 + /// without leaking the underlying error into the client-visible + /// message. + #[tokio::test] + async fn fetch_active_validators_maps_provider_error_to_502() { + struct FailingCache; + + #[async_trait] + impl CachedValidatorsProvider for FailingCache { + async fn active_validators( + &self, + ) -> Result< + HashMap, + super::super::validator_cache::CachedValidatorsError, + > { + Err("upstream unavailable".into()) + } + } + + let cancel = CancellationToken::new(); + let (deadliner, _deadliner_rx) = DeadlinerTask::start( + cancel.clone(), + "validatorapi-validator-cache-fail-tests", + FarFutureCalculator, + ); + let (_evict_tx, evict_rx) = mpsc::channel(1); + let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); + let eth2_cl = + Arc::new(EthBeaconNodeApiClient::with_base_url("http://127.0.0.1:0").unwrap()); + let component = Component::new_insecure(eth2_cl, dutydb, 1, Arc::new(FailingCache)); + + let err = component.fetch_active_validators().await.unwrap_err(); + assert_eq!(err.status_code, StatusCode::BAD_GATEWAY); + assert_eq!(err.message, "active validators lookup failed"); + } } diff --git a/crates/core/src/validatorapi/mod.rs b/crates/core/src/validatorapi/mod.rs index 8442859c..28a1d680 100644 --- a/crates/core/src/validatorapi/mod.rs +++ b/crates/core/src/validatorapi/mod.rs @@ -9,6 +9,7 @@ pub mod handler; pub mod metrics; pub mod router; pub mod types; +pub mod validator_cache; #[cfg(test)] pub mod testutils; @@ -17,3 +18,4 @@ pub use component::Component; pub use error::ApiError; pub use handler::Handler; pub use router::new_router; +pub use validator_cache::{CachedValidatorsError, CachedValidatorsProvider}; diff --git a/crates/core/src/validatorapi/validator_cache.rs b/crates/core/src/validatorapi/validator_cache.rs new file mode 100644 index 00000000..34f22b5f --- /dev/null +++ b/crates/core/src/validatorapi/validator_cache.rs @@ -0,0 +1,35 @@ +//! Cluster-wide active-validators lookup consumed by submit handlers. +//! +//! Mirrors Charon's `app/eth2wrap.CachedValidatorsProvider` interface: +//! submit handlers that have to translate a validator-client-supplied +//! `validator_index` into the cluster's DV root public key consult this +//! trait. Defined here in `pluto-core` so the validator API does not need +//! to depend on the application crate that owns the concrete per-epoch +//! cache implementation. + +use std::collections::HashMap; + +use async_trait::async_trait; +use pluto_eth2api::spec::phase0::{BLSPubKey, ValidatorIndex}; + +/// Boxed error returned by [`CachedValidatorsProvider`] methods. Kept +/// opaque so the trait does not bind callers to any single backing +/// implementation's error type. +pub type CachedValidatorsError = Box; + +/// Provides the cluster's currently active validators, indexed by +/// validator index. Mirrors Go's `eth2Cl.ActiveValidators(ctx)`, which is +/// itself backed by `app/eth2wrap`'s per-epoch validator cache; the +/// validator-API [`Component`](super::Component) calls through this trait +/// so the cache is the single source of truth across duty handlers +/// without `pluto-core` depending on the cache crate. +/// +/// Implementations may populate the underlying cache on demand — callers +/// must not assume the call is non-blocking. +#[async_trait] +pub trait CachedValidatorsProvider: Send + Sync { + /// Returns the `validator_index -> DV root BLS public key` map. + async fn active_validators( + &self, + ) -> Result, CachedValidatorsError>; +} From a8076b8ce6d5b92018c28910537f4f5e8c89cb9e Mon Sep 17 00:00:00 2001 From: Bohdan Ohorodnii <273991985+varex83agent@users.noreply.github.com> Date: Mon, 8 Jun 2026 14:47:16 +0200 Subject: [PATCH 2/4] feat(core): implement validatorapi sync committee contribution + submit handlers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Ports the sync-committee read + two submit handlers from Charon: - `sync_committee_contribution` (read): delegates to the `await_sync_contribution` hook registered by PR #458, bounded by `DUTY_AWAIT_TIMEOUT`. Mirrors `core/validatorapi/validatorapi.go:948-955`. - `submit_sync_committee_messages`: looks up active validators through `Component::fetch_active_validators` (foundation PR #476), builds a partial `SignedSyncMessage` per VC message, verifies the outer partial signature against this node's share, groups by slot, and fans out to subscribers. Mirrors `validatorapi.go:958-1003`. - `submit_sync_committee_contributions`: same shape plus the inner selection-proof verification against the root validator pubkey using `DomainName::SyncCommitteeSelectionProof`. Mirrors `validatorapi.go:1009-1069`. Replaces the `SyncCommitteeMessage`, `SyncCommitteeContribution`, and `SignedContributionAndProof` placeholder structs in `validatorapi/types.rs` with the real altair spec types from `pluto_eth2api`. This branch rebases on top of `bohdan/validatorapi-cache-trait` (#476), which provides the `CachedValidatorsProvider` trait, the `validator_cache` Component field/constructor arg, and the `fetch_active_validators` helper. The local `ActiveValidatorsFn` hook, `active_validators_fn` field, `register_active_validators` method, and duplicate `fetch_active_validators` helper that PR 462 previously introduced are dropped — handlers consult the foundation's trait-backed helper instead. Test plan: - cargo +nightly fmt --all --check - cargo clippy -p pluto-core --all-targets --all-features -- -D warnings - cargo test -p pluto-core --all-features (446 passing; +16 new tests covering sync_committee_contribution happy path / hook error / timeout / no-hook 500; submit_messages fanout, unknown index 400, hook 502, real BLS round-trip, invalid sig rejection; submit_contributions fanout, unknown aggregator 400, invalid outer partial-sig rejection; SyncCommittee domain round-trip) Co-Authored-By: Bohdan Ohorodnii <35969035+varex83@users.noreply.github.com> --- crates/core/src/validatorapi/component.rs | 992 +++++++++++++++++++++- crates/core/src/validatorapi/types.rs | 17 +- 2 files changed, 979 insertions(+), 30 deletions(-) diff --git a/crates/core/src/validatorapi/component.rs b/crates/core/src/validatorapi/component.rs index 29aa3af7..efd98424 100644 --- a/crates/core/src/validatorapi/component.rs +++ b/crates/core/src/validatorapi/component.rs @@ -15,7 +15,10 @@ use pluto_eth2api::{ GetSyncCommitteeDutiesResponse, spec::phase0::{BLSPubKey, Epoch, Root, ValidatorIndex}, }; -use pluto_eth2util::signing::{self, DomainName, SigningError}; +use pluto_eth2util::{ + helpers::epoch_from_slot, + signing::{self, DomainName, SigningError}, +}; use tokio::time::error::Elapsed; use super::{ @@ -37,10 +40,11 @@ use super::{ use crate::{ dutydb::{Error as DutyDbError, MemDB}, signeddata::{ - SyncContribution, VersionedAggregatedAttestation, + SignedSyncContributionAndProof, SignedSyncMessage, SyncContribution, + SyncContributionAndProof, VersionedAggregatedAttestation, VersionedProposal as UnsignedVersionedProposal, }, - types::{Duty, ParSignedDataSet, PubKey, Signature, SignedData}, + types::{Duty, ParSignedData, ParSignedDataSet, PubKey, Signature, SignedData, SlotNumber}, version, }; @@ -121,6 +125,11 @@ const UPSTREAM_REQUEST_TIMEOUT: Duration = Duration::from_secs(10); /// attestation duty has time to flow through the pipeline. const ATTESTATION_DATA_TIMEOUT: Duration = Duration::from_secs(24); +/// Hard deadline for any local duty-await lookup (e.g. the sync committee +/// contribution waiter). Sized identically to [`ATTESTATION_DATA_TIMEOUT`] +/// — both bound a request whose slot may never produce data. +const DUTY_AWAIT_TIMEOUT: Duration = Duration::from_secs(24); + /// Validator API [`Handler`] implementation. /// /// Holds the upstream beacon-node client and the cluster's public-key / @@ -162,10 +171,6 @@ pub struct Component { #[allow(dead_code, reason = "consumed by aggregate_attestation in later PRs")] await_agg_attestation_fn: Option, /// Looks up a sync committee contribution. - #[allow( - dead_code, - reason = "consumed by sync_committee_contribution in later PRs" - )] await_sync_contribution_fn: Option, /// Looks up aggregated signed data for a `(duty, pubkey)`. #[allow(dead_code, reason = "consumed by submit_* handlers in later PRs")] @@ -182,7 +187,6 @@ pub struct Component { /// DV root public keys. Mirrors Go's `c.eth2Cl.ActiveValidators(ctx)`, /// which is itself backed by `app/eth2wrap`'s per-epoch validator /// cache. - #[allow(dead_code, reason = "consumed by submit_* handlers in later PRs")] validator_cache: Arc, } @@ -324,6 +328,84 @@ impl Component { })); } + /// Verifies an outer partial signature on a [`ParSignedData`] against + /// this node's share for `root_pubkey`. Centralizes the + /// `message_root` / `signature` derivation so every submit handler + /// emits the same `ApiError` shape. + /// + /// `slot` is consumed to resolve the epoch for the domain lookup. + async fn verify_partial_sig_for( + &self, + par_sig: &ParSignedData, + root_pubkey: &BLSPubKey, + slot: u64, + ) -> Result<(), ApiError> { + if self.insecure_test { + return Ok(()); + } + + // The domain choice is hard-wired to the signed-data wrapper passed + // in. Each handler picks the right wrapper and we map here. + let signed: &dyn SignedData = par_sig.signed_data.as_ref(); + let any_signed = signed as &dyn Any; + let domain_name = if any_signed.is::() { + DomainName::SyncCommittee + } else if any_signed.is::() { + DomainName::ContributionAndProof + } else { + return Err(ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "unsupported signed-data wrapper for verify_partial_sig_for", + )); + }; + + let epoch = epoch_from_slot(&self.eth2_cl, slot).await.map_err(|err| { + ApiError::new(StatusCode::BAD_GATEWAY, "could not derive epoch from slot") + .with_source(std::io::Error::other(err.to_string())) + })?; + let message_root = signed.message_root().map_err(|err| { + ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "could not derive signed-data message root", + ) + .with_source(std::io::Error::other(err.to_string())) + })?; + let signature = signed.signature().map_err(|err| { + ApiError::new(StatusCode::BAD_REQUEST, "missing partial signature") + .with_source(std::io::Error::other(err.to_string())) + })?; + + self.verify_partial_sig(root_pubkey, domain_name, epoch, message_root, &signature) + .await + .map_err(|err| match err { + VerifyPartialSigError::UnknownPubKey => ApiError::new( + StatusCode::BAD_REQUEST, + "unknown validator public key for partial signature", + ), + VerifyPartialSigError::Signing(inner) => { + ApiError::new(StatusCode::BAD_REQUEST, "invalid partial signature") + .with_source(inner) + } + }) + } + + /// Fans out a validated [`ParSignedDataSet`] to every registered + /// subscriber. Each subscriber receives its own clone (the wrapper + /// stored in `subs` already does the clone-before-fanout, mirroring + /// Go's `Subscribe`). + async fn fanout(&self, duty: &Duty, set: ParSignedDataSet) -> Result<(), ApiError> { + for sub in &self.subs { + sub(duty, &set).await.map_err(|err| { + ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "subscriber failed to process partial signed data", + ) + .with_source(std::io::Error::other(err.to_string())) + })?; + } + Ok(()) + } + /// Verifies a partial BLS signature produced by the validator client /// against this node's public share for the given DV root pubkey. /// @@ -333,7 +415,6 @@ impl Component { /// it is processing, then invokes this helper. /// /// Skipped entirely when [`Self::insecure_test`] is set. - #[allow(dead_code, reason = "consumed by submit_* handlers in later PRs")] pub async fn verify_partial_sig( &self, root_pubkey: &BLSPubKey, @@ -372,7 +453,6 @@ impl Component { /// underlying error into the client-visible message. Mirrors Go's /// `c.eth2Cl.ActiveValidators(ctx)`, which is itself implemented via /// `app/eth2wrap`'s validator cache. - #[allow(dead_code, reason = "consumed by submit_* handlers in later PRs")] async fn fetch_active_validators( &self, ) -> Result, ApiError> { @@ -659,23 +739,184 @@ impl Handler for Component { async fn sync_committee_contribution( &self, - _opts: SyncCommitteeContributionOpts, + opts: SyncCommitteeContributionOpts, ) -> Result, ApiError> { - unimplemented!("sync_committee_contribution not yet ported") + // Port of Go `SyncCommitteeContribution` at + // `core/validatorapi/validatorapi.go:948`. Delegates to the registered + // `awaitSyncContributionFunc`, bounded by a hard timeout so a missing + // contribution cannot park the handler indefinitely. + let await_fn = self.await_sync_contribution_fn.as_ref().ok_or_else(|| { + ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "sync committee contribution lookup not registered", + ) + })?; + + let contrib = tokio::time::timeout( + DUTY_AWAIT_TIMEOUT, + await_fn(opts.slot, opts.subcommittee_index, opts.beacon_block_root), + ) + .await + .map_err(|_: Elapsed| { + ApiError::new( + StatusCode::REQUEST_TIMEOUT, + "sync committee contribution not available before deadline", + ) + })? + .map_err(map_hook_dutydb_error)?; + + Ok(EthResponse { + data: contrib.0, + execution_optimistic: false, + finalized: false, + dependent_root: None, + }) } async fn submit_sync_committee_contributions( &self, - _contributions: Vec, + contributions: Vec, ) -> Result<(), ApiError> { - unimplemented!("submit_sync_committee_contributions not yet ported") + // Port of Go `SubmitSyncCommitteeContributions` at + // `core/validatorapi/validatorapi.go:1009`. Verifies the inner + // selection proof against the root pubkey, the outer partial + // signature against this node's share, groups by slot, and fans + // out to every subscriber. + let vals = self.fetch_active_validators().await?; + + let mut psigs_by_slot: HashMap = HashMap::new(); + for contrib in contributions { + let slot = contrib.message.contribution.slot; + let v_idx = contrib.message.aggregator_index; + + let eth2_pubkey = vals.get(&v_idx).copied().ok_or_else(|| { + // Mirrors Go's `errors.New("validator not found")` — + // the VC submitted a contribution whose aggregator index + // is not part of the active validator set. + ApiError::new(StatusCode::BAD_REQUEST, "validator not found") + })?; + + let pk = PubKey::try_from(eth2_pubkey.as_slice()).map_err(|err| { + ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "invalid validator public key", + ) + .with_source(std::io::Error::other(format!("{err:?}"))) + })?; + + // Inner selection-proof verification. Mirrors Go's + // `core.VerifyEth2SignedData(... NewSyncContributionAndProof ...)` + // — checked against the **root** pubkey (`eth2Pubkey`), not the + // share, because the VC builds the selection proof with the + // root-level secret. Skipped in `insecure_test`. + if !self.insecure_test { + let inner = SyncContributionAndProof::new(contrib.message.clone()); + let epoch = epoch_from_slot(&self.eth2_cl, slot).await.map_err(|err| { + ApiError::new( + StatusCode::BAD_GATEWAY, + "could not derive epoch for sync contribution", + ) + .with_source(std::io::Error::other(err.to_string())) + })?; + let message_root = inner.message_root().map_err(|err| { + ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "could not derive sync selection proof root", + ) + .with_source(std::io::Error::other(err.to_string())) + })?; + let signature = inner.signature().map_err(|err| { + ApiError::new( + StatusCode::BAD_REQUEST, + "missing sync selection proof signature", + ) + .with_source(std::io::Error::other(err.to_string())) + })?; + signing::verify( + &self.eth2_cl, + DomainName::SyncCommitteeSelectionProof, + epoch, + message_root, + &signature, + ð2_pubkey, + ) + .await + .map_err(|err| { + ApiError::new( + StatusCode::BAD_REQUEST, + "invalid sync committee selection proof", + ) + .with_source(err) + })?; + } + + // Outer partial signature: verify against this node's share, + // then stash in the per-slot ParSignedDataSet. + let par_sig_data = + SignedSyncContributionAndProof::new_partial(contrib.clone(), self.share_idx); + + self.verify_partial_sig_for(&par_sig_data, ð2_pubkey, slot) + .await?; + + psigs_by_slot + .entry(slot) + .or_default() + .insert(pk, par_sig_data); + } + + for (slot, set) in psigs_by_slot { + let duty = Duty::new_sync_contribution_duty(SlotNumber::new(slot)); + self.fanout(&duty, set).await?; + } + + Ok(()) } async fn submit_sync_committee_messages( &self, - _messages: Vec, + messages: Vec, ) -> Result<(), ApiError> { - unimplemented!("submit_sync_committee_messages not yet ported") + // Port of Go `SubmitSyncCommitteeMessages` at + // `core/validatorapi/validatorapi.go:958`. Builds a partial + // `SignedSyncMessage` per validator, verifies the partial sig + // against this node's share, then fans out grouped by slot. + let vals = self.fetch_active_validators().await?; + + let mut psigs_by_slot: HashMap = HashMap::new(); + for msg in messages { + let slot = msg.slot; + let v_idx = msg.validator_index; + + let eth2_pubkey = vals + .get(&v_idx) + .copied() + .ok_or_else(|| ApiError::new(StatusCode::BAD_REQUEST, "validator not found"))?; + + let pk = PubKey::try_from(eth2_pubkey.as_slice()).map_err(|err| { + ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "invalid validator public key", + ) + .with_source(std::io::Error::other(format!("{err:?}"))) + })?; + + let par_sig_data = SignedSyncMessage::new_partial(msg, self.share_idx); + + self.verify_partial_sig_for(&par_sig_data, ð2_pubkey, slot) + .await?; + + psigs_by_slot + .entry(slot) + .or_default() + .insert(pk, par_sig_data); + } + + for (slot, set) in psigs_by_slot { + let duty = Duty::new_sync_message_duty(SlotNumber::new(slot)); + self.fanout(&duty, set).await?; + } + + Ok(()) } } @@ -754,6 +995,36 @@ fn map_dutydb_error(err: DutyDbError) -> ApiError { ApiError::new(status, message).with_source(err) } +/// Maps a hook-returned [`CallbackError`] (used by handlers that delegate +/// through `register_await_*` instead of calling `dutydb` directly) into the +/// `ApiError` returned to the client. If the boxed error is a typed +/// [`DutyDbError`] we recover the same status mapping as [`map_dutydb_error`]. +/// Otherwise we mirror Charon's `writeError` (core/validatorapi/router.go:1674) +/// and surface a generic 500 — Charon does the same when the hook bubbles a +/// non-`apiError` value. +fn map_hook_dutydb_error(err: CallbackError) -> ApiError { + if let Some(dutydb_err) = err.downcast_ref::() { + let (status, message) = match dutydb_err { + DutyDbError::Shutdown => (StatusCode::SERVICE_UNAVAILABLE, "dutydb is shutting down"), + DutyDbError::AwaitDutyExpired => ( + StatusCode::REQUEST_TIMEOUT, + "duty expired before data was stored", + ), + _ => ( + StatusCode::INTERNAL_SERVER_ERROR, + "registered hook returned a dutydb error", + ), + }; + ApiError::new(status, message).with_source(std::io::Error::other(err.to_string())) + } else { + ApiError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "registered hook returned an error", + ) + .with_source(std::io::Error::other(err.to_string())) + } +} + /// Rewrites each duty's root public key to this node's public share. Duties /// whose pubkey is not in `pub_share_by_pubkey` are passed through unchanged /// (the upstream returns all proposers for the epoch, not just ours). @@ -838,6 +1109,12 @@ mod tests { use chrono::{DateTime, Utc}; use pluto_crypto::{blst_impl::BlstImpl, tbls::Tbls}; + use pluto_eth2api::spec::altair::{ + ContributionAndProof, SignedContributionAndProof as AltairSignedContributionAndProof, + SyncCommitteeContribution as AltairSyncCommitteeContribution, + SyncCommitteeMessage as AltairSyncCommitteeMessage, + }; + use pluto_ssz::BitVector; use pluto_testutil::BeaconMock; use serde_json::json; use tokio::sync::mpsc; @@ -853,7 +1130,9 @@ mod tests { }, testutils::random_core_pub_key, types::{Duty, DutyType, PubKey, SlotNumber}, - validatorapi::types::AttestationDataOpts, + validatorapi::types::{ + AttestationDataOpts, SyncCommitteeContributionOpts, SyncCommitteeMessage, + }, }; /// In-memory stand-in for the per-epoch validator cache. Tests supply @@ -1525,13 +1804,21 @@ mod tests { /// Mirrors signing-fixture spec from `pluto_eth2util::signing` tests so /// `verify_partial_sig` can resolve a real beacon-attester domain. + /// Each fork has a distinct epoch so `resolve_fork_version` is + /// deterministic (the fork_schedule HashMap iteration order does not + /// affect the result). fn signing_spec_fixture() -> serde_json::Value { json!({ + "SECONDS_PER_SLOT": "12", + "SLOTS_PER_EPOCH": "16", "DOMAIN_BEACON_PROPOSER": "0x00000000", "DOMAIN_BEACON_ATTESTER": "0x01000000", "DOMAIN_RANDAO": "0x02000000", "DOMAIN_VOLUNTARY_EXIT": "0x04000000", "DOMAIN_APPLICATION_BUILDER": "0x00000001", + "DOMAIN_SYNC_COMMITTEE": "0x07000000", + "DOMAIN_SYNC_COMMITTEE_SELECTION_PROOF": "0x08000000", + "DOMAIN_CONTRIBUTION_AND_PROOF": "0x09000000", "ALTAIR_FORK_VERSION": "0x01020304", "ALTAIR_FORK_EPOCH": "10", "BELLATRIX_FORK_VERSION": "0x02030405", @@ -1737,4 +2024,675 @@ mod tests { assert_eq!(err.status_code, StatusCode::BAD_GATEWAY); assert_eq!(err.message, "active validators lookup failed"); } + + // ==================================================================== + // PR-5 — sync committee contribution + submit handlers + // ==================================================================== + + /// Channel-shaped capture buffer for subscribed fanout invocations. + type CapturedFanout = Arc>>; + + /// Builds an insecure (skip-partial-verify) component that resolves + /// sync-committee contributions via the supplied `await` closure and + /// active validators via the supplied map. + fn make_sync_component( + active: HashMap, + ) -> (CapturedFanout, Component) { + let cancel = CancellationToken::new(); + let (deadliner, _deadliner_rx) = DeadlinerTask::start( + cancel.clone(), + "validatorapi-sync-tests", + FarFutureCalculator, + ); + let (_evict_tx, evict_rx) = mpsc::channel(1); + let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); + let eth2_cl = + Arc::new(EthBeaconNodeApiClient::with_base_url("http://127.0.0.1:0").unwrap()); + let mut component = + Component::new_insecure(eth2_cl, dutydb, 7, TestValidatorCache::arc(active)); + + let captured: CapturedFanout = Arc::new(tokio::sync::Mutex::new(Vec::new())); + { + let captured = Arc::clone(&captured); + component.subscribe(move |duty, set| { + let captured = Arc::clone(&captured); + async move { + captured.lock().await.push((duty, set)); + Ok(()) + } + }); + } + + (captured, component) + } + + fn dummy_sync_message(slot: u64, validator_index: u64) -> AltairSyncCommitteeMessage { + AltairSyncCommitteeMessage { + slot, + beacon_block_root: [0x10; 32], + validator_index, + signature: [0x20; 96], + } + } + + fn dummy_sync_contribution( + slot: u64, + subcommittee_index: u64, + ) -> AltairSyncCommitteeContribution { + AltairSyncCommitteeContribution { + slot, + beacon_block_root: [0x30; 32], + subcommittee_index, + aggregation_bits: BitVector::<128>::with_bits(&[0]), + signature: [0x40; 96], + } + } + + fn dummy_signed_contribution_and_proof( + slot: u64, + aggregator_index: u64, + subcommittee_index: u64, + ) -> AltairSignedContributionAndProof { + AltairSignedContributionAndProof { + message: ContributionAndProof { + aggregator_index, + contribution: dummy_sync_contribution(slot, subcommittee_index), + selection_proof: [0x50; 96], + }, + signature: [0x60; 96], + } + } + + /// `sync_committee_contribution` happy path: a registered hook resolves + /// the request and the wrapped `EthResponse` carries the inner data. + #[tokio::test] + async fn sync_committee_contribution_returns_data_from_hook() { + let (_captured, mut component) = make_sync_component(HashMap::new()); + + let expected = dummy_sync_contribution(99, 3); + let payload = expected.clone(); + component.register_await_sync_contribution(move |_slot, _sub, _root| { + let payload = payload.clone(); + async move { Ok(SyncContribution(payload)) } + }); + + let response = component + .sync_committee_contribution(SyncCommitteeContributionOpts { + slot: 99, + subcommittee_index: 3, + beacon_block_root: [0xAB; 32], + }) + .await + .unwrap(); + + assert_eq!(response.data, expected); + } + + /// `sync_committee_contribution` returns 500 when the registered hook + /// fails with a generic (non-`DutyDbError`) error. Mirrors Charon's + /// `writeError` (core/validatorapi/router.go:1674) which maps any + /// non-`apiError` to 500. The 408 branch is reserved for `Elapsed` + /// (handler-level timeout) and for typed `DutyDbError::AwaitDutyExpired`. + #[tokio::test] + async fn sync_committee_contribution_returns_500_on_generic_hook_error() { + let (_captured, mut component) = make_sync_component(HashMap::new()); + + component.register_await_sync_contribution(|_slot, _sub, _root| async { + Err::("not available".into()) + }); + + let err = component + .sync_committee_contribution(SyncCommitteeContributionOpts { + slot: 0, + subcommittee_index: 0, + beacon_block_root: [0; 32], + }) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::INTERNAL_SERVER_ERROR); + } + + /// `sync_committee_contribution` returns 408 when the registered hook + /// bubbles a typed `DutyDbError::AwaitDutyExpired` — same shape as + /// `attestation_data`'s `map_dutydb_error` so an evicted duty is + /// distinguishable from a hung pipeline. + #[tokio::test] + async fn sync_committee_contribution_returns_408_on_dutydb_await_expired() { + let (_captured, mut component) = make_sync_component(HashMap::new()); + + component.register_await_sync_contribution(|_slot, _sub, _root| async { + Err::(Box::new(DutyDbError::AwaitDutyExpired) as CallbackError) + }); + + let err = component + .sync_committee_contribution(SyncCommitteeContributionOpts { + slot: 0, + subcommittee_index: 0, + beacon_block_root: [0; 32], + }) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::REQUEST_TIMEOUT); + } + + /// `sync_committee_contribution` returns 503 when the registered hook + /// bubbles a typed `DutyDbError::Shutdown` — matches `map_dutydb_error` + /// so a shutting-down dutydb is visible to the VC as Service Unavailable + /// (retryable) rather than 408 (which suggests transient timeout only). + #[tokio::test] + async fn sync_committee_contribution_returns_503_on_dutydb_shutdown() { + let (_captured, mut component) = make_sync_component(HashMap::new()); + + component.register_await_sync_contribution(|_slot, _sub, _root| async { + Err::(Box::new(DutyDbError::Shutdown) as CallbackError) + }); + + let err = component + .sync_committee_contribution(SyncCommitteeContributionOpts { + slot: 0, + subcommittee_index: 0, + beacon_block_root: [0; 32], + }) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::SERVICE_UNAVAILABLE); + } + + /// `sync_committee_contribution` returns 408 when the hook never + /// resolves — verifies the hard timeout fires instead of hanging. + #[tokio::test(start_paused = true)] + async fn sync_committee_contribution_times_out_when_hook_never_resolves() { + let (_captured, mut component) = make_sync_component(HashMap::new()); + + component.register_await_sync_contribution(|_slot, _sub, _root| async { + // Park forever. + std::future::pending::>().await + }); + + let err = component + .sync_committee_contribution(SyncCommitteeContributionOpts { + slot: 0, + subcommittee_index: 0, + beacon_block_root: [0; 32], + }) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::REQUEST_TIMEOUT); + } + + /// `sync_committee_contribution` returns 500 when no hook is + /// registered. Distinguishes "missing wiring" from "hook errored". + #[tokio::test] + async fn sync_committee_contribution_500_when_no_hook_registered() { + let (_captured, component) = make_sync_component(HashMap::new()); + let err = component + .sync_committee_contribution(SyncCommitteeContributionOpts { + slot: 0, + subcommittee_index: 0, + beacon_block_root: [0; 32], + }) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::INTERNAL_SERVER_ERROR); + } + + /// `submit_sync_committee_messages` happy path: insecure mode skips + /// verify, set is grouped by slot and fanned out to subscribers. + #[tokio::test] + async fn submit_sync_committee_messages_groups_by_slot_and_fanouts() { + let pk_a = [0xAA_u8; 48]; + let pk_b = [0xBB_u8; 48]; + let pk_c = [0xCC_u8; 48]; + let active: HashMap = HashMap::from([ + (1, pk_a), // slot 10 + (2, pk_b), // slot 10 + (3, pk_c), // slot 11 + ]); + + let (captured, component) = make_sync_component(active); + + let messages: Vec = vec![ + dummy_sync_message(10, 1), + dummy_sync_message(10, 2), + dummy_sync_message(11, 3), + ]; + + component + .submit_sync_committee_messages(messages) + .await + .unwrap(); + + let captured = captured.lock().await; + assert_eq!(captured.len(), 2, "two slots → two fanout invocations"); + let mut by_slot: HashMap = HashMap::new(); + for (duty, set) in captured.iter() { + assert_eq!(duty.duty_type, crate::types::DutyType::SyncMessage); + by_slot.insert(duty.slot.inner(), set.inner().len()); + } + assert_eq!(by_slot.get(&10), Some(&2)); + assert_eq!(by_slot.get(&11), Some(&1)); + } + + /// `submit_sync_committee_messages` rejects with 400 when the + /// validator-index lookup misses. Mirrors Go's + /// `errors.New("validator not found")`. + #[tokio::test] + async fn submit_sync_committee_messages_rejects_unknown_validator_index() { + let (_captured, component) = make_sync_component(HashMap::new()); + let err = component + .submit_sync_committee_messages(vec![dummy_sync_message(5, 99)]) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::BAD_REQUEST); + assert!(err.message.contains("validator not found")); + } + + /// `submit_sync_committee_messages` propagates a 502 when the + /// active-validators cache errors. Mirrors Go's bubble-through of + /// `c.eth2Cl.ActiveValidators` errors. + #[tokio::test] + async fn submit_sync_committee_messages_502_on_active_validators_error() { + struct FailingCache; + + #[async_trait] + impl CachedValidatorsProvider for FailingCache { + async fn active_validators( + &self, + ) -> Result< + HashMap, + super::super::validator_cache::CachedValidatorsError, + > { + Err("upstream down".into()) + } + } + + let cancel = CancellationToken::new(); + let (deadliner, _deadliner_rx) = DeadlinerTask::start( + cancel.clone(), + "validatorapi-active-validator-error", + FarFutureCalculator, + ); + let (_evict_tx, evict_rx) = mpsc::channel(1); + let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); + let eth2_cl = + Arc::new(EthBeaconNodeApiClient::with_base_url("http://127.0.0.1:0").unwrap()); + let component = Component::new_insecure(eth2_cl, dutydb, 1, Arc::new(FailingCache)); + + let err = component + .submit_sync_committee_messages(vec![dummy_sync_message(1, 1)]) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::BAD_GATEWAY); + } + + /// `submit_sync_committee_contributions` happy path: insecure mode + /// skips both the inner selection-proof verify and the outer partial + /// verify, set is grouped by slot and fanned out. + #[tokio::test] + async fn submit_sync_committee_contributions_groups_by_slot_and_fanouts() { + let pk_a = [0xAA_u8; 48]; + let pk_b = [0xBB_u8; 48]; + let active: HashMap = HashMap::from([(10, pk_a), (11, pk_b)]); + + let (captured, component) = make_sync_component(active); + + let contributions = vec![ + dummy_signed_contribution_and_proof(20, 10, 1), + dummy_signed_contribution_and_proof(20, 11, 2), + dummy_signed_contribution_and_proof(21, 10, 1), + ]; + + component + .submit_sync_committee_contributions(contributions) + .await + .unwrap(); + + let captured = captured.lock().await; + assert_eq!(captured.len(), 2); + let mut by_slot: HashMap = HashMap::new(); + for (duty, set) in captured.iter() { + assert_eq!(duty.duty_type, crate::types::DutyType::SyncContribution); + by_slot.insert(duty.slot.inner(), set.inner().len()); + } + assert_eq!(by_slot.get(&20), Some(&2)); + assert_eq!(by_slot.get(&21), Some(&1)); + } + + /// `submit_sync_committee_contributions` rejects with 400 when the + /// aggregator's `validator_index` is not in the active set. + #[tokio::test] + async fn submit_sync_committee_contributions_rejects_unknown_aggregator() { + let (_captured, component) = make_sync_component(HashMap::new()); + let err = component + .submit_sync_committee_contributions(vec![dummy_signed_contribution_and_proof( + 1, 42, 0, + )]) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::BAD_REQUEST); + assert!(err.message.contains("validator not found")); + } + + /// `submit_sync_committee_messages` rejects with 400 when verification + /// runs (i.e. `insecure_test = false`) and the share map has no entry + /// for the validator's root pubkey. Mirrors Go's + /// `getVerifyShareFunc -> errors.New("unknown public key")` path — + /// surfaced to the client as a 400. Confirms `verify_partial_sig_for` + /// is actually invoked from the submit handler. + #[tokio::test] + async fn submit_sync_committee_messages_rejects_invalid_partial_sig() { + let dv_root = [0xEE_u8; 48]; + let mock = mock_beacon_for_signing().await; + let cancel = CancellationToken::new(); + let (deadliner, _deadliner_rx) = DeadlinerTask::start( + cancel.clone(), + "validatorapi-sync-submit-reject", + FarFutureCalculator, + ); + let (_evict_tx, evict_rx) = mpsc::channel(1); + let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); + let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url(mock.uri()).unwrap()); + // Empty share map: lookup for `dv_root` will return + // `VerifyPartialSigError::UnknownPubKey`, which the handler maps + // to 400. + let active: HashMap = HashMap::from([(7, dv_root)]); + let component = Component::new( + eth2_cl, + dutydb, + 1, + HashMap::new(), + false, + TestValidatorCache::arc(active), + ); + + let err = component + .submit_sync_committee_messages(vec![dummy_sync_message(1, 7)]) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::BAD_REQUEST); + } + + /// `submit_sync_committee_messages` happy path on a real beacon mock: + /// confirms that even with `insecure_test = false`, a correctly signed + /// share passes the outer partial-sig verify and the set fans out. + #[tokio::test] + async fn submit_sync_committee_messages_accepts_valid_partial_sig() { + let secret = BlstImpl + .generate_insecure_secret(rand::rngs::OsRng) + .unwrap(); + let pubshare = BlstImpl.secret_to_public_key(&secret).unwrap(); + let dv_root = [0x77_u8; 48]; + + let slot: u64 = 1; + let beacon_block_root: Root = [0xDD; 32]; + + let mock = mock_beacon_for_signing().await; + // Resolve the same signing root the handler will compute (epoch=0 + // since slot/SLOTS_PER_EPOCH=1/16=0). + let signing_root = pluto_eth2util::signing::get_data_root( + mock.client(), + DomainName::SyncCommittee, + 0, + beacon_block_root, + ) + .await + .unwrap(); + let signature = BlstImpl.sign(&secret, &signing_root).unwrap(); + + let map = HashMap::from([(dv_root, pubshare)]); + let cancel = CancellationToken::new(); + let (deadliner, _deadliner_rx) = DeadlinerTask::start( + cancel.clone(), + "validatorapi-sync-submit-accept", + FarFutureCalculator, + ); + let (_evict_tx, evict_rx) = mpsc::channel(1); + let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); + let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url(mock.uri()).unwrap()); + let active: HashMap = HashMap::from([(7, dv_root)]); + let mut component = Component::new( + eth2_cl, + dutydb, + 1, + map, + false, + TestValidatorCache::arc(active), + ); + let captured: Arc> = Arc::new(tokio::sync::Mutex::new(0)); + { + let captured = Arc::clone(&captured); + component.subscribe(move |_duty, _set| { + let captured = Arc::clone(&captured); + async move { + *captured.lock().await += 1; + Ok(()) + } + }); + } + + let msg = AltairSyncCommitteeMessage { + slot, + beacon_block_root, + validator_index: 7, + signature, + }; + component + .submit_sync_committee_messages(vec![msg]) + .await + .expect("valid partial sig is accepted"); + + assert_eq!(*captured.lock().await, 1); + } + + /// Round-trips a real BLS signature through `verify_partial_sig` for the + /// SyncCommittee domain. Confirms the default-spec beacon mock resolves + /// DOMAIN_SYNC_COMMITTEE correctly and that signing & verify agree on + /// the signing root. + #[tokio::test] + async fn verify_partial_sig_round_trips_sync_committee_domain() { + let secret = BlstImpl + .generate_insecure_secret(rand::rngs::OsRng) + .unwrap(); + let pubshare = BlstImpl.secret_to_public_key(&secret).unwrap(); + let dv_root = [0xAB_u8; 48]; + let map = HashMap::from([(dv_root, pubshare)]); + + let mock = mock_beacon_for_signing().await; + let cancel = CancellationToken::new(); + let (deadliner, _deadliner_rx) = DeadlinerTask::start( + cancel.clone(), + "validatorapi-sync-roundtrip", + FarFutureCalculator, + ); + let (_evict_tx, evict_rx) = mpsc::channel(1); + let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); + let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url(mock.uri()).unwrap()); + let component = Component::new(eth2_cl, dutydb, 1, map, false, TestValidatorCache::empty()); + + let message_root: Root = [0xCD; 32]; + let signing_root = pluto_eth2util::signing::get_data_root( + mock.client(), + DomainName::SyncCommittee, + 0, + message_root, + ) + .await + .unwrap(); + let signature = BlstImpl.sign(&secret, &signing_root).unwrap(); + + component + .verify_partial_sig( + &dv_root, + DomainName::SyncCommittee, + 0, + message_root, + &signature, + ) + .await + .expect("valid SyncCommittee partial sig should verify"); + } + + /// `submit_sync_committee_contributions` rejects with 400 when the + /// outer partial-sig verify path runs (insecure_test=false) and the + /// share map has no entry for the aggregator's root pubkey. Confirms + /// `verify_partial_sig_for` is reached for the contribution path too. + #[tokio::test] + async fn submit_sync_committee_contributions_rejects_invalid_partial_sig() { + let dv_root = [0xCD_u8; 48]; + let mock = mock_beacon_for_signing().await; + let cancel = CancellationToken::new(); + let (deadliner, _deadliner_rx) = DeadlinerTask::start( + cancel.clone(), + "validatorapi-sync-contrib-reject", + FarFutureCalculator, + ); + let (_evict_tx, evict_rx) = mpsc::channel(1); + let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); + let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url(mock.uri()).unwrap()); + // `insecure_test = false` but no share registered for `dv_root`. The + // inner selection-proof verify runs first; because the selection + // proof is a zero-byte signature here it will be rejected with 400 + // via `signing::verify` returning `ZeroSignature` / `VerifyFailed`. + let active: HashMap = HashMap::from([(7, dv_root)]); + let component = Component::new( + eth2_cl, + dutydb, + 1, + HashMap::new(), + false, + TestValidatorCache::arc(active), + ); + + // The dummy fixture's `selection_proof` is `[0x50; 96]` — a random + // non-zero garbage signature, so `signing::verify` returns + // `VerifyFailed`, which we map to 400. + let err = component + .submit_sync_committee_contributions(vec![dummy_signed_contribution_and_proof(1, 7, 0)]) + .await + .unwrap_err(); + assert_eq!(err.status_code, StatusCode::BAD_REQUEST); + } + + /// `submit_sync_committee_contributions` happy path with + /// `insecure_test=false`: build a real-BLS-signed + /// `SignedContributionAndProof` where the **inner** selection proof is + /// signed by the root secret under `SyncCommitteeSelectionProof` and + /// the **outer** partial signature is signed by the share secret under + /// `ContributionAndProof`. Mirrors Charon's + /// `TestComponent_SubmitSyncCommitteeContributionsVerify` and proves + /// both verify steps agree on domain / epoch / message-root with the + /// shared mock-beacon spec fixture. + #[tokio::test] + async fn submit_sync_committee_contributions_accepts_valid_partial_sig() { + // Root secret signs the inner selection proof; share secret signs + // the outer partial sig. Both pubkeys are derived from the BLS + // secret keys and wired through the per-validator share map. + let root_secret = BlstImpl + .generate_insecure_secret(rand::rngs::OsRng) + .unwrap(); + let root_pubkey = BlstImpl.secret_to_public_key(&root_secret).unwrap(); + let share_secret = BlstImpl + .generate_insecure_secret(rand::rngs::OsRng) + .unwrap(); + let share_pubkey = BlstImpl.secret_to_public_key(&share_secret).unwrap(); + + let slot: u64 = 1; + let subcommittee_index: u64 = 3; + let aggregator_index: u64 = 11; + + let mock = mock_beacon_for_signing().await; + + // Inner: sign HTR(SyncAggregatorSelectionData) with the root secret + // under DomainName::SyncCommitteeSelectionProof. + let contribution = AltairSyncCommitteeContribution { + slot, + beacon_block_root: [0xEE; 32], + subcommittee_index, + aggregation_bits: BitVector::<128>::with_bits(&[0]), + signature: [0; 96], + }; + let selection_proof_root = ContributionAndProof { + aggregator_index, + contribution: contribution.clone(), + selection_proof: [0; 96], + } + .selection_proof_message_root(); + let selection_proof_signing_root = pluto_eth2util::signing::get_data_root( + mock.client(), + DomainName::SyncCommitteeSelectionProof, + 0, + selection_proof_root, + ) + .await + .unwrap(); + let selection_proof = BlstImpl + .sign(&root_secret, &selection_proof_signing_root) + .unwrap(); + + // Outer: sign HTR(ContributionAndProof) — including the just-computed + // selection_proof — with the share secret under + // DomainName::ContributionAndProof. + let message = ContributionAndProof { + aggregator_index, + contribution, + selection_proof, + }; + let outer_root = AltairSignedContributionAndProof { + message: message.clone(), + signature: [0; 96], + } + .message_root(); + let outer_signing_root = pluto_eth2util::signing::get_data_root( + mock.client(), + DomainName::ContributionAndProof, + 0, + outer_root, + ) + .await + .unwrap(); + let outer_signature = BlstImpl.sign(&share_secret, &outer_signing_root).unwrap(); + + let map = HashMap::from([(root_pubkey, share_pubkey)]); + let cancel = CancellationToken::new(); + let (deadliner, _deadliner_rx) = DeadlinerTask::start( + cancel.clone(), + "validatorapi-sync-contrib-accept", + FarFutureCalculator, + ); + let (_evict_tx, evict_rx) = mpsc::channel(1); + let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); + let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url(mock.uri()).unwrap()); + let active: HashMap = + HashMap::from([(aggregator_index, root_pubkey)]); + let mut component = Component::new( + eth2_cl, + dutydb, + 1, + map, + false, + TestValidatorCache::arc(active), + ); + let captured: Arc> = Arc::new(tokio::sync::Mutex::new(0)); + { + let captured = Arc::clone(&captured); + component.subscribe(move |_duty, _set| { + let captured = Arc::clone(&captured); + async move { + *captured.lock().await += 1; + Ok(()) + } + }); + } + + let signed = AltairSignedContributionAndProof { + message, + signature: outer_signature, + }; + component + .submit_sync_committee_contributions(vec![signed]) + .await + .expect("valid inner + outer signatures are accepted"); + + assert_eq!(*captured.lock().await, 1); + } } diff --git a/crates/core/src/validatorapi/types.rs b/crates/core/src/validatorapi/types.rs index 8e18456a..eb455fbe 100644 --- a/crates/core/src/validatorapi/types.rs +++ b/crates/core/src/validatorapi/types.rs @@ -21,7 +21,10 @@ pub use pluto_eth2api::{ GetSyncCommitteeDutiesResponseResponseDatum as SyncCommitteeDuty, GetVersionResponseResponse as NodeVersionResponse, GetVersionResponseResponseData as NodeVersionData, - spec::phase0::{self, Epoch, Root, Slot, ValidatorIndex}, + spec::{ + altair::{SignedContributionAndProof, SyncCommitteeContribution, SyncCommitteeMessage}, + phase0::{self, Epoch, Root, Slot, ValidatorIndex}, + }, }; /// Attestation data alias for the consensus-spec phase0 type. @@ -171,18 +174,6 @@ pub struct SignedValidatorRegistration {} #[derive(Debug, Clone)] pub struct SignedVoluntaryExit {} -/// Sync-committee message payload. Placeholder. -#[derive(Debug, Clone)] -pub struct SyncCommitteeMessage {} - -/// Sync-committee contribution payload. Placeholder. -#[derive(Debug, Clone)] -pub struct SyncCommitteeContribution {} - -/// Signed contribution-and-proof payload. Placeholder. -#[derive(Debug, Clone)] -pub struct SignedContributionAndProof {} - /// Beacon-committee selection payload. Placeholder. #[derive(Debug, Clone)] pub struct BeaconCommitteeSelection {} From f9c968c5b5ca7891ae2712ac2e563a4e88538475 Mon Sep 17 00:00:00 2001 From: Bohdan Ohorodnii <273991985+varex83agent@users.noreply.github.com> Date: Thu, 11 Jun 2026 20:07:29 +0200 Subject: [PATCH 3/4] feat(core): wire eth2api CachedValidatorsProvider into validatorapi Make eth2api's existing CachedValidatorsProvider trait async (#[async_trait]) and implement it for ValidatorCache, add ActiveValidators::new so consumers can build instances, and add the async-trait dependency. Wire a validator_cache: Arc into the validatorapi Component (constructors + fetch_active_validators), reusing the shared trait rather than a validatorapi-local duplicate. fetch_active_validators mirrors Go's eth2Cl.ActiveValidators. Add a TestValidatorCache test double and update existing validatorapi tests for the new constructor arity. This is the base the validatorapi submit-handler PRs (selections, exit/ registrations, sync-submit) stack on. Co-Authored-By: Bohdan Ohorodnii <35969035+varex83@users.noreply.github.com> --- Cargo.lock | 1 + crates/core/src/validatorapi/component.rs | 87 +++++++++++++++++++++-- crates/eth2api/Cargo.toml | 1 + crates/eth2api/src/valcache.rs | 31 +++++++- 4 files changed, 110 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index be2ddc88..abd7cd39 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5707,6 +5707,7 @@ version = "1.7.1" dependencies = [ "alloy", "anyhow", + "async-trait", "bon", "chrono", "ethereum_ssz", diff --git a/crates/core/src/validatorapi/component.rs b/crates/core/src/validatorapi/component.rs index f9f7729e..60a3c4e3 100644 --- a/crates/core/src/validatorapi/component.rs +++ b/crates/core/src/validatorapi/component.rs @@ -14,6 +14,7 @@ use pluto_eth2api::{ GetProposerDutiesRequest, GetProposerDutiesResponse, GetSyncCommitteeDutiesRequest, GetSyncCommitteeDutiesResponse, spec::phase0::{BLSPubKey, Epoch, Root, ValidatorIndex}, + valcache::{ActiveValidators, CachedValidatorsProvider}, versioned::{DataVersion, SignedBlindedProposalBlock, SignedProposalBlock}, }; use pluto_eth2util::signing::{self, DomainName, SigningError}; @@ -140,6 +141,12 @@ const PROPOSAL_TIMEOUT: Duration = Duration::from_secs(24); pub struct Component { /// Upstream beacon-node API client. eth2_cl: Arc, + /// Per-epoch active-validators cache. Submit handlers consult this to + /// translate a validator-client-supplied `validator_index` into the + /// cluster's DV root public key. Mirrors Go's `eth2Cl.ActiveValidators`, + /// which is itself backed by the beacon-node validator cache. + #[allow(dead_code, reason = "consumed by submit_* handlers in later PRs")] + validator_cache: Arc, /// In-memory DutyDB used to await consensus output (e.g. attestation /// data) produced by the rest of the pipeline. dutydb: Arc, @@ -196,6 +203,7 @@ impl Component { share_idx: u64, pub_share_by_pubkey: HashMap, builder_enabled: bool, + validator_cache: Arc, ) -> Self { Self { eth2_cl, @@ -203,6 +211,7 @@ impl Component { share_idx, pub_share_by_pubkey, builder_enabled, + validator_cache, insecure_test: false, subs: Vec::new(), await_proposal_fn: None, @@ -223,6 +232,7 @@ impl Component { eth2_cl: Arc, dutydb: Arc, share_idx: u64, + validator_cache: Arc, ) -> Self { Self { eth2_cl, @@ -230,6 +240,7 @@ impl Component { share_idx, pub_share_by_pubkey: HashMap::new(), builder_enabled: false, + validator_cache, insecure_test: true, subs: Vec::new(), await_proposal_fn: None, @@ -241,6 +252,25 @@ impl Component { } } + /// Returns the cluster's active validators (`validator_index -> DV root + /// public key`) from the registered [`CachedValidatorsProvider`], + /// bounded by [`UPSTREAM_REQUEST_TIMEOUT`]. Mirrors Go's + /// `c.eth2Cl.ActiveValidators(ctx)`, which is itself implemented via the + /// beacon-node validator cache. + #[allow(dead_code, reason = "consumed by submit_* handlers in later PRs")] + async fn fetch_active_validators(&self) -> Result { + tokio::time::timeout( + UPSTREAM_REQUEST_TIMEOUT, + self.validator_cache.active_validators(), + ) + .await + .map_err(|_: Elapsed| upstream_timeout("active validators"))? + .map_err(|err| { + ApiError::new(StatusCode::BAD_GATEWAY, "active validators lookup failed") + .with_source(err) + }) + } + /// Appends a subscriber that is invoked by submit endpoints once a /// partial-signed-data set has been validated. The registered closure /// receives its own clone of the set, so subscribers can mutate without @@ -1321,6 +1351,39 @@ mod tests { types::{Duty, DutyDefinition, DutyType, PubKey, SlotNumber}, validatorapi::types::AttestationDataOpts, }; + use pluto_eth2api::valcache::{CompleteValidators, ValidatorCacheError}; + + /// In-memory [`CachedValidatorsProvider`] for tests. Holds a fixed + /// `validator_index -> DV root pubkey` map. `complete_validators` is not + /// consumed by the validator API, so it returns an empty set. + #[derive(Default)] + pub(super) struct TestValidatorCache(HashMap); + + impl TestValidatorCache { + /// An empty cache as an `Arc`. + pub(super) fn empty() -> Arc { + Arc::new(Self::default()) + } + + /// A cache pre-populated with `validators`. + #[allow(dead_code, reason = "consumed by submit_* handler tests in later PRs")] + pub(super) fn arc( + validators: HashMap, + ) -> Arc { + Arc::new(Self(validators)) + } + } + + #[async_trait] + impl CachedValidatorsProvider for TestValidatorCache { + async fn active_validators(&self) -> Result { + Ok(ActiveValidators::new(self.0.clone())) + } + + async fn complete_validators(&self) -> Result { + Ok(CompleteValidators::default()) + } + } /// Schedules every duty with a deadline at `MAX_UTC`, so duties are /// `Scheduled` but never naturally expire. @@ -1345,7 +1408,8 @@ mod tests { let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url("http://127.0.0.1:0").unwrap()); - let component = Component::new_insecure(eth2_cl, Arc::clone(&dutydb), 1); + let component = + Component::new_insecure(eth2_cl, Arc::clone(&dutydb), 1, TestValidatorCache::empty()); (component, dutydb) } @@ -1587,7 +1651,8 @@ mod tests { let dutydb = Arc::new(MemDB::new(deadliner, trim_rx, &cancel)); let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url("http://127.0.0.1:0").unwrap()); - let component = Component::new_insecure(eth2_cl, Arc::clone(&dutydb), 1); + let component = + Component::new_insecure(eth2_cl, Arc::clone(&dutydb), 1, TestValidatorCache::empty()); // Start an await before any data is stored. let waiter = { @@ -1773,7 +1838,7 @@ mod tests { let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url("http://127.0.0.1:0").unwrap()); - Component::new(eth2_cl, dutydb, 1, map, false) + Component::new(eth2_cl, dutydb, 1, map, false, TestValidatorCache::empty()) } /// `Subscribe` invokes every registered subscriber, each receiving its @@ -2004,7 +2069,7 @@ mod tests { let (_evict_tx, evict_rx) = mpsc::channel(1); let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url(mock.uri()).unwrap()); - let component = Component::new(eth2_cl, dutydb, 1, map, false); + let component = Component::new(eth2_cl, dutydb, 1, map, false, TestValidatorCache::empty()); (component, mock) } @@ -2087,7 +2152,7 @@ mod tests { let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url("http://127.0.0.1:0").unwrap()); - let component = Component::new_insecure(eth2_cl, dutydb, 1); + let component = Component::new_insecure(eth2_cl, dutydb, 1, TestValidatorCache::empty()); component .verify_partial_sig( @@ -2163,7 +2228,8 @@ mod tests { let (_evict_tx, evict_rx) = mpsc::channel(1); let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url(mock.uri()).unwrap()); - let component = Component::new_insecure(eth2_cl, Arc::clone(&dutydb), 1); + let component = + Component::new_insecure(eth2_cl, Arc::clone(&dutydb), 1, TestValidatorCache::empty()); (component, mock) } @@ -2812,7 +2878,14 @@ mod tests { let (_evict_tx, evict_rx) = mpsc::channel(1); let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url(mock.uri()).unwrap()); - let mut component = Component::new(eth2_cl, Arc::clone(&dutydb), 1, HashMap::new(), false); + let mut component = Component::new( + eth2_cl, + Arc::clone(&dutydb), + 1, + HashMap::new(), + false, + TestValidatorCache::empty(), + ); let (consensus, signed) = matched_phase0_proposals(33, 5); diff --git a/crates/eth2api/Cargo.toml b/crates/eth2api/Cargo.toml index 99b0a3c9..fbc5c298 100644 --- a/crates/eth2api/Cargo.toml +++ b/crates/eth2api/Cargo.toml @@ -13,6 +13,7 @@ ignored = ["bon", "http", "oas3-gen-support", "regex", "reqwest", "validator"] [dependencies] anyhow.workspace = true +async-trait.workspace = true bon.workspace = true http.workspace = true oas3-gen-support.workspace = true diff --git a/crates/eth2api/src/valcache.rs b/crates/eth2api/src/valcache.rs index ea52ef6d..6e086680 100644 --- a/crates/eth2api/src/valcache.rs +++ b/crates/eth2api/src/valcache.rs @@ -4,6 +4,7 @@ use crate::{ PostStateValidatorsRequestPath, PostStateValidatorsResponse, ValidatorRequestBody, spec::phase0::{BLSPubKey as PubKey, ValidatorIndex}, }; +use async_trait::async_trait; use std::{collections::HashMap, sync::Arc}; use tokio::sync::RwLock; @@ -42,6 +43,13 @@ impl std::ops::Deref for CompleteValidators { } impl ActiveValidators { + /// Builds an [`ActiveValidators`] from a `validator_index -> pubkey` map. + /// Lets consumers outside this crate (e.g. test doubles of + /// [`CachedValidatorsProvider`]) construct populated instances. + pub fn new(validators: HashMap) -> Self { + Self(validators) + } + /// An [`Iterator`] of active validator indices. pub fn indices(&self) -> impl Iterator + '_ { self.0.keys().copied() @@ -55,12 +63,29 @@ impl ActiveValidators { /// A provider of cached validator information for the current epoch, /// including both active validators and complete validator data. -pub trait CachedValidatorsProvider { +/// +/// Async so implementations may populate the underlying cache on demand — +/// callers must not assume the call is non-blocking. Consumed via +/// `Arc` (e.g. by the validator API), so the +/// trait is object-safe and `Send + Sync`. +#[async_trait] +pub trait CachedValidatorsProvider: Send + Sync { /// Get the cached active validators. - fn active_validators(&self) -> Result; + async fn active_validators(&self) -> Result; /// Get all the cached validators. - fn complete_validators(&self) -> Result; + async fn complete_validators(&self) -> Result; +} + +#[async_trait] +impl CachedValidatorsProvider for ValidatorCache { + async fn active_validators(&self) -> Result { + Ok(self.get_by_head().await?.0) + } + + async fn complete_validators(&self) -> Result { + Ok(self.get_by_head().await?.1) + } } /// A cache for active validators. From 915faa3ee02dab86eaf30bfe69bafc527516dbee Mon Sep 17 00:00:00 2001 From: Bohdan Ohorodnii <273991985+varex83agent@users.noreply.github.com> Date: Fri, 12 Jun 2026 17:35:34 +0200 Subject: [PATCH 4/4] docs(core): drop Go/Charon cross-reference comments from sync committee handlers Remove "port of"/"mirrors Go's"/"matches Charon" annotations and similar AI-flavored narration from the validatorapi sync committee contribution and submit code and tests, keeping the behavioural descriptions. Co-authored-by: varex83 --- crates/core/src/validatorapi/component.rs | 82 +++++++++-------------- crates/core/src/validatorapi/types.rs | 3 +- 2 files changed, 32 insertions(+), 53 deletions(-) diff --git a/crates/core/src/validatorapi/component.rs b/crates/core/src/validatorapi/component.rs index 8d78a91e..42c322cb 100644 --- a/crates/core/src/validatorapi/component.rs +++ b/crates/core/src/validatorapi/component.rs @@ -157,8 +157,7 @@ pub struct Component { eth2_cl: Arc, /// Per-epoch active-validators cache. Submit handlers consult this to /// translate a validator-client-supplied `validator_index` into the - /// cluster's DV root public key. Mirrors Go's `eth2Cl.ActiveValidators`, - /// which is itself backed by the beacon-node validator cache. + /// cluster's DV root public key. Backed by the beacon-node validator cache. #[allow(dead_code, reason = "consumed by submit_* handlers in later PRs")] validator_cache: Arc, /// In-memory DutyDB used to await consensus output (e.g. attestation @@ -264,9 +263,7 @@ impl Component { /// Returns the cluster's active validators (`validator_index -> DV root /// public key`) from the registered [`CachedValidatorsProvider`], - /// bounded by [`UPSTREAM_REQUEST_TIMEOUT`]. Mirrors Go's - /// `c.eth2Cl.ActiveValidators(ctx)`, which is itself implemented via the - /// beacon-node validator cache. + /// bounded by [`UPSTREAM_REQUEST_TIMEOUT`]. #[allow(dead_code, reason = "consumed by submit_* handlers in later PRs")] async fn fetch_active_validators(&self) -> Result { tokio::time::timeout( @@ -425,8 +422,7 @@ impl Component { /// Fans out a validated [`ParSignedDataSet`] to every registered /// subscriber. Each subscriber receives its own clone (the wrapper - /// stored in `subs` already does the clone-before-fanout, mirroring - /// Go's `Subscribe`). + /// stored in `subs` already does the clone-before-fanout). async fn fanout(&self, duty: &Duty, set: ParSignedDataSet) -> Result<(), ApiError> { for sub in &self.subs { sub(duty, &set).await.map_err(|err| { @@ -989,10 +985,9 @@ impl Handler for Component { &self, opts: SyncCommitteeContributionOpts, ) -> Result, ApiError> { - // Port of Go `SyncCommitteeContribution` at - // `core/validatorapi/validatorapi.go:948`. Delegates to the registered - // `awaitSyncContributionFunc`, bounded by a hard timeout so a missing - // contribution cannot park the handler indefinitely. + // Delegates to the registered sync-contribution hook, bounded by a + // hard timeout so a missing contribution cannot park the handler + // indefinitely. let await_fn = self.await_sync_contribution_fn.as_ref().ok_or_else(|| { ApiError::new( StatusCode::INTERNAL_SERVER_ERROR, @@ -1026,11 +1021,9 @@ impl Handler for Component { &self, contributions: Vec, ) -> Result<(), ApiError> { - // Port of Go `SubmitSyncCommitteeContributions` at - // `core/validatorapi/validatorapi.go:1009`. Verifies the inner - // selection proof against the root pubkey, the outer partial - // signature against this node's share, groups by slot, and fans - // out to every subscriber. + // Verifies the inner selection proof against the root pubkey, the + // outer partial signature against this node's share, groups by slot, + // and fans out to every subscriber. let vals = self.fetch_active_validators().await?; let mut psigs_by_slot: HashMap = HashMap::new(); @@ -1039,9 +1032,8 @@ impl Handler for Component { let v_idx = contrib.message.aggregator_index; let eth2_pubkey = vals.get(&v_idx).copied().ok_or_else(|| { - // Mirrors Go's `errors.New("validator not found")` — - // the VC submitted a contribution whose aggregator index - // is not part of the active validator set. + // The VC submitted a contribution whose aggregator index is + // not part of the active validator set. ApiError::new(StatusCode::BAD_REQUEST, "validator not found") })?; @@ -1053,11 +1045,10 @@ impl Handler for Component { .with_source(std::io::Error::other(format!("{err:?}"))) })?; - // Inner selection-proof verification. Mirrors Go's - // `core.VerifyEth2SignedData(... NewSyncContributionAndProof ...)` - // — checked against the **root** pubkey (`eth2Pubkey`), not the - // share, because the VC builds the selection proof with the - // root-level secret. Skipped in `insecure_test`. + // Inner selection-proof verification — checked against the + // **root** pubkey (`eth2Pubkey`), not the share, because the VC + // builds the selection proof with the root-level secret. Skipped + // in `insecure_test`. if !self.insecure_test { let inner = SyncContributionAndProof::new(contrib.message.clone()); let epoch = epoch_from_slot(&self.eth2_cl, slot).await.map_err(|err| { @@ -1126,10 +1117,8 @@ impl Handler for Component { &self, messages: Vec, ) -> Result<(), ApiError> { - // Port of Go `SubmitSyncCommitteeMessages` at - // `core/validatorapi/validatorapi.go:958`. Builds a partial - // `SignedSyncMessage` per validator, verifies the partial sig - // against this node's share, then fans out grouped by slot. + // Builds a partial `SignedSyncMessage` per validator, verifies the + // partial sig against this node's share, then fans out grouped by slot. let vals = self.fetch_active_validators().await?; let mut psigs_by_slot: HashMap = HashMap::new(); @@ -1258,9 +1247,7 @@ fn map_dutydb_error(err: DutyDbError) -> ApiError { /// through `register_await_*` instead of calling `dutydb` directly) into the /// `ApiError` returned to the client. If the boxed error is a typed /// [`DutyDbError`] we recover the same status mapping as [`map_dutydb_error`]. -/// Otherwise we mirror Charon's `writeError` (core/validatorapi/router.go:1674) -/// and surface a generic 500 — Charon does the same when the hook bubbles a -/// non-`apiError` value. +/// Otherwise we surface a generic 500 when the hook bubbles an untyped value. fn map_hook_dutydb_error(err: CallbackError) -> ApiError { if let Some(dutydb_err) = err.downcast_ref::() { let (status, message) = match dutydb_err { @@ -2308,8 +2295,8 @@ mod tests { assert!(component.subs.is_empty()); } - /// Mirrors signing-fixture spec from `pluto_eth2util::signing` tests so - /// `verify_partial_sig` can resolve a real beacon-attester domain. + /// Uses the same signing-fixture spec as the `pluto_eth2util::signing` + /// tests so `verify_partial_sig` can resolve a real beacon-attester domain. /// Each fork has a distinct epoch so `resolve_fork_version` is /// deterministic (the fork_schedule HashMap iteration order does not /// affect the result). @@ -2465,8 +2452,7 @@ mod tests { // ==================================================================== /// `fetch_active_validators` returns whatever the registered - /// `CachedValidatorsProvider` yields, untouched. Mirrors Go's - /// `c.eth2Cl.ActiveValidators(ctx)` return shape. + /// `CachedValidatorsProvider` yields, untouched. #[tokio::test] async fn fetch_active_validators_returns_cache_contents() { let cancel = CancellationToken::new(); @@ -2638,10 +2624,9 @@ mod tests { } /// `sync_committee_contribution` returns 500 when the registered hook - /// fails with a generic (non-`DutyDbError`) error. Mirrors Charon's - /// `writeError` (core/validatorapi/router.go:1674) which maps any - /// non-`apiError` to 500. The 408 branch is reserved for `Elapsed` - /// (handler-level timeout) and for typed `DutyDbError::AwaitDutyExpired`. + /// fails with a generic (non-`DutyDbError`) error. The 408 branch is + /// reserved for `Elapsed` (handler-level timeout) and for typed + /// `DutyDbError::AwaitDutyExpired`. #[tokio::test] async fn sync_committee_contribution_returns_500_on_generic_hook_error() { let (_captured, mut component) = make_sync_component(HashMap::new()); @@ -2783,8 +2768,7 @@ mod tests { } /// `submit_sync_committee_messages` rejects with 400 when the - /// validator-index lookup misses. Mirrors Go's - /// `errors.New("validator not found")`. + /// validator-index lookup misses. #[tokio::test] async fn submit_sync_committee_messages_rejects_unknown_validator_index() { let (_captured, component) = make_sync_component(HashMap::new()); @@ -2797,8 +2781,7 @@ mod tests { } /// `submit_sync_committee_messages` propagates a 502 when the - /// active-validators cache errors. Mirrors Go's bubble-through of - /// `c.eth2Cl.ActiveValidators` errors. + /// active-validators cache errors. #[tokio::test] async fn submit_sync_committee_messages_502_on_active_validators_error() { struct FailingCache; @@ -2887,10 +2870,9 @@ mod tests { /// `submit_sync_committee_messages` rejects with 400 when verification /// runs (i.e. `insecure_test = false`) and the share map has no entry - /// for the validator's root pubkey. Mirrors Go's - /// `getVerifyShareFunc -> errors.New("unknown public key")` path — - /// surfaced to the client as a 400. Confirms `verify_partial_sig_for` - /// is actually invoked from the submit handler. + /// for the validator's root pubkey — an unknown public key is surfaced to + /// the client as a 400. Confirms `verify_partial_sig_for` is actually + /// invoked from the submit handler. #[tokio::test] async fn submit_sync_committee_messages_rejects_invalid_partial_sig() { let dv_root = [0xEE_u8; 48]; @@ -3090,10 +3072,8 @@ mod tests { /// `SignedContributionAndProof` where the **inner** selection proof is /// signed by the root secret under `SyncCommitteeSelectionProof` and /// the **outer** partial signature is signed by the share secret under - /// `ContributionAndProof`. Mirrors Charon's - /// `TestComponent_SubmitSyncCommitteeContributionsVerify` and proves - /// both verify steps agree on domain / epoch / message-root with the - /// shared mock-beacon spec fixture. + /// `ContributionAndProof`. Proves both verify steps agree on domain / + /// epoch / message-root with the shared mock-beacon spec fixture. #[tokio::test] async fn submit_sync_committee_contributions_accepts_valid_partial_sig() { // Root secret signs the inner selection proof; share secret signs diff --git a/crates/core/src/validatorapi/types.rs b/crates/core/src/validatorapi/types.rs index 84db2776..8e7f1cf8 100644 --- a/crates/core/src/validatorapi/types.rs +++ b/crates/core/src/validatorapi/types.rs @@ -153,8 +153,7 @@ pub use crate::signeddata::VersionedProposal; pub use crate::signeddata::VersionedSignedProposal; /// Versioned signed blinded proposal payload — alias of the eth2api versioned -/// wrapper, the same shape consumed by Go's -/// `SubmitBlindedProposalOpts.Proposal`. +/// wrapper. pub use pluto_eth2api::versioned::VersionedSignedBlindedProposal; /// Versioned attestation payload. Placeholder.