From f9c968c5b5ca7891ae2712ac2e563a4e88538475 Mon Sep 17 00:00:00 2001 From: Bohdan Ohorodnii <273991985+varex83agent@users.noreply.github.com> Date: Thu, 11 Jun 2026 20:07:29 +0200 Subject: [PATCH] feat(core): wire eth2api CachedValidatorsProvider into validatorapi Make eth2api's existing CachedValidatorsProvider trait async (#[async_trait]) and implement it for ValidatorCache, add ActiveValidators::new so consumers can build instances, and add the async-trait dependency. Wire a validator_cache: Arc into the validatorapi Component (constructors + fetch_active_validators), reusing the shared trait rather than a validatorapi-local duplicate. fetch_active_validators mirrors Go's eth2Cl.ActiveValidators. Add a TestValidatorCache test double and update existing validatorapi tests for the new constructor arity. This is the base the validatorapi submit-handler PRs (selections, exit/ registrations, sync-submit) stack on. Co-Authored-By: Bohdan Ohorodnii <35969035+varex83@users.noreply.github.com> --- Cargo.lock | 1 + crates/core/src/validatorapi/component.rs | 87 +++++++++++++++++++++-- crates/eth2api/Cargo.toml | 1 + crates/eth2api/src/valcache.rs | 31 +++++++- 4 files changed, 110 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index be2ddc88..abd7cd39 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5707,6 +5707,7 @@ version = "1.7.1" dependencies = [ "alloy", "anyhow", + "async-trait", "bon", "chrono", "ethereum_ssz", diff --git a/crates/core/src/validatorapi/component.rs b/crates/core/src/validatorapi/component.rs index f9f7729e..60a3c4e3 100644 --- a/crates/core/src/validatorapi/component.rs +++ b/crates/core/src/validatorapi/component.rs @@ -14,6 +14,7 @@ use pluto_eth2api::{ GetProposerDutiesRequest, GetProposerDutiesResponse, GetSyncCommitteeDutiesRequest, GetSyncCommitteeDutiesResponse, spec::phase0::{BLSPubKey, Epoch, Root, ValidatorIndex}, + valcache::{ActiveValidators, CachedValidatorsProvider}, versioned::{DataVersion, SignedBlindedProposalBlock, SignedProposalBlock}, }; use pluto_eth2util::signing::{self, DomainName, SigningError}; @@ -140,6 +141,12 @@ const PROPOSAL_TIMEOUT: Duration = Duration::from_secs(24); pub struct Component { /// Upstream beacon-node API client. eth2_cl: Arc, + /// Per-epoch active-validators cache. Submit handlers consult this to + /// translate a validator-client-supplied `validator_index` into the + /// cluster's DV root public key. Mirrors Go's `eth2Cl.ActiveValidators`, + /// which is itself backed by the beacon-node validator cache. + #[allow(dead_code, reason = "consumed by submit_* handlers in later PRs")] + validator_cache: Arc, /// In-memory DutyDB used to await consensus output (e.g. attestation /// data) produced by the rest of the pipeline. dutydb: Arc, @@ -196,6 +203,7 @@ impl Component { share_idx: u64, pub_share_by_pubkey: HashMap, builder_enabled: bool, + validator_cache: Arc, ) -> Self { Self { eth2_cl, @@ -203,6 +211,7 @@ impl Component { share_idx, pub_share_by_pubkey, builder_enabled, + validator_cache, insecure_test: false, subs: Vec::new(), await_proposal_fn: None, @@ -223,6 +232,7 @@ impl Component { eth2_cl: Arc, dutydb: Arc, share_idx: u64, + validator_cache: Arc, ) -> Self { Self { eth2_cl, @@ -230,6 +240,7 @@ impl Component { share_idx, pub_share_by_pubkey: HashMap::new(), builder_enabled: false, + validator_cache, insecure_test: true, subs: Vec::new(), await_proposal_fn: None, @@ -241,6 +252,25 @@ impl Component { } } + /// Returns the cluster's active validators (`validator_index -> DV root + /// public key`) from the registered [`CachedValidatorsProvider`], + /// bounded by [`UPSTREAM_REQUEST_TIMEOUT`]. Mirrors Go's + /// `c.eth2Cl.ActiveValidators(ctx)`, which is itself implemented via the + /// beacon-node validator cache. + #[allow(dead_code, reason = "consumed by submit_* handlers in later PRs")] + async fn fetch_active_validators(&self) -> Result { + tokio::time::timeout( + UPSTREAM_REQUEST_TIMEOUT, + self.validator_cache.active_validators(), + ) + .await + .map_err(|_: Elapsed| upstream_timeout("active validators"))? + .map_err(|err| { + ApiError::new(StatusCode::BAD_GATEWAY, "active validators lookup failed") + .with_source(err) + }) + } + /// Appends a subscriber that is invoked by submit endpoints once a /// partial-signed-data set has been validated. The registered closure /// receives its own clone of the set, so subscribers can mutate without @@ -1321,6 +1351,39 @@ mod tests { types::{Duty, DutyDefinition, DutyType, PubKey, SlotNumber}, validatorapi::types::AttestationDataOpts, }; + use pluto_eth2api::valcache::{CompleteValidators, ValidatorCacheError}; + + /// In-memory [`CachedValidatorsProvider`] for tests. Holds a fixed + /// `validator_index -> DV root pubkey` map. `complete_validators` is not + /// consumed by the validator API, so it returns an empty set. + #[derive(Default)] + pub(super) struct TestValidatorCache(HashMap); + + impl TestValidatorCache { + /// An empty cache as an `Arc`. + pub(super) fn empty() -> Arc { + Arc::new(Self::default()) + } + + /// A cache pre-populated with `validators`. + #[allow(dead_code, reason = "consumed by submit_* handler tests in later PRs")] + pub(super) fn arc( + validators: HashMap, + ) -> Arc { + Arc::new(Self(validators)) + } + } + + #[async_trait] + impl CachedValidatorsProvider for TestValidatorCache { + async fn active_validators(&self) -> Result { + Ok(ActiveValidators::new(self.0.clone())) + } + + async fn complete_validators(&self) -> Result { + Ok(CompleteValidators::default()) + } + } /// Schedules every duty with a deadline at `MAX_UTC`, so duties are /// `Scheduled` but never naturally expire. @@ -1345,7 +1408,8 @@ mod tests { let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url("http://127.0.0.1:0").unwrap()); - let component = Component::new_insecure(eth2_cl, Arc::clone(&dutydb), 1); + let component = + Component::new_insecure(eth2_cl, Arc::clone(&dutydb), 1, TestValidatorCache::empty()); (component, dutydb) } @@ -1587,7 +1651,8 @@ mod tests { let dutydb = Arc::new(MemDB::new(deadliner, trim_rx, &cancel)); let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url("http://127.0.0.1:0").unwrap()); - let component = Component::new_insecure(eth2_cl, Arc::clone(&dutydb), 1); + let component = + Component::new_insecure(eth2_cl, Arc::clone(&dutydb), 1, TestValidatorCache::empty()); // Start an await before any data is stored. let waiter = { @@ -1773,7 +1838,7 @@ mod tests { let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url("http://127.0.0.1:0").unwrap()); - Component::new(eth2_cl, dutydb, 1, map, false) + Component::new(eth2_cl, dutydb, 1, map, false, TestValidatorCache::empty()) } /// `Subscribe` invokes every registered subscriber, each receiving its @@ -2004,7 +2069,7 @@ mod tests { let (_evict_tx, evict_rx) = mpsc::channel(1); let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url(mock.uri()).unwrap()); - let component = Component::new(eth2_cl, dutydb, 1, map, false); + let component = Component::new(eth2_cl, dutydb, 1, map, false, TestValidatorCache::empty()); (component, mock) } @@ -2087,7 +2152,7 @@ mod tests { let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url("http://127.0.0.1:0").unwrap()); - let component = Component::new_insecure(eth2_cl, dutydb, 1); + let component = Component::new_insecure(eth2_cl, dutydb, 1, TestValidatorCache::empty()); component .verify_partial_sig( @@ -2163,7 +2228,8 @@ mod tests { let (_evict_tx, evict_rx) = mpsc::channel(1); let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url(mock.uri()).unwrap()); - let component = Component::new_insecure(eth2_cl, Arc::clone(&dutydb), 1); + let component = + Component::new_insecure(eth2_cl, Arc::clone(&dutydb), 1, TestValidatorCache::empty()); (component, mock) } @@ -2812,7 +2878,14 @@ mod tests { let (_evict_tx, evict_rx) = mpsc::channel(1); let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel)); let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url(mock.uri()).unwrap()); - let mut component = Component::new(eth2_cl, Arc::clone(&dutydb), 1, HashMap::new(), false); + let mut component = Component::new( + eth2_cl, + Arc::clone(&dutydb), + 1, + HashMap::new(), + false, + TestValidatorCache::empty(), + ); let (consensus, signed) = matched_phase0_proposals(33, 5); diff --git a/crates/eth2api/Cargo.toml b/crates/eth2api/Cargo.toml index 99b0a3c9..fbc5c298 100644 --- a/crates/eth2api/Cargo.toml +++ b/crates/eth2api/Cargo.toml @@ -13,6 +13,7 @@ ignored = ["bon", "http", "oas3-gen-support", "regex", "reqwest", "validator"] [dependencies] anyhow.workspace = true +async-trait.workspace = true bon.workspace = true http.workspace = true oas3-gen-support.workspace = true diff --git a/crates/eth2api/src/valcache.rs b/crates/eth2api/src/valcache.rs index ea52ef6d..6e086680 100644 --- a/crates/eth2api/src/valcache.rs +++ b/crates/eth2api/src/valcache.rs @@ -4,6 +4,7 @@ use crate::{ PostStateValidatorsRequestPath, PostStateValidatorsResponse, ValidatorRequestBody, spec::phase0::{BLSPubKey as PubKey, ValidatorIndex}, }; +use async_trait::async_trait; use std::{collections::HashMap, sync::Arc}; use tokio::sync::RwLock; @@ -42,6 +43,13 @@ impl std::ops::Deref for CompleteValidators { } impl ActiveValidators { + /// Builds an [`ActiveValidators`] from a `validator_index -> pubkey` map. + /// Lets consumers outside this crate (e.g. test doubles of + /// [`CachedValidatorsProvider`]) construct populated instances. + pub fn new(validators: HashMap) -> Self { + Self(validators) + } + /// An [`Iterator`] of active validator indices. pub fn indices(&self) -> impl Iterator + '_ { self.0.keys().copied() @@ -55,12 +63,29 @@ impl ActiveValidators { /// A provider of cached validator information for the current epoch, /// including both active validators and complete validator data. -pub trait CachedValidatorsProvider { +/// +/// Async so implementations may populate the underlying cache on demand — +/// callers must not assume the call is non-blocking. Consumed via +/// `Arc` (e.g. by the validator API), so the +/// trait is object-safe and `Send + Sync`. +#[async_trait] +pub trait CachedValidatorsProvider: Send + Sync { /// Get the cached active validators. - fn active_validators(&self) -> Result; + async fn active_validators(&self) -> Result; /// Get all the cached validators. - fn complete_validators(&self) -> Result; + async fn complete_validators(&self) -> Result; +} + +#[async_trait] +impl CachedValidatorsProvider for ValidatorCache { + async fn active_validators(&self) -> Result { + Ok(self.get_by_head().await?.0) + } + + async fn complete_validators(&self) -> Result { + Ok(self.get_by_head().await?.1) + } } /// A cache for active validators.