Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

87 changes: 80 additions & 7 deletions crates/core/src/validatorapi/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use pluto_eth2api::{
GetProposerDutiesRequest, GetProposerDutiesResponse, GetSyncCommitteeDutiesRequest,
GetSyncCommitteeDutiesResponse,
spec::phase0::{BLSPubKey, Epoch, Root, ValidatorIndex},
valcache::{ActiveValidators, CachedValidatorsProvider},
versioned::{DataVersion, SignedBlindedProposalBlock, SignedProposalBlock},
};
use pluto_eth2util::signing::{self, DomainName, SigningError};
Expand Down Expand Up @@ -140,6 +141,12 @@ const PROPOSAL_TIMEOUT: Duration = Duration::from_secs(24);
pub struct Component {
/// Upstream beacon-node API client.
eth2_cl: Arc<EthBeaconNodeApiClient>,
/// Per-epoch active-validators cache. Submit handlers consult this to
/// translate a validator-client-supplied `validator_index` into the
/// cluster's DV root public key. Mirrors Go's `eth2Cl.ActiveValidators`,
/// which is itself backed by the beacon-node validator cache.
#[allow(dead_code, reason = "consumed by submit_* handlers in later PRs")]
validator_cache: Arc<dyn CachedValidatorsProvider>,
/// In-memory DutyDB used to await consensus output (e.g. attestation
/// data) produced by the rest of the pipeline.
dutydb: Arc<MemDB>,
Expand Down Expand Up @@ -196,13 +203,15 @@ impl Component {
share_idx: u64,
pub_share_by_pubkey: HashMap<BLSPubKey, BLSPubKey>,
builder_enabled: bool,
validator_cache: Arc<dyn CachedValidatorsProvider>,
) -> Self {
Self {
eth2_cl,
dutydb,
share_idx,
pub_share_by_pubkey,
builder_enabled,
validator_cache,
insecure_test: false,
subs: Vec::new(),
await_proposal_fn: None,
Expand All @@ -223,13 +232,15 @@ impl Component {
eth2_cl: Arc<EthBeaconNodeApiClient>,
dutydb: Arc<MemDB>,
share_idx: u64,
validator_cache: Arc<dyn CachedValidatorsProvider>,
) -> Self {
Self {
eth2_cl,
dutydb,
share_idx,
pub_share_by_pubkey: HashMap::new(),
builder_enabled: false,
validator_cache,
insecure_test: true,
subs: Vec::new(),
await_proposal_fn: None,
Expand All @@ -241,6 +252,25 @@ impl Component {
}
}

/// Returns the cluster's active validators (`validator_index -> DV root
/// public key`) from the registered [`CachedValidatorsProvider`],
/// bounded by [`UPSTREAM_REQUEST_TIMEOUT`]. Mirrors Go's
/// `c.eth2Cl.ActiveValidators(ctx)`, which is itself implemented via the
/// beacon-node validator cache.
#[allow(dead_code, reason = "consumed by submit_* handlers in later PRs")]
async fn fetch_active_validators(&self) -> Result<ActiveValidators, ApiError> {
tokio::time::timeout(
UPSTREAM_REQUEST_TIMEOUT,
self.validator_cache.active_validators(),
)
.await
.map_err(|_: Elapsed| upstream_timeout("active validators"))?
.map_err(|err| {
ApiError::new(StatusCode::BAD_GATEWAY, "active validators lookup failed")
.with_source(err)
})
}

/// Appends a subscriber that is invoked by submit endpoints once a
/// partial-signed-data set has been validated. The registered closure
/// receives its own clone of the set, so subscribers can mutate without
Expand Down Expand Up @@ -1321,6 +1351,39 @@ mod tests {
types::{Duty, DutyDefinition, DutyType, PubKey, SlotNumber},
validatorapi::types::AttestationDataOpts,
};
use pluto_eth2api::valcache::{CompleteValidators, ValidatorCacheError};

/// In-memory [`CachedValidatorsProvider`] for tests. Holds a fixed
/// `validator_index -> DV root pubkey` map. `complete_validators` is not
/// consumed by the validator API, so it returns an empty set.
#[derive(Default)]
pub(super) struct TestValidatorCache(HashMap<ValidatorIndex, BLSPubKey>);

impl TestValidatorCache {
/// An empty cache as an `Arc<dyn CachedValidatorsProvider>`.
pub(super) fn empty() -> Arc<dyn CachedValidatorsProvider> {
Arc::new(Self::default())
}

/// A cache pre-populated with `validators`.
#[allow(dead_code, reason = "consumed by submit_* handler tests in later PRs")]
pub(super) fn arc(
validators: HashMap<ValidatorIndex, BLSPubKey>,
) -> Arc<dyn CachedValidatorsProvider> {
Arc::new(Self(validators))
}
}

#[async_trait]
impl CachedValidatorsProvider for TestValidatorCache {
async fn active_validators(&self) -> Result<ActiveValidators, ValidatorCacheError> {
Ok(ActiveValidators::new(self.0.clone()))
}

async fn complete_validators(&self) -> Result<CompleteValidators, ValidatorCacheError> {
Ok(CompleteValidators::default())
}
}

/// Schedules every duty with a deadline at `MAX_UTC`, so duties are
/// `Scheduled` but never naturally expire.
Expand All @@ -1345,7 +1408,8 @@ mod tests {
let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel));
let eth2_cl =
Arc::new(EthBeaconNodeApiClient::with_base_url("http://127.0.0.1:0").unwrap());
let component = Component::new_insecure(eth2_cl, Arc::clone(&dutydb), 1);
let component =
Component::new_insecure(eth2_cl, Arc::clone(&dutydb), 1, TestValidatorCache::empty());
(component, dutydb)
}

Expand Down Expand Up @@ -1587,7 +1651,8 @@ mod tests {
let dutydb = Arc::new(MemDB::new(deadliner, trim_rx, &cancel));
let eth2_cl =
Arc::new(EthBeaconNodeApiClient::with_base_url("http://127.0.0.1:0").unwrap());
let component = Component::new_insecure(eth2_cl, Arc::clone(&dutydb), 1);
let component =
Component::new_insecure(eth2_cl, Arc::clone(&dutydb), 1, TestValidatorCache::empty());

// Start an await before any data is stored.
let waiter = {
Expand Down Expand Up @@ -1773,7 +1838,7 @@ mod tests {
let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel));
let eth2_cl =
Arc::new(EthBeaconNodeApiClient::with_base_url("http://127.0.0.1:0").unwrap());
Component::new(eth2_cl, dutydb, 1, map, false)
Component::new(eth2_cl, dutydb, 1, map, false, TestValidatorCache::empty())
}

/// `Subscribe` invokes every registered subscriber, each receiving its
Expand Down Expand Up @@ -2004,7 +2069,7 @@ mod tests {
let (_evict_tx, evict_rx) = mpsc::channel(1);
let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel));
let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url(mock.uri()).unwrap());
let component = Component::new(eth2_cl, dutydb, 1, map, false);
let component = Component::new(eth2_cl, dutydb, 1, map, false, TestValidatorCache::empty());
(component, mock)
}

Expand Down Expand Up @@ -2087,7 +2152,7 @@ mod tests {
let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel));
let eth2_cl =
Arc::new(EthBeaconNodeApiClient::with_base_url("http://127.0.0.1:0").unwrap());
let component = Component::new_insecure(eth2_cl, dutydb, 1);
let component = Component::new_insecure(eth2_cl, dutydb, 1, TestValidatorCache::empty());

component
.verify_partial_sig(
Expand Down Expand Up @@ -2163,7 +2228,8 @@ mod tests {
let (_evict_tx, evict_rx) = mpsc::channel(1);
let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel));
let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url(mock.uri()).unwrap());
let component = Component::new_insecure(eth2_cl, Arc::clone(&dutydb), 1);
let component =
Component::new_insecure(eth2_cl, Arc::clone(&dutydb), 1, TestValidatorCache::empty());
(component, mock)
}

Expand Down Expand Up @@ -2812,7 +2878,14 @@ mod tests {
let (_evict_tx, evict_rx) = mpsc::channel(1);
let dutydb = Arc::new(MemDB::new(deadliner, evict_rx, &cancel));
let eth2_cl = Arc::new(EthBeaconNodeApiClient::with_base_url(mock.uri()).unwrap());
let mut component = Component::new(eth2_cl, Arc::clone(&dutydb), 1, HashMap::new(), false);
let mut component = Component::new(
eth2_cl,
Arc::clone(&dutydb),
1,
HashMap::new(),
false,
TestValidatorCache::empty(),
);

let (consensus, signed) = matched_phase0_proposals(33, 5);

Expand Down
1 change: 1 addition & 0 deletions crates/eth2api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ ignored = ["bon", "http", "oas3-gen-support", "regex", "reqwest", "validator"]

[dependencies]
anyhow.workspace = true
async-trait.workspace = true
bon.workspace = true
http.workspace = true
oas3-gen-support.workspace = true
Expand Down
31 changes: 28 additions & 3 deletions crates/eth2api/src/valcache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::{
PostStateValidatorsRequestPath, PostStateValidatorsResponse, ValidatorRequestBody,
spec::phase0::{BLSPubKey as PubKey, ValidatorIndex},
};
use async_trait::async_trait;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::RwLock;

Expand Down Expand Up @@ -42,6 +43,13 @@ impl std::ops::Deref for CompleteValidators {
}

impl ActiveValidators {
/// Builds an [`ActiveValidators`] from a `validator_index -> pubkey` map.
/// Lets consumers outside this crate (e.g. test doubles of
/// [`CachedValidatorsProvider`]) construct populated instances.
pub fn new(validators: HashMap<ValidatorIndex, PubKey>) -> Self {
Self(validators)
}

/// An [`Iterator`] of active validator indices.
pub fn indices(&self) -> impl Iterator<Item = ValidatorIndex> + '_ {
self.0.keys().copied()
Expand All @@ -55,12 +63,29 @@ impl ActiveValidators {

/// A provider of cached validator information for the current epoch,
/// including both active validators and complete validator data.
pub trait CachedValidatorsProvider {
///
/// Async so implementations may populate the underlying cache on demand —
/// callers must not assume the call is non-blocking. Consumed via
/// `Arc<dyn CachedValidatorsProvider>` (e.g. by the validator API), so the
/// trait is object-safe and `Send + Sync`.
#[async_trait]
pub trait CachedValidatorsProvider: Send + Sync {
/// Get the cached active validators.
fn active_validators(&self) -> Result<ActiveValidators>;
async fn active_validators(&self) -> Result<ActiveValidators>;

/// Get all the cached validators.
fn complete_validators(&self) -> Result<CompleteValidators>;
async fn complete_validators(&self) -> Result<CompleteValidators>;
}

#[async_trait]
impl CachedValidatorsProvider for ValidatorCache {
async fn active_validators(&self) -> Result<ActiveValidators> {
Ok(self.get_by_head().await?.0)
}

async fn complete_validators(&self) -> Result<CompleteValidators> {
Ok(self.get_by_head().await?.1)
}
}

/// A cache for active validators.
Expand Down
Loading