diff --git a/crates/core/src/dutydb/memory.rs b/crates/core/src/dutydb/memory.rs index 01a68b86..a0497221 100644 --- a/crates/core/src/dutydb/memory.rs +++ b/crates/core/src/dutydb/memory.rs @@ -18,7 +18,7 @@ use crate::{ signeddata::{ AttestationData, SyncContribution, VersionedAggregatedAttestation, VersionedProposal, }, - types::{Duty, DutyType, PubKey}, + types::{Duty, DutyType, PubKey, UnsignedDataSet, UnsignedDutyData}, }; /// Error type for DutyDB operations. @@ -129,23 +129,6 @@ pub enum Error { /// Result type for DutyDB operations. pub type Result = std::result::Result; -/// Unsigned duty data variant — matches Go's `core.UnsignedData` interface. -#[derive(Debug, Clone)] -pub enum UnsignedDutyData { - /// Unsigned proposal (DutyProposer). - Proposal(Box), - /// Unsigned attestation data (DutyAttester). - Attestation(AttestationData), - /// Unsigned aggregated attestation (DutyAggregator). - AggAttestation(VersionedAggregatedAttestation), - /// Unsigned sync contribution (DutySyncContribution). - SyncContribution(SyncContribution), -} - -/// Map from public key to unsigned duty data, equivalent to Go's -/// `core.UnsignedDataSet`. -pub type UnsignedDataSet = HashMap; - /// Lookup key for attestation data: (slot, committee index). #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] struct AttKey { diff --git a/crates/core/src/dutydb/mod.rs b/crates/core/src/dutydb/mod.rs index c96e9ddf..0d9d1478 100644 --- a/crates/core/src/dutydb/mod.rs +++ b/crates/core/src/dutydb/mod.rs @@ -2,4 +2,8 @@ pub mod memory; -pub use memory::{Error, MemDB, UnsignedDataSet, UnsignedDutyData}; +pub use memory::{Error, MemDB}; + +// `UnsignedDataSet`/`UnsignedDutyData` now live in `core::types` (shared with +// the fetcher); re-exported here for backwards compatibility. +pub use crate::types::{UnsignedDataSet, UnsignedDutyData}; diff --git a/crates/core/src/fetcher/graffiti.rs b/crates/core/src/fetcher/graffiti.rs new file mode 100644 index 00000000..a5885a8c --- /dev/null +++ b/crates/core/src/fetcher/graffiti.rs @@ -0,0 +1,349 @@ +//! Graffiti construction for block proposals. +//! +//! Ported from `charon/core/fetcher/graffiti.go`. + +use std::collections::HashMap; + +use pluto_eth2api::{EthBeaconNodeApiClient, GetNodeVersionRequest, GetNodeVersionResponse}; + +use crate::{ + types::PubKey, + version::{VERSION, git_commit}, +}; + +/// Obol token appended to graffiti unless client-append is disabled. +const OBOL_TOKEN: &str = "OB"; + +/// Graffiti is a fixed 32-byte field in the beacon block body. +const GRAFFITI_LEN: usize = 32; + +/// Error returned while constructing a [`GraffitiBuilder`]. +#[derive(Debug, thiserror::Error)] +pub enum GraffitiError { + /// More than one graffiti value was provided but the count did not match + /// the number of validators. + #[error("graffiti length must match the number of validators or be a single value")] + LengthMismatch, +} + +/// Maps beacon node product tokens (the first `/`-separated component of the +/// node version string) to their two-letter graffiti code. +pub fn client_graffiti_mappings() -> HashMap<&'static str, &'static str> { + HashMap::from([ + ("teku", "TK"), + ("Lighthouse", "LH"), + ("Lodestar", "LS"), + ("Prysm", "PY"), + ("Nimbus", "NB"), + ("Grandine", "GD"), + ]) +} + +/// Builds per-validator graffiti used when proposing blocks. +#[derive(Debug, Clone, Default)] +pub struct GraffitiBuilder { + default_graffiti: [u8; GRAFFITI_LEN], + graffiti: HashMap, +} + +impl GraffitiBuilder { + /// Creates a new graffiti builder. + /// + /// `graffiti` may be `None` (every validator gets the default graffiti), a + /// single value (applied to every validator) or one value per validator. + pub async fn new( + pubkeys: &[PubKey], + graffiti: Option<&[String]>, + disable_client_append: bool, + eth2_cl: &EthBeaconNodeApiClient, + ) -> Result { + let default = default_graffiti(); + let mut builder = Self { + default_graffiti: default, + graffiti: HashMap::with_capacity(pubkeys.len()), + }; + + // Handle nil graffiti. + let Some(graffiti) = graffiti else { + for pubkey in pubkeys { + builder.graffiti.insert(*pubkey, default); + } + + return Ok(builder); + }; + + if graffiti.len() > 1 && graffiti.len() != pubkeys.len() { + return Err(GraffitiError::LengthMismatch); + } + + let token = fetch_beacon_node_token(eth2_cl).await; + + // Handle single graffiti case. + if graffiti.len() == 1 { + let single_graffiti = &graffiti[0]; + for pubkey in pubkeys { + builder.graffiti.insert( + *pubkey, + build_graffiti(single_graffiti, &token, disable_client_append), + ); + } + + return Ok(builder); + } + + // Handle multiple graffiti case. + for (idx, pubkey) in pubkeys.iter().enumerate() { + builder.graffiti.insert( + *pubkey, + build_graffiti(&graffiti[idx], &token, disable_client_append), + ); + } + + Ok(builder) + } + + /// Returns the graffiti for a given pubkey, or the default graffiti when + /// the pubkey is unknown. + pub fn get_graffiti(&self, pubkey: &PubKey) -> [u8; GRAFFITI_LEN] { + self.graffiti + .get(pubkey) + .copied() + .unwrap_or(self.default_graffiti) + } +} + +/// Copies `s` into a fixed 32-byte array, truncating or zero-padding to match +/// Go's `copy(graffiti[:], s)` semantics. +fn graffiti_bytes(s: &str) -> [u8; GRAFFITI_LEN] { + let mut out = [0u8; GRAFFITI_LEN]; + let bytes = s.as_bytes(); + let n = bytes.len().min(GRAFFITI_LEN); + out[..n].copy_from_slice(&bytes[..n]); + out +} + +/// Builds the graffiti with optional Obol and beacon node token. +fn build_graffiti(graffiti: &str, token: &str, disable_client_append: bool) -> [u8; GRAFFITI_LEN] { + if disable_client_append { + graffiti_bytes(graffiti) + } else { + graffiti_bytes(&format!("{graffiti}{OBOL_TOKEN}{token}")) + } +} + +/// Returns the default graffiti: `pluto/-`. +fn default_graffiti() -> [u8; GRAFFITI_LEN] { + let (commit_sha, _) = git_commit(); + graffiti_bytes(&format!("pluto/{}-{}", *VERSION, commit_sha)) +} + +/// Queries the beacon node for its product token, returning an empty string on +/// any error or unrecognized client. +async fn fetch_beacon_node_token(eth2_cl: &EthBeaconNodeApiClient) -> String { + let Some(version) = node_version(eth2_cl).await else { + return String::new(); + }; + + let product_token = version.split('/').next().unwrap_or_default(); + + client_graffiti_mappings() + .get(product_token) + .map(|token| (*token).to_string()) + .unwrap_or_default() +} + +/// Fetches the beacon node version string (e.g. `Lighthouse/v0.1.5 (Linux +/// x86_64)`), or `None` on any error. +async fn node_version(eth2_cl: &EthBeaconNodeApiClient) -> Option { + match eth2_cl.get_node_version(GetNodeVersionRequest {}).await { + Ok(GetNodeVersionResponse::Ok(resp)) => Some(resp.data.version), + _ => None, + } +} + +#[cfg(test)] +mod tests { + use pluto_testutil::BeaconMock; + use serde_json::json; + + use super::*; + + /// 48-byte BLS public key length used to build distinct test pubkeys. + const PK_LEN: usize = 48; + + /// Builds a beacon mock whose `/eth/v1/node/version` endpoint returns + /// `version`. + async fn mock_with_version(version: &str) -> BeaconMock { + BeaconMock::builder() + .endpoint_overrides(vec![( + "/eth/v1/node/version".to_string(), + json!({ "data": { "version": version } }), + )]) + .build() + .await + .expect("build mock") + } + + #[tokio::test] + async fn fetch_beacon_node_token() { + // fetch token error: unreachable beacon node yields an empty token. + let unreachable = + EthBeaconNodeApiClient::with_base_url("http://127.0.0.1:1").expect("create client"); + assert_eq!(super::fetch_beacon_node_token(&unreachable).await, ""); + + // fetch token unexpected response: no `/`-separated product token. + let mock = mock_with_version("IncorrectUserAgent").await; + assert_eq!(super::fetch_beacon_node_token(mock.client()).await, ""); + + // fetch token not predicted in map. + let mock = mock_with_version("Dune/v1.3 (Windows)").await; + assert_eq!(super::fetch_beacon_node_token(mock.client()).await, ""); + + // fetch token: Lighthouse maps to "LH". + let mock = mock_with_version("Lighthouse/v0.1.5 (Linux x86_64)").await; + assert_eq!(super::fetch_beacon_node_token(mock.client()).await, "LH"); + } + + #[test] + fn build_graffiti() { + let graffiti = "abcdefghij"; // 10 bytes + let token = "BN"; + + // disable client append. + assert_eq!( + super::build_graffiti(graffiti, token, true), + graffiti_bytes(graffiti) + ); + + // enable client append. + assert_eq!( + super::build_graffiti(graffiti, token, false), + graffiti_bytes(&format!("{graffiti}{OBOL_TOKEN}{token}")) + ); + } + + #[test] + fn default_graffiti() { + let (commit_sha, _) = git_commit(); + let expected = graffiti_bytes(&format!("pluto/{}-{}", *VERSION, commit_sha)); + assert_eq!(super::default_graffiti(), expected); + } + + #[test] + fn get_graffiti() { + let pubkeys = [ + PubKey::new([1u8; PK_LEN]), + PubKey::new([2u8; PK_LEN]), + PubKey::new([3u8; PK_LEN]), + ]; + + let mut g0 = [0u8; GRAFFITI_LEN]; + g0[0] = 1; + let mut g1 = [0u8; GRAFFITI_LEN]; + g1[0] = 2; + + let builder = GraffitiBuilder { + default_graffiti: super::default_graffiti(), + graffiti: HashMap::from([(pubkeys[0], g0), (pubkeys[1], g1)]), + }; + + assert_eq!(builder.get_graffiti(&pubkeys[0]), g0); + assert_eq!(builder.get_graffiti(&pubkeys[1]), g1); + assert_eq!(builder.get_graffiti(&pubkeys[2]), super::default_graffiti()); + } + + #[tokio::test] + async fn new_graffiti_builder() { + let pubkeys = [ + PubKey::new([1u8; PK_LEN]), + PubKey::new([2u8; PK_LEN]), + PubKey::new([3u8; PK_LEN]), + ]; + + // graffiti length greater than pubkeys. + let mock = BeaconMock::builder().build().await.expect("build mock"); + let graffiti = vec![ + "a".repeat(10), + "b".repeat(15), + "c".repeat(20), + "d".repeat(25), + ]; + let result = GraffitiBuilder::new(&pubkeys, Some(&graffiti), false, mock.client()).await; + assert!(matches!(result, Err(GraffitiError::LengthMismatch))); + + // graffiti length lesser than pubkeys. + let graffiti = vec!["a".repeat(10), "b".repeat(15)]; + let result = GraffitiBuilder::new(&pubkeys, Some(&graffiti), false, mock.client()).await; + assert!(matches!(result, Err(GraffitiError::LengthMismatch))); + + // nil graffiti. + let builder = GraffitiBuilder::new(&pubkeys, None, false, mock.client()) + .await + .expect("build builder"); + for pubkey in &pubkeys { + assert_eq!(builder.get_graffiti(pubkey), super::default_graffiti()); + } + + // single graffiti with append (Grandine -> GD). + let mock = mock_with_version("Grandine/v2.1.4 (Linux x86_64)").await; + let graffiti = "x".repeat(GRAFFITI_LEN - OBOL_TOKEN.len() - 2); + let builder = GraffitiBuilder::new( + &pubkeys, + Some(std::slice::from_ref(&graffiti)), + false, + mock.client(), + ) + .await + .expect("build builder"); + let expected = graffiti_bytes(&format!("{graffiti}{OBOL_TOKEN}GD")); + for pubkey in &pubkeys { + assert_eq!(builder.get_graffiti(pubkey), expected); + } + + // single graffiti without append. + let mock = mock_with_version("Teku/v4.2.1 (Linux x86_64)").await; + let graffiti = "y".repeat(GRAFFITI_LEN); + let builder = GraffitiBuilder::new( + &pubkeys, + Some(std::slice::from_ref(&graffiti)), + true, + mock.client(), + ) + .await + .expect("build builder"); + let expected = graffiti_bytes(&graffiti); + for pubkey in &pubkeys { + assert_eq!(builder.get_graffiti(pubkey), expected); + } + + // multiple graffiti with append (Prysm -> PY). + let mock = mock_with_version("Prysm/v0.2.7 (Linux x86_64)").await; + let graffiti = vec![ + "a".repeat(10), + "b".repeat(GRAFFITI_LEN - OBOL_TOKEN.len() - 3), + "c".repeat(GRAFFITI_LEN - OBOL_TOKEN.len() - 4), + ]; + let builder = GraffitiBuilder::new(&pubkeys, Some(&graffiti), false, mock.client()) + .await + .expect("build builder"); + for (idx, pubkey) in pubkeys.iter().enumerate() { + let expected = graffiti_bytes(&format!("{}{OBOL_TOKEN}PY", graffiti[idx])); + assert_eq!(builder.get_graffiti(pubkey), expected); + } + + // multiple graffiti without append (empty version -> empty token). + let mock = mock_with_version("").await; + let graffiti = vec![ + "a".repeat(10), + "b".repeat(GRAFFITI_LEN - OBOL_TOKEN.len()), + "c".repeat(GRAFFITI_LEN - OBOL_TOKEN.len() + 1), + ]; + let builder = GraffitiBuilder::new(&pubkeys, Some(&graffiti), true, mock.client()) + .await + .expect("build builder"); + for (idx, pubkey) in pubkeys.iter().enumerate() { + let expected = graffiti_bytes(&graffiti[idx]); + assert_eq!(builder.get_graffiti(pubkey), expected); + } + } +} diff --git a/crates/core/src/fetcher/mod.rs b/crates/core/src/fetcher/mod.rs new file mode 100644 index 00000000..2e462c38 --- /dev/null +++ b/crates/core/src/fetcher/mod.rs @@ -0,0 +1,1627 @@ +//! Fetcher — fetches unsigned duty data from the beacon node. +//! +//! Ported from `charon/core/fetcher/fetcher.go`. + +mod graffiti; + +pub use graffiti::{GraffitiBuilder, GraffitiError, client_graffiti_mappings}; + +use std::{any::Any, collections::HashMap, future::Future, pin::Pin, sync::Arc}; + +use pluto_eth2api::{ + ConsensusVersion, EthBeaconNodeApiClient, EthBeaconNodeApiClientError, + GetAggregatedAttestationV2Request, GetAggregatedAttestationV2Response, + ProduceAttestationDataRequest, ProduceAttestationDataResponse, ProduceBlockV3Request, + ProduceBlockV3Response, ProduceBlockV3ResponseResponse, + ProduceSyncCommitteeContributionRequest, ProduceSyncCommitteeContributionResponse, + spec::{altair, phase0}, + versioned, +}; +use pluto_eth2util::eth2exp::{self, Eth2ExpError}; +use tracing::{debug, info, warn}; +use tree_hash::TreeHash; + +use crate::{ + signeddata::{ + AttestationData, BeaconCommitteeSelection, ProposalBlock, SignedSyncMessage, + SyncCommitteeSelection, SyncContribution, VersionedAggregatedAttestation, + VersionedProposal, + }, + types::{ + Duty, DutyDefinitionSet, DutyType, PubKey, SignedData, UnsignedDataSet, UnsignedDutyData, + }, +}; + +/// Boxed error returned by injected callbacks (subscribers, AggSigDB, DutyDB). +type BoxError = Box; + +/// Future returned by an injected callback. +type CallbackFuture = Pin> + Send>>; + +/// Subscriber callback invoked for each fetched duty data set. +pub type Subscriber = Arc CallbackFuture<()> + Send + Sync>; + +/// AggSigDB callback: resolves aggregated signed data for a duty/pubkey. +pub type AggSigDbFunc = + Arc CallbackFuture> + Send + Sync>; + +/// DutyDB callback: resolves attestation data for a `(slot, committee index)`. +pub type AwaitAttDataFunc = + Arc CallbackFuture + Send + Sync>; + +/// Fee recipient resolver: returns the configured fee recipient for a pubkey. +pub type FeeRecipientFunc = Arc String + Send + Sync>; + +/// Errors returned while fetching duty data. +#[derive(Debug, thiserror::Error)] +pub enum FetcherError { + /// Wraps an inner error with the duty-type context, matching Go's + /// `errors.Wrap(err, "fetch data")`. + #[error("{context}: {source}")] + Fetch { + /// Context prefix (e.g. `fetch attester data`). + context: &'static str, + /// Wrapped inner error. + source: Box, + }, + + /// `DutyBuilderProposer` is deprecated and no longer supported. + #[error("DutyBuilderProposer is deprecated and no longer supported")] + DeprecatedDutyBuilderProposer, + + /// The duty type is not supported by the fetcher. + #[error("unsupported duty type: {0}")] + UnsupportedDutyType(String), + + /// A duty definition was not an attester definition. + #[error("invalid attester definition")] + InvalidAttesterDefinition, + + /// AggSigDB returned a value that was not a beacon committee selection. + #[error("invalid beacon committee selection")] + InvalidBeaconCommitteeSelection, + + /// AggSigDB returned a value that was not a sync committee selection. + #[error("invalid sync committee selection")] + InvalidSyncCommitteeSelection, + + /// AggSigDB returned a value that was not a sync committee message. + #[error("invalid sync committee message")] + InvalidSyncCommitteeMessage, + + /// The beacon node returned a nil attestation data response. + #[error("attestation data cannot be nil")] + NilAttestationData, + + /// The beacon node could not find an aggregate attestation for the root. + #[error("aggregate attestation not found by root (retryable)")] + AggregateAttestationNotFound, + + /// The beacon node could not find a sync committee contribution. + #[error("sync committee contribution not found by root (retryable)")] + SyncContributionNotFound, + + /// The beacon node returned an unexpected (non-success) response. + #[error("unexpected beacon node response")] + UnexpectedResponse, + + /// AggSigDB / DutyDB callback (or a subscriber) returned an error. + #[error("{0}")] + Callback(BoxError), + + /// AggSigDB was queried but no resolver was registered. + #[error("AggSigDB function not registered")] + AggSigDbNotRegistered, + + /// DutyDB was queried but no resolver was registered. + #[error("AwaitAttData function not registered")] + AwaitAttDataNotRegistered, + + /// Error from the beacon node API client. + #[error(transparent)] + BeaconNode(#[from] EthBeaconNodeApiClientError), + + /// Error from aggregator selection. + #[error(transparent)] + Eth2Exp(#[from] Eth2ExpError), + + /// JSON (de)serialization error while decoding a beacon node response. + #[error("decode beacon node response: {0}")] + Json(#[from] serde_json::Error), + + /// A versioned proposal had an unsupported fork version. + #[error("unsupported proposal version: {0:?}")] + UnsupportedProposalVersion(ConsensusVersion), + + /// A versioned proposal response was missing the `block` field. + #[error("proposal response missing block field")] + MissingBlockField, + + /// A signed data value could not produce a signature. + #[error("signature: {0}")] + Signature(String), +} + +/// Result alias for fetcher operations. +type Result = std::result::Result; + +/// Fetches proposed duty data from the beacon node. +pub struct Fetcher { + eth2_cl: EthBeaconNodeApiClient, + fee_recipient_func: Option, + subs: Vec, + agg_sig_db_func: Option, + await_att_data_func: Option, + builder_enabled: bool, + graffiti_builder: GraffitiBuilder, + electra_slot: phase0::Slot, + fetch_only_comm_idx0: bool, +} + +impl Fetcher { + /// Returns a new fetcher instance. + pub fn new( + eth2_cl: EthBeaconNodeApiClient, + fee_recipient_func: Option, + builder_enabled: bool, + graffiti_builder: GraffitiBuilder, + electra_slot: phase0::Slot, + fetch_only_comm_idx0: bool, + ) -> Self { + Self { + eth2_cl, + fee_recipient_func, + subs: Vec::new(), + agg_sig_db_func: None, + await_att_data_func: None, + builder_enabled, + graffiti_builder, + electra_slot, + fetch_only_comm_idx0, + } + } + + /// Registers a callback for fetched duties. + /// + /// Note: this is not thread safe and should be called *before* `fetch`. + pub fn subscribe(&mut self, sub: Subscriber) { + self.subs.push(sub); + } + + /// Registers a function to get resolved aggregated signed data from + /// AggSigDB. + /// + /// Note: this is not thread safe and should be called *before* `fetch`. + pub fn register_agg_sig_db(&mut self, func: AggSigDbFunc) { + self.agg_sig_db_func = Some(func); + } + + /// Registers a function to get attestation data from DutyDB. + /// + /// Note: this is not thread safe and should be called *before* `fetch`. + pub fn register_await_att_data(&mut self, func: AwaitAttDataFunc) { + self.await_att_data_func = Some(func); + } + + /// Triggers fetching of a proposed duty data set. + pub async fn fetch(&self, duty: Duty, def_set: DutyDefinitionSet) -> Result<()> { + let slot = duty.slot.inner(); + + let unsigned_set = match duty.duty_type { + DutyType::Proposer => self + .fetch_proposer_data(slot, &def_set) + .await + .map_err(wrap("fetch proposer data"))?, + DutyType::Attester => self + .fetch_attester_data(slot, &def_set) + .await + .map_err(wrap("fetch attester data"))?, + DutyType::BuilderProposer => return Err(FetcherError::DeprecatedDutyBuilderProposer), + DutyType::Aggregator => { + let set = self + .fetch_aggregator_data(slot, &def_set) + .await + .map_err(wrap("fetch aggregator data"))?; + if set.is_empty() { + // No aggregators found in this slot. + return Ok(()); + } + set + } + DutyType::SyncContribution => { + let set = self + .fetch_contribution_data(slot, &def_set) + .await + .map_err(wrap("fetch contribution data"))?; + if set.is_empty() { + // No sync committee contributors found in this slot. + return Ok(()); + } + set + } + other => return Err(FetcherError::UnsupportedDutyType(other.to_string())), + }; + + for sub in &self.subs { + // Clone before calling each subscriber. + let clone = unsigned_set.clone(); + sub(duty.clone(), clone) + .await + .map_err(FetcherError::Callback)?; + } + + Ok(()) + } + + /// Returns the fetched attestation data set for committees and validators + /// in the arg set. + async fn fetch_attester_data( + &self, + slot: u64, + def_set: &DutyDefinitionSet, + ) -> Result { + // We may have multiple validators in the same committee, use the same + // attestation data in that case. + let mut data_by_comm_idx: HashMap = HashMap::new(); + + let mut resp = UnsignedDataSet::new(); + for (pubkey, def) in def_set { + let att_def = def + .as_attester() + .ok_or(FetcherError::InvalidAttesterDefinition)?; + + let mut comm_idx = att_def.duty().committee_index; + + // Attestation data for Electra is not bound by committee index; + // committee index is still persisted in the request but should be + // set to 0 once all VCs request committee index 0. + if slot >= self.electra_slot && self.fetch_only_comm_idx0 { + comm_idx = 0; + } + + let eth2_att_data = match data_by_comm_idx.get(&comm_idx) { + Some(data) => data.clone(), + None => { + let data = self.attestation_data(slot, comm_idx).await?; + data_by_comm_idx.insert(comm_idx, data.clone()); + data + } + }; + + resp.insert( + *pubkey, + UnsignedDutyData::Attestation(AttestationData { + data: eth2_att_data, + duty: att_def.duty().clone(), + }), + ); + } + + Ok(resp) + } + + /// Fetches the attestation aggregation data. + async fn fetch_aggregator_data( + &self, + slot: u64, + def_set: &DutyDefinitionSet, + ) -> Result { + let mut tracker = PubkeysTracker::new("attester aggregation"); + + // We may have multiple aggregators in the same committee, use the same + // aggregated attestation in that case. + let mut agg_att_by_comm_idx: HashMap = HashMap::new(); + + let mut resp = UnsignedDataSet::new(); + for (pubkey, def) in def_set { + let att_def = def + .as_attester() + .ok_or(FetcherError::InvalidAttesterDefinition)?; + + // Query AggSigDB for DutyPrepareAggregator to get beacon committee + // selections. + let prep_agg_data = self + .agg_sig_db(Duty::new_prepare_aggregator_duty(slot.into()), *pubkey) + .await?; + let selection = downcast::(prep_agg_data.as_ref()) + .ok_or(FetcherError::InvalidBeaconCommitteeSelection)?; + + let is_aggregator = eth2exp::is_att_aggregator( + &self.eth2_cl, + att_def.duty().committee_length, + selection.0.selection_proof, + ) + .await?; + if !is_aggregator { + tracker.add_not_selected(pubkey.to_string()); + continue; + } + + tracker.add_resolved(pubkey.to_string()); + + let comm_idx = att_def.duty().committee_index; + + if let Some(agg_att) = agg_att_by_comm_idx.get(&comm_idx) { + resp.insert( + *pubkey, + UnsignedDutyData::AggAttestation(VersionedAggregatedAttestation( + agg_att.clone(), + )), + ); + // Skip querying aggregate attestation for aggregators of the + // same committee. + continue; + } + + // Query DutyDB for attestation data to get the attestation data root. + let att_data = self.await_att_data(slot, comm_idx).await?; + let data_root = att_data.tree_hash_root().0; + + // Query BN for aggregate attestation. + let agg_att = self + .aggregate_attestation(slot, comm_idx, data_root) + .await?; + + agg_att_by_comm_idx.insert(comm_idx, agg_att.clone()); + resp.insert( + *pubkey, + UnsignedDutyData::AggAttestation(VersionedAggregatedAttestation(agg_att)), + ); + } + + tracker.log(); + + Ok(resp) + } + + /// Fetches the block proposal data set. + async fn fetch_proposer_data( + &self, + slot: u64, + def_set: &DutyDefinitionSet, + ) -> Result { + let mut resp = UnsignedDataSet::new(); + for pubkey in def_set.keys() { + // Fetch previously aggregated randao reveal from AggSigDB. + let randao_data = self + .agg_sig_db(Duty::new_randao_duty(slot.into()), *pubkey) + .await?; + let randao = randao_data + .signature() + .map_err(|e| FetcherError::Signature(e.to_string()))?; + + // Maximum priority to builder blocks when the builder is enabled. + let builder_boost_factor: u64 = if self.builder_enabled { u64::MAX } else { 0 }; + + let graffiti = self.graffiti_builder.get_graffiti(pubkey); + + let request = ProduceBlockV3Request::builder() + .slot(slot.to_string()) + .randao_reveal(hex_0x(&randao)) + .graffiti(hex_0x(&graffiti)) + .builder_boost_factor(builder_boost_factor.to_string()) + .build() + .map_err(EthBeaconNodeApiClientError::RequestError)?; + + let response = match self + .eth2_cl + .produce_block_v3(request) + .await + .map_err(EthBeaconNodeApiClientError::RequestError)? + { + ProduceBlockV3Response::Ok(resp) => resp, + _ => return Err(FetcherError::UnexpectedResponse), + }; + + let proposal = versioned_proposal_from_response(&response)?; + + // Builders set the fee recipient to themselves, so it always differs + // from the validator's; only verify when the builder is disabled. + if !self.builder_enabled { + let fee_recipient = self + .fee_recipient_func + .as_ref() + .map(|f| f(pubkey)) + .unwrap_or_default(); + verify_fee_recipient(&proposal, &fee_recipient); + } + + resp.insert(*pubkey, UnsignedDutyData::Proposal(Box::new(proposal))); + } + + Ok(resp) + } + + /// Fetches the sync committee contribution data. + async fn fetch_contribution_data( + &self, + slot: u64, + def_set: &DutyDefinitionSet, + ) -> Result { + let mut tracker = PubkeysTracker::new("sync committee contribution"); + + let mut resp = UnsignedDataSet::new(); + for pubkey in def_set.keys() { + // Query AggSigDB for DutyPrepareSyncContribution to get the sync + // committee selection. + let selection_data = self + .agg_sig_db( + Duty::new_prepare_sync_contribution_duty(slot.into()), + *pubkey, + ) + .await?; + let selection = downcast::(selection_data.as_ref()) + .ok_or(FetcherError::InvalidSyncCommitteeSelection)?; + + let subcomm_idx = selection.0.subcommittee_index; + + // Check if the validator is an aggregator for the sync committee. + let is_aggregator = + eth2exp::is_sync_comm_aggregator(&self.eth2_cl, selection.0.selection_proof) + .await?; + if !is_aggregator { + tracker.add_not_selected(pubkey.to_string()); + continue; + } + + // Query AggSigDB for DutySyncMessage to get the beacon block root. + let sync_msg_data = self + .agg_sig_db(Duty::new_sync_message_duty(slot.into()), *pubkey) + .await?; + let msg = downcast::(sync_msg_data.as_ref()) + .ok_or(FetcherError::InvalidSyncCommitteeMessage)?; + + let block_root = msg.0.beacon_block_root; + + // Query BN for sync committee contribution. + let contribution = self + .sync_committee_contribution(slot, subcomm_idx, block_root) + .await?; + + tracker.add_resolved(pubkey.to_string()); + + resp.insert( + *pubkey, + UnsignedDutyData::SyncContribution(SyncContribution(contribution)), + ); + } + + tracker.log(); + + Ok(resp) + } + + // --- beacon node helpers ------------------------------------------------- + + /// Queries the beacon node for attestation data. + async fn attestation_data(&self, slot: u64, comm_idx: u64) -> Result { + let request = ProduceAttestationDataRequest::builder() + .slot(slot.to_string()) + .committee_index(comm_idx.to_string()) + .build() + .map_err(EthBeaconNodeApiClientError::RequestError)?; + + match self + .eth2_cl + .produce_attestation_data(request) + .await + .map_err(EthBeaconNodeApiClientError::RequestError)? + { + ProduceAttestationDataResponse::Ok(ok) => round_trip(&ok.data), + _ => Err(FetcherError::NilAttestationData), + } + } + + /// Queries the beacon node for an aggregate attestation by data root. + async fn aggregate_attestation( + &self, + slot: u64, + comm_idx: u64, + data_root: phase0::Root, + ) -> Result { + let request = GetAggregatedAttestationV2Request::builder() + .attestation_data_root(hex_0x(&data_root)) + .slot(slot.to_string()) + .committee_index(comm_idx.to_string()) + .build() + .map_err(EthBeaconNodeApiClientError::RequestError)?; + + let ok = match self + .eth2_cl + .get_aggregated_attestation_v2(request) + .await + .map_err(EthBeaconNodeApiClientError::RequestError)? + { + GetAggregatedAttestationV2Response::Ok(ok) => ok, + // Some beacon nodes return nil if the root is not found; surface a + // retryable error. + _ => return Err(FetcherError::AggregateAttestationNotFound), + }; + + let version = consensus_to_data_version(&ok.version); + Ok(versioned::VersionedAttestation { + version, + validator_index: None, + attestation: Some(attestation_payload(version, &ok.data)?), + }) + } + + /// Queries the beacon node for a sync committee contribution. + async fn sync_committee_contribution( + &self, + slot: u64, + subcomm_idx: u64, + block_root: phase0::Root, + ) -> Result { + let request = ProduceSyncCommitteeContributionRequest::builder() + .slot(slot.to_string()) + .subcommittee_index(subcomm_idx.to_string()) + .beacon_block_root(hex_0x(&block_root)) + .build() + .map_err(EthBeaconNodeApiClientError::RequestError)?; + + match self + .eth2_cl + .produce_sync_committee_contribution(request) + .await + .map_err(EthBeaconNodeApiClientError::RequestError)? + { + ProduceSyncCommitteeContributionResponse::Ok(payload) => round_trip(&payload.data), + _ => Err(FetcherError::SyncContributionNotFound), + } + } + + /// Invokes the registered AggSigDB resolver. + async fn agg_sig_db(&self, duty: Duty, pubkey: PubKey) -> Result> { + let func = self + .agg_sig_db_func + .as_ref() + .ok_or(FetcherError::AggSigDbNotRegistered)?; + func(duty, pubkey).await.map_err(FetcherError::Callback) + } + + /// Invokes the registered DutyDB attestation-data resolver. + async fn await_att_data(&self, slot: u64, comm_idx: u64) -> Result { + let func = self + .await_att_data_func + .as_ref() + .ok_or(FetcherError::AwaitAttDataNotRegistered)?; + func(slot, comm_idx).await.map_err(FetcherError::Callback) + } +} + +/// Builds a closure that wraps a [`FetcherError`] with the duty-type context, +/// matching Go's `errors.Wrap(err, context)`. +fn wrap(context: &'static str) -> impl Fn(FetcherError) -> FetcherError { + move |source| FetcherError::Fetch { + context, + source: Box::new(source), + } +} + +/// Downcasts a `&dyn SignedData` to a concrete signed-data type. +fn downcast(data: &dyn SignedData) -> Option<&T> { + (data as &dyn Any).downcast_ref::() +} + +/// Formats bytes as a `0x`-prefixed lowercase hex string. +fn hex_0x(bytes: &[u8]) -> String { + format!("0x{}", hex::encode(bytes)) +} + +/// Round-trips a loosely-typed beacon node response value into a strongly-typed +/// target via JSON. +fn round_trip(value: &S) -> Result +where + T: serde::de::DeserializeOwned, + S: serde::Serialize, +{ + let value = serde_json::to_value(value)?; + Ok(serde_json::from_value(value)?) +} + +/// Converts a `produce_block_v3` response into an unsigned +/// [`VersionedProposal`]. +fn versioned_proposal_from_response( + resp: &ProduceBlockV3ResponseResponse, +) -> Result { + let data = serde_json::to_value(&resp.data)?; + let blinded = resp.execution_payload_blinded; + + let block = match (&resp.version, blinded) { + (ConsensusVersion::Phase0, _) => ProposalBlock::Phase0(json_from(&data)?), + (ConsensusVersion::Altair, _) => ProposalBlock::Altair(json_from(&data)?), + (ConsensusVersion::Bellatrix, false) => ProposalBlock::Bellatrix(json_from(&data)?), + (ConsensusVersion::Bellatrix, true) => ProposalBlock::BellatrixBlinded(json_from(&data)?), + (ConsensusVersion::Capella, false) => ProposalBlock::Capella(json_from(&data)?), + (ConsensusVersion::Capella, true) => ProposalBlock::CapellaBlinded(json_from(&data)?), + (ConsensusVersion::Deneb, false) => ProposalBlock::Deneb { + block: Box::new(json_from(block_field(&data)?)?), + kzg_proofs: json_from_field(&data, "kzg_proofs")?, + blobs: json_from_field(&data, "blobs")?, + }, + (ConsensusVersion::Deneb, true) => ProposalBlock::DenebBlinded(json_from(&data)?), + (ConsensusVersion::Electra, false) => ProposalBlock::Electra { + block: Box::new(json_from(block_field(&data)?)?), + kzg_proofs: json_from_field(&data, "kzg_proofs")?, + blobs: json_from_field(&data, "blobs")?, + }, + (ConsensusVersion::Electra, true) => ProposalBlock::ElectraBlinded(json_from(&data)?), + (ConsensusVersion::Fulu, false) => ProposalBlock::Fulu { + block: Box::new(json_from(block_field(&data)?)?), + kzg_proofs: json_from_field(&data, "kzg_proofs")?, + blobs: json_from_field(&data, "blobs")?, + }, + (ConsensusVersion::Fulu, true) => ProposalBlock::FuluBlinded(json_from(&data)?), + }; + + Ok(VersionedProposal { block }) +} + +/// Maps a beacon node `ConsensusVersion` onto a `versioned::DataVersion`. +fn consensus_to_data_version(version: &ConsensusVersion) -> versioned::DataVersion { + use versioned::DataVersion as DV; + match version { + ConsensusVersion::Phase0 => DV::Phase0, + ConsensusVersion::Altair => DV::Altair, + ConsensusVersion::Bellatrix => DV::Bellatrix, + ConsensusVersion::Capella => DV::Capella, + ConsensusVersion::Deneb => DV::Deneb, + ConsensusVersion::Electra => DV::Electra, + ConsensusVersion::Fulu => DV::Fulu, + } +} + +/// Builds a versioned attestation payload from a loosely-typed response value. +fn attestation_payload( + version: versioned::DataVersion, + data: &S, +) -> Result { + use versioned::{AttestationPayload as AP, DataVersion as DV}; + Ok(match version { + DV::Phase0 => AP::Phase0(round_trip(data)?), + DV::Altair => AP::Altair(round_trip(data)?), + DV::Bellatrix => AP::Bellatrix(round_trip(data)?), + DV::Capella => AP::Capella(round_trip(data)?), + DV::Deneb => AP::Deneb(round_trip(data)?), + DV::Electra => AP::Electra(round_trip(data)?), + DV::Fulu => AP::Fulu(round_trip(data)?), + DV::Unknown => return Err(FetcherError::AggregateAttestationNotFound), + }) +} + +/// Deserializes a JSON value into `T`. +fn json_from(value: &serde_json::Value) -> Result { + Ok(serde_json::from_value(value.clone())?) +} + +/// Returns the `block` field of a Deneb+ versioned block contents object. +fn block_field(value: &serde_json::Value) -> Result<&serde_json::Value> { + value.get("block").ok_or(FetcherError::MissingBlockField) +} + +/// Deserializes the named field of `value` into `T`, defaulting to `T::default` +/// when absent. +fn json_from_field( + value: &serde_json::Value, + field: &str, +) -> Result { + match value.get(field) { + Some(v) => Ok(serde_json::from_value(v.clone())?), + None => Ok(T::default()), + } +} + +/// Logs a warning when the fee recipient is not correctly populated in the +/// proposal. Fee recipient is unavailable in forks earlier than Bellatrix. +fn verify_fee_recipient(proposal: &VersionedProposal, fee_recipient_address: &str) { + if let Some((expected, actual)) = fee_recipient_mismatch(proposal, fee_recipient_address) { + warn!( + expected = %expected, + actual = %actual, + "Proposal with unexpected fee recipient address" + ); + } +} + +/// Returns `Some((expected, actual))` when the proposal's fee recipient differs +/// (case-insensitively) from `fee_recipient_address`. Returns `None` for forks +/// without a fee recipient (pre-Bellatrix) or when the addresses match. +fn fee_recipient_mismatch( + proposal: &VersionedProposal, + fee_recipient_address: &str, +) -> Option<(String, String)> { + if matches!( + proposal.version(), + versioned::DataVersion::Phase0 | versioned::DataVersion::Altair + ) { + return None; + } + + let value = serde_json::to_value(proposal_body(&proposal.block)).ok()?; + + // Unblinded blocks carry `execution_payload`; blinded blocks carry + // `execution_payload_header`. Both expose `fee_recipient`. + let actual_addr = value + .get("execution_payload") + .or_else(|| value.get("execution_payload_header")) + .and_then(|payload| payload.get("fee_recipient")) + .and_then(|addr| addr.as_str())?; + + if actual_addr.eq_ignore_ascii_case(fee_recipient_address) { + None + } else { + Some((fee_recipient_address.to_string(), actual_addr.to_string())) + } +} + +/// Returns the block body as a JSON value, used by [`verify_fee_recipient`]. +fn proposal_body(block: &ProposalBlock) -> serde_json::Value { + let body = match block { + ProposalBlock::Phase0(b) => serde_json::to_value(&b.body), + ProposalBlock::Altair(b) => serde_json::to_value(&b.body), + ProposalBlock::Bellatrix(b) => serde_json::to_value(&b.body), + ProposalBlock::BellatrixBlinded(b) => serde_json::to_value(&b.body), + ProposalBlock::Capella(b) => serde_json::to_value(&b.body), + ProposalBlock::CapellaBlinded(b) => serde_json::to_value(&b.body), + ProposalBlock::Deneb { block, .. } => serde_json::to_value(&block.body), + ProposalBlock::DenebBlinded(b) => serde_json::to_value(&b.body), + ProposalBlock::Electra { block, .. } => serde_json::to_value(&block.body), + ProposalBlock::ElectraBlinded(b) => serde_json::to_value(&b.body), + ProposalBlock::Fulu { block, .. } => serde_json::to_value(&block.body), + ProposalBlock::FuluBlinded(b) => serde_json::to_value(&b.body), + }; + body.unwrap_or(serde_json::Value::Null) +} + +/// Tracks which pubkeys were selected/resolved for aggregation duties so the +/// outcome can be logged once per fetch. +struct PubkeysTracker { + title: &'static str, + not_selected_pubkeys: Vec, + resolved_pubkeys: Vec, +} + +impl PubkeysTracker { + fn new(title: &'static str) -> Self { + Self { + title, + not_selected_pubkeys: Vec::new(), + resolved_pubkeys: Vec::new(), + } + } + + fn add_not_selected(&mut self, pubkey: String) { + self.not_selected_pubkeys.push(pubkey); + } + + fn add_resolved(&mut self, pubkey: String) { + self.resolved_pubkeys.push(pubkey); + } + + fn log(&self) { + if !self.not_selected_pubkeys.is_empty() { + debug!( + title = self.title, + pubkeys = self.not_selected_pubkeys.join(","), + "not selected pubkeys" + ); + } + + if !self.resolved_pubkeys.is_empty() { + info!( + title = self.title, + pubkeys = self.resolved_pubkeys.join(","), + "resolved pubkeys" + ); + } + } +} + +#[cfg(test)] +mod tests { + use std::sync::Mutex; + + use pluto_testutil::BeaconMock; + + use super::*; + use crate::{ + signeddata::AttesterDuty, + types::{ + AttesterDefinition, DutyDefinition, ProposerDefinition, ProposerDuty, SlotNumber, + SyncCommitteeDefinition, SyncCommitteeDuty, + }, + }; + + /// 48-byte BLS public key length used to build distinct test pubkeys. + const PK_LEN: usize = 48; + + /// Captures the `(duty, set)` passed to the last subscriber invocation. + type Captured = Arc>>; + + /// Builds a subscriber that records its argument into `captured`. + fn capturing_subscriber(captured: Captured) -> Subscriber { + Arc::new(move |duty, set| { + let captured = captured.clone(); + Box::pin(async move { + *captured.lock().unwrap() = Some((duty, set)); + Ok(()) + }) + }) + } + + /// Spec fields required by `is_sync_comm_aggregator` / + /// `is_att_aggregator`, matching the values the prysm selection-proof test + /// vectors were generated against. + fn aggregator_spec() -> serde_json::Value { + serde_json::json!({ + "TARGET_AGGREGATORS_PER_COMMITTEE": "16", + "SYNC_COMMITTEE_SIZE": "512", + "SYNC_COMMITTEE_SUBNET_COUNT": "4", + "TARGET_AGGREGATORS_PER_SYNC_SUBCOMMITTEE": "16", + }) + } + + /// Decodes a 96-byte BLS signature from hex. + fn bls_sig(hex_str: &str) -> phase0::BLSSignature { + hex::decode(hex_str) + .expect("valid hex") + .try_into() + .expect("96-byte signature") + } + + /// Electra block contents (`{block, kzg_proofs, blobs}`) reused as the + /// `produce_block_v3` response payload. + const BLOCK_CONTENTS_GOLDEN: &str = include_str!( + "../../testdata/signeddata/TestJSONSerialisation_VersionedProposal.json.golden" + ); + + /// Mounts a `produce_block_v3` responder that returns the golden Electra + /// block contents with the request's slot, randao reveal and graffiti + /// echoed back and a zero fee recipient. + async fn mount_produce_block(server: &wiremock::MockServer) { + let golden: serde_json::Value = + serde_json::from_str(BLOCK_CONTENTS_GOLDEN).expect("parse golden"); + let base = golden["block"].clone(); + + struct Responder { + base: serde_json::Value, + } + impl wiremock::Respond for Responder { + fn respond(&self, req: &wiremock::Request) -> wiremock::ResponseTemplate { + let query: HashMap = req.url.query_pairs().into_owned().collect(); + let randao = query.get("randao_reveal").cloned().unwrap_or_default(); + let graffiti = query.get("graffiti").cloned().unwrap_or_default(); + // Slot is the final path segment. + let slot = req + .url + .path_segments() + .and_then(|mut s| s.next_back()) + .unwrap_or("0") + .to_string(); + + let mut data = self.base.clone(); + data["block"]["slot"] = serde_json::json!(slot); + data["block"]["body"]["randao_reveal"] = serde_json::json!(randao); + data["block"]["body"]["graffiti"] = serde_json::json!(graffiti); + data["block"]["body"]["execution_payload"]["fee_recipient"] = + serde_json::json!(format!("0x{}", "00".repeat(20))); + + wiremock::ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "version": "electra", + "execution_payload_blinded": false, + "execution_payload_value": "0", + "consensus_block_value": "0", + "data": data, + })) + } + } + + wiremock::Mock::given(wiremock::matchers::method("GET")) + .and(wiremock::matchers::path_regex( + r"^/eth/v3/validator/blocks/[0-9]+$", + )) + .respond_with(Responder { base }) + .mount(server) + .await; + } + + #[test] + fn verify_fee_recipient() { + use pluto_eth2api::spec::electra; + + // Electra proposal from the golden block contents. + let golden: serde_json::Value = + serde_json::from_str(BLOCK_CONTENTS_GOLDEN).expect("parse golden"); + let block: electra::BeaconBlock = + serde_json::from_value(golden["block"]["block"].clone()).expect("parse block"); + let proposal = VersionedProposal { + block: ProposalBlock::Electra { + block: Box::new(block), + kzg_proofs: vec![], + blobs: vec![], + }, + }; + + // A different address is reported as a mismatch; the actual address + // matches itself (case-insensitively). + let (_, actual) = + fee_recipient_mismatch(&proposal, "0xdead").expect("mismatch against wrong address"); + assert!(fee_recipient_mismatch(&proposal, &actual).is_none()); + assert!(fee_recipient_mismatch(&proposal, &actual.to_uppercase()).is_none()); + } + + #[tokio::test] + async fn fetch_blocks() { + const SLOT: u64 = 1; + let pk_a = PubKey::new([2u8; PK_LEN]); + let pk_b = PubKey::new([3u8; PK_LEN]); + + let randao_a: phase0::BLSSignature = [7u8; 96]; + let randao_b: phase0::BLSSignature = [8u8; 96]; + let randao_by_pubkey: HashMap = + HashMap::from([(pk_a, randao_a), (pk_b, randao_b)]); + + // disable_client_append = true, so graffiti is the raw string padded to + // 32 bytes. + let mut graffiti_a = [0u8; 32]; + graffiti_a[..5].copy_from_slice(b"testA"); + let mut graffiti_b = [0u8; 32]; + graffiti_b[..5].copy_from_slice(b"testB"); + + let mut def_set = DutyDefinitionSet::new(); + def_set.insert( + pk_a, + DutyDefinition::Proposer(ProposerDefinition::new(ProposerDuty { + pubkey: [0u8; 48], + slot: SLOT, + validator_index: 2, + })), + ); + def_set.insert( + pk_b, + DutyDefinition::Proposer(ProposerDefinition::new(ProposerDuty { + pubkey: [0u8; 48], + slot: SLOT, + validator_index: 3, + })), + ); + + let mock = BeaconMock::builder().build().await.expect("build mock"); + mount_produce_block(mock.server()).await; + + let graffiti_builder = GraffitiBuilder::new( + &[pk_a, pk_b], + Some(&["testA".to_string(), "testB".to_string()]), + true, + mock.client(), + ) + .await + .expect("build graffiti"); + + let mut fetch = Fetcher::new( + mock.client().clone(), + None, + true, + graffiti_builder, + 5, + false, + ); + + let randaos = randao_by_pubkey.clone(); + fetch.register_agg_sig_db(Arc::new(move |_duty: Duty, pubkey: PubKey| { + let sig = randaos[&pubkey]; + Box::pin(async move { + let data: Box = Box::new(sig); + Ok(data) + }) + })); + + let captured: Captured = Arc::new(Mutex::new(None)); + fetch.subscribe(capturing_subscriber(captured.clone())); + + let duty = Duty::new_proposer_duty(SlotNumber::new(SLOT)); + fetch.fetch(duty, def_set).await.expect("fetch"); + + let (_, res_set) = captured.lock().unwrap().take().expect("subscriber called"); + assert_eq!(res_set.len(), 2); + + for (pubkey, expected_randao, expected_graffiti) in + [(pk_a, randao_a, graffiti_a), (pk_b, randao_b, graffiti_b)] + { + let UnsignedDutyData::Proposal(proposal) = res_set.get(&pubkey).expect("entry") else { + panic!("expected proposal"); + }; + assert_eq!(proposal.slot(), SLOT); + + let ProposalBlock::Electra { block, .. } = &proposal.block else { + panic!("expected electra block"); + }; + assert_eq!(block.slot, SLOT); + assert_eq!(block.body.randao_reveal, expected_randao); + assert_eq!(block.body.graffiti, expected_graffiti); + assert_eq!(block.body.execution_payload.fee_recipient, [0u8; 20]); + } + } + + #[tokio::test] + async fn fetch_attester() { + const SLOT: u64 = 1; + const V_IDX_A: u64 = 2; + const V_IDX_B: u64 = 3; + const NOT_ZERO: u64 = 99; // Validation requires non-zero values. + + let pk_a = PubKey::new([2u8; PK_LEN]); + let pk_b = PubKey::new([3u8; PK_LEN]); + + let duty_a = AttesterDuty { + slot: SLOT, + validator_index: V_IDX_A, + committee_index: V_IDX_A, + committee_length: NOT_ZERO, + committees_at_slot: NOT_ZERO, + validator_committee_index: 0, + }; + let duty_b = AttesterDuty { + slot: SLOT, + validator_index: V_IDX_B, + committee_index: V_IDX_B, + committee_length: NOT_ZERO, + committees_at_slot: NOT_ZERO, + validator_committee_index: 0, + }; + + let mut def_set = DutyDefinitionSet::new(); + def_set.insert( + pk_a, + DutyDefinition::Attester(AttesterDefinition::new(duty_a.clone())), + ); + def_set.insert( + pk_b, + DutyDefinition::Attester(AttesterDefinition::new(duty_b.clone())), + ); + + let duty = Duty::new_attester_duty(SlotNumber::new(SLOT)); + let mock = BeaconMock::builder().build().await.expect("build mock"); + + let mut fetch = Fetcher::new( + mock.client().clone(), + None, + true, + GraffitiBuilder::default(), + 5, + false, + ); + + let captured: Captured = Arc::new(Mutex::new(None)); + fetch.subscribe(capturing_subscriber(captured.clone())); + + fetch.fetch(duty.clone(), def_set).await.expect("fetch"); + + let (res_duty, res_set) = captured.lock().unwrap().take().expect("subscriber called"); + assert_eq!(res_duty, duty); + assert_eq!(res_set.len(), 2); + + for (pubkey, expected_duty, v_idx) in [(pk_a, &duty_a, V_IDX_A), (pk_b, &duty_b, V_IDX_B)] { + let UnsignedDutyData::Attestation(att) = res_set.get(&pubkey).expect("entry") else { + panic!("expected attestation data"); + }; + assert_eq!(att.data.slot, SLOT); + assert_eq!(att.data.index, v_idx); + assert_eq!(&att.duty, expected_duty); + } + } + + // Aggregator selection proofs from prysm's + // validate_sync_contribution_proof_test.go. + const SYNC_AGG_SIG_A: &str = "a9dbd88a49a7269e91b8ef1296f1e07f87fed919d51a446b67122bfdfd61d23f3f929fc1cd5209bd6862fd60f739b27213fb0a8d339f7f081fc84281f554b190bb49cc97a6b3364e622af9e7ca96a97fe2b766f9e746dead0b33b58473d91562"; + const SYNC_AGG_SIG_B: &str = "99e60f20dde4d4872b048d703f1943071c20213d504012e7e520c229da87661803b9f139b9a0c5be31de3cef6821c080125aed38ebaf51ba9a2e9d21d7fbf2903577983109d097a8599610a92c0305408d97c1fd4b0b2d1743fb4eedf5443f99"; + const SYNC_NON_AGG_SIG: &str = "b9251a82040d4620b8c5665f328ee6c2eaa02d31d71d153f4abba31a7922a981e541e85283f0ced387d26e86aef9386d18c6982b9b5f8759882fe7f25a328180d86e146994ef19d28bc1432baf29751dec12b5f3d65dbbe224d72cf900c6831a"; + + /// Mounts a request-aware sync-committee-contribution responder that echoes + /// the request slot / subcommittee index / beacon block root. + async fn mount_sync_contribution(server: &wiremock::MockServer) { + struct Responder; + impl wiremock::Respond for Responder { + fn respond(&self, req: &wiremock::Request) -> wiremock::ResponseTemplate { + let query: std::collections::HashMap = + req.url.query_pairs().into_owned().collect(); + let slot = query.get("slot").cloned().unwrap_or_default(); + let subcommittee_index = + query.get("subcommittee_index").cloned().unwrap_or_default(); + let beacon_block_root = query.get("beacon_block_root").cloned().unwrap_or_default(); + + wiremock::ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "data": { + "slot": slot, + "beacon_block_root": beacon_block_root, + "subcommittee_index": subcommittee_index, + "aggregation_bits": format!("0x{}", "00".repeat(16)), + "signature": format!("0x{}", "00".repeat(96)), + } + })) + } + } + + wiremock::Mock::given(wiremock::matchers::method("GET")) + .and(wiremock::matchers::path( + "/eth/v1/validator/sync_committee_contribution", + )) + .respond_with(Responder) + .mount(server) + .await; + } + + /// Builds a phase0 attestation with the given committee index. + fn build_attestation(index: u64) -> phase0::Attestation { + phase0::Attestation { + aggregation_bits: phase0::BitList::default(), + data: phase0::AttestationData { + slot: 1, + index, + beacon_block_root: [u8::try_from(index).unwrap_or(0); 32], + source: phase0::Checkpoint { + epoch: 0, + root: [0u8; 32], + }, + target: phase0::Checkpoint { + epoch: 0, + root: [0u8; 32], + }, + }, + signature: [0u8; 96], + } + } + + /// Mounts an aggregate-attestation responder that returns the Deneb + /// attestation whose data root matches the request, or 404 when unknown. + async fn mount_aggregate( + server: &wiremock::MockServer, + by_root: HashMap, + ) { + struct Responder { + by_root: HashMap, + } + impl wiremock::Respond for Responder { + fn respond(&self, req: &wiremock::Request) -> wiremock::ResponseTemplate { + let query: HashMap = req.url.query_pairs().into_owned().collect(); + let root = query + .get("attestation_data_root") + .cloned() + .unwrap_or_default(); + match self.by_root.get(&root) { + Some(att) => wiremock::ResponseTemplate::new(200) + .set_body_json(serde_json::json!({ "version": "deneb", "data": att })), + None => wiremock::ResponseTemplate::new(404) + .set_body_json(serde_json::json!({ "code": 404, "message": "not found" })), + } + } + } + + wiremock::Mock::given(wiremock::matchers::method("GET")) + .and(wiremock::matchers::path( + "/eth/v2/validator/aggregate_attestation", + )) + .respond_with(Responder { by_root }) + .mount(server) + .await; + } + + /// Builds an attester definition with the given committee index/length. + fn attester_def(comm_idx: u64, comm_len: u64) -> DutyDefinition { + DutyDefinition::Attester(AttesterDefinition::new(AttesterDuty { + slot: 1, + validator_index: 0, + committee_index: comm_idx, + committee_length: comm_len, + committees_at_slot: 1, + validator_committee_index: 0, + })) + } + + /// Wires AggSigDB to return a beacon committee selection and DutyDB to + /// return the attestation data for each committee index. + fn wire_aggregator(fetch: &mut Fetcher, atts: &[phase0::Attestation]) { + use pluto_eth2api::v1; + + fetch.register_agg_sig_db(Arc::new(move |_duty: Duty, _pubkey: PubKey| { + Box::pin(async move { + let selection = BeaconCommitteeSelection::new(v1::BeaconCommitteeSelection { + slot: 1, + validator_index: 0, + selection_proof: [0u8; 96], + }); + let data: Box = Box::new(selection); + Ok(data) + }) + })); + + let by_idx: HashMap = atts + .iter() + .map(|a| (a.data.index, a.data.clone())) + .collect(); + fetch.register_await_att_data(Arc::new(move |_slot: u64, comm_idx: u64| { + let data = by_idx.get(&comm_idx).cloned(); + Box::pin(async move { data.ok_or_else(|| "missing attestation data".into()) }) + })); + } + + #[tokio::test] + async fn fetch_aggregator_different_committee() { + const SLOT: u64 = 1; + let pk_a = PubKey::new([2u8; PK_LEN]); + let pk_b = PubKey::new([3u8; PK_LEN]); + + let att_a = build_attestation(2); + let att_b = build_attestation(3); + + let mut def_set = DutyDefinitionSet::new(); + def_set.insert(pk_a, attester_def(att_a.data.index, 0)); + def_set.insert(pk_b, attester_def(att_b.data.index, 0)); + + let by_root = HashMap::from([ + (hex_0x(&att_a.data.tree_hash_root().0), att_a.clone()), + (hex_0x(&att_b.data.tree_hash_root().0), att_b.clone()), + ]); + + let mock = BeaconMock::builder() + .spec(aggregator_spec()) + .build() + .await + .expect("build mock"); + mount_aggregate(mock.server(), by_root).await; + + let mut fetch = Fetcher::new( + mock.client().clone(), + None, + true, + GraffitiBuilder::default(), + 5, + false, + ); + wire_aggregator(&mut fetch, &[att_a.clone(), att_b.clone()]); + + let captured: Captured = Arc::new(Mutex::new(None)); + fetch.subscribe(capturing_subscriber(captured.clone())); + + let duty = Duty::new_aggregator_duty(SlotNumber::new(SLOT)); + fetch.fetch(duty, def_set).await.expect("fetch"); + + let (_, res_set) = captured.lock().unwrap().take().expect("subscriber called"); + assert_eq!(res_set.len(), 2); + + for (pubkey, expected_idx) in [(pk_a, 2u64), (pk_b, 3u64)] { + let UnsignedDutyData::AggAttestation(agg) = res_set.get(&pubkey).expect("entry") else { + panic!("expected aggregated attestation"); + }; + assert_eq!(agg.data().expect("data").index, expected_idx); + } + } + + #[tokio::test] + async fn fetch_aggregator_same_committee() { + const SLOT: u64 = 1; + let pk_a = PubKey::new([2u8; PK_LEN]); + let pk_b = PubKey::new([3u8; PK_LEN]); + + // Both validators belong to the same committee; the aggregate is fetched + // once and reused for the second validator. + let att = build_attestation(2); + let mut def_set = DutyDefinitionSet::new(); + def_set.insert(pk_a, attester_def(att.data.index, 0)); + def_set.insert(pk_b, attester_def(att.data.index, 0)); + + let by_root = HashMap::from([(hex_0x(&att.data.tree_hash_root().0), att.clone())]); + + let mock = BeaconMock::builder() + .spec(aggregator_spec()) + .build() + .await + .expect("build mock"); + mount_aggregate(mock.server(), by_root).await; + + let mut fetch = Fetcher::new( + mock.client().clone(), + None, + true, + GraffitiBuilder::default(), + 5, + false, + ); + wire_aggregator(&mut fetch, std::slice::from_ref(&att)); + + let captured: Captured = Arc::new(Mutex::new(None)); + fetch.subscribe(capturing_subscriber(captured.clone())); + + let duty = Duty::new_aggregator_duty(SlotNumber::new(SLOT)); + fetch.fetch(duty, def_set).await.expect("fetch"); + + let (_, res_set) = captured.lock().unwrap().take().expect("subscriber called"); + assert_eq!(res_set.len(), 2); + for pubkey in [pk_a, pk_b] { + let UnsignedDutyData::AggAttestation(agg) = res_set.get(&pubkey).expect("entry") else { + panic!("expected aggregated attestation"); + }; + assert_eq!(agg.data().expect("data").index, 2); + } + } + + #[tokio::test] + async fn fetch_aggregator_no_aggregator() { + const SLOT: u64 = 1; + let pk_a = PubKey::new([2u8; PK_LEN]); + + let att_a = build_attestation(2); + let mut def_set = DutyDefinitionSet::new(); + // u64::MAX committee length makes the selection modulo enormous, so the + // validator is never selected as an aggregator. + def_set.insert(pk_a, attester_def(att_a.data.index, u64::MAX)); + + let mock = BeaconMock::builder() + .spec(aggregator_spec()) + .build() + .await + .expect("build mock"); + mount_aggregate(mock.server(), HashMap::new()).await; + + let mut fetch = Fetcher::new( + mock.client().clone(), + None, + true, + GraffitiBuilder::default(), + 5, + false, + ); + wire_aggregator(&mut fetch, std::slice::from_ref(&att_a)); + + let captured: Captured = Arc::new(Mutex::new(None)); + fetch.subscribe(capturing_subscriber(captured.clone())); + + let duty = Duty::new_aggregator_duty(SlotNumber::new(SLOT)); + // No aggregators found -> empty set -> Ok and subscriber not invoked. + fetch.fetch(duty, def_set).await.expect("fetch"); + assert!(captured.lock().unwrap().is_none()); + } + + #[tokio::test] + async fn fetch_aggregator_nil_aggregate() { + const SLOT: u64 = 1; + let pk_a = PubKey::new([2u8; PK_LEN]); + + let att_a = build_attestation(2); + let mut def_set = DutyDefinitionSet::new(); + def_set.insert(pk_a, attester_def(att_a.data.index, 0)); + + let mock = BeaconMock::builder() + .spec(aggregator_spec()) + .build() + .await + .expect("build mock"); + // Empty map -> responder returns 404 for every root. + mount_aggregate(mock.server(), HashMap::new()).await; + + let mut fetch = Fetcher::new( + mock.client().clone(), + None, + true, + GraffitiBuilder::default(), + 5, + false, + ); + wire_aggregator(&mut fetch, std::slice::from_ref(&att_a)); + + let duty = Duty::new_aggregator_duty(SlotNumber::new(SLOT)); + let err = fetch + .fetch(duty, def_set) + .await + .expect_err("expected error"); + assert!( + err.to_string() + .contains("aggregate attestation not found by root (retryable)"), + "got: {err}" + ); + } + + #[tokio::test] + async fn fetch_sync_contribution_aggregator() { + use pluto_eth2api::{spec::altair, v1}; + + const SLOT: u64 = 1; + const V_IDX_A: u64 = 2; + const V_IDX_B: u64 = 3; + const SUBCOMM_A: u64 = 4; + const SUBCOMM_B: u64 = 5; + + let pk_a = PubKey::new([2u8; PK_LEN]); + let pk_b = PubKey::new([3u8; PK_LEN]); + + let root_a = [10u8; 32]; + let root_b = [11u8; 32]; + + let selection = |v_idx, subcomm, sig| { + SyncCommitteeSelection::new(v1::SyncCommitteeSelection { + slot: SLOT, + validator_index: v_idx, + subcommittee_index: subcomm, + selection_proof: bls_sig(sig), + }) + }; + let message = |v_idx, root| { + SignedSyncMessage::new(altair::SyncCommitteeMessage { + slot: SLOT, + beacon_block_root: root, + validator_index: v_idx, + signature: [0u8; 96], + }) + }; + + let sel_a = selection(V_IDX_A, SUBCOMM_A, SYNC_AGG_SIG_A); + let sel_b = selection(V_IDX_B, SUBCOMM_B, SYNC_AGG_SIG_B); + let msg_a = message(V_IDX_A, root_a); + let msg_b = message(V_IDX_B, root_b); + + let selections: HashMap = + HashMap::from([(pk_a, sel_a), (pk_b, sel_b)]); + let messages: HashMap = + HashMap::from([(pk_a, msg_a), (pk_b, msg_b)]); + + let mut def_set = DutyDefinitionSet::new(); + for pk in [pk_a, pk_b] { + def_set.insert( + pk, + DutyDefinition::SyncCommittee(SyncCommitteeDefinition::new(SyncCommitteeDuty { + pubkey: [0u8; 48], + validator_index: 0, + validator_sync_committee_indices: vec![], + })), + ); + } + + let mock = BeaconMock::builder() + .spec(aggregator_spec()) + .build() + .await + .expect("build mock"); + mount_sync_contribution(mock.server()).await; + + let mut fetch = Fetcher::new( + mock.client().clone(), + None, + true, + GraffitiBuilder::default(), + 5, + false, + ); + + let sels = selections.clone(); + let msgs = messages.clone(); + fetch.register_agg_sig_db(Arc::new(move |duty: Duty, pubkey: PubKey| { + let sels = sels.clone(); + let msgs = msgs.clone(); + Box::pin(async move { + let data: Box = match duty.duty_type { + DutyType::PrepareSyncContribution => Box::new(sels[&pubkey].clone()), + DutyType::SyncMessage => Box::new(msgs[&pubkey].clone()), + _ => return Err("unsupported duty".into()), + }; + Ok(data) + }) + })); + + let captured: Captured = Arc::new(Mutex::new(None)); + fetch.subscribe(capturing_subscriber(captured.clone())); + + let duty = Duty::new_sync_contribution_duty(SlotNumber::new(SLOT)); + fetch.fetch(duty, def_set).await.expect("fetch"); + + let (_, res_set) = captured.lock().unwrap().take().expect("subscriber called"); + assert_eq!(res_set.len(), 2); + + for (pubkey, expected_subcomm, expected_root) in + [(pk_a, SUBCOMM_A, root_a), (pk_b, SUBCOMM_B, root_b)] + { + let UnsignedDutyData::SyncContribution(contrib) = res_set.get(&pubkey).expect("entry") + else { + panic!("expected sync contribution"); + }; + assert_eq!(contrib.0.slot, SLOT); + assert_eq!(contrib.0.subcommittee_index, expected_subcomm); + assert_eq!(contrib.0.beacon_block_root, expected_root); + } + } + + #[tokio::test] + async fn fetch_sync_contribution_not_aggregator() { + use pluto_eth2api::v1; + + const SLOT: u64 = 1; + let pk_a = PubKey::new([2u8; PK_LEN]); + let pk_b = PubKey::new([3u8; PK_LEN]); + + let mut def_set = DutyDefinitionSet::new(); + for pk in [pk_a, pk_b] { + def_set.insert( + pk, + DutyDefinition::SyncCommittee(SyncCommitteeDefinition::new(SyncCommitteeDuty { + pubkey: [0u8; 48], + validator_index: 0, + validator_sync_committee_indices: vec![], + })), + ); + } + + let mock = BeaconMock::builder() + .spec(aggregator_spec()) + .build() + .await + .expect("build mock"); + + let mut fetch = Fetcher::new( + mock.client().clone(), + None, + true, + GraffitiBuilder::default(), + 5, + false, + ); + + fetch.register_agg_sig_db(Arc::new(move |duty: Duty, _pubkey: PubKey| { + Box::pin(async move { + if duty.duty_type == DutyType::PrepareSyncContribution { + let selection = SyncCommitteeSelection::new(v1::SyncCommitteeSelection { + slot: 0, + validator_index: 0, + subcommittee_index: 0, + selection_proof: bls_sig(SYNC_NON_AGG_SIG), + }); + let data: Box = Box::new(selection); + return Ok(data); + } + Err("unsupported duty".into()) + }) + })); + + let duty = Duty::new_sync_contribution_duty(SlotNumber::new(SLOT)); + // Non-aggregators are skipped, producing an empty set and no error. + fetch.fetch(duty, def_set).await.expect("fetch"); + } + + #[tokio::test] + async fn fetch_sync_contribution_data_error() { + const SLOT: u64 = 1; + let pk_a = PubKey::new([2u8; PK_LEN]); + + let mut def_set = DutyDefinitionSet::new(); + def_set.insert( + pk_a, + DutyDefinition::SyncCommittee(SyncCommitteeDefinition::new(SyncCommitteeDuty { + pubkey: [0u8; 48], + validator_index: 0, + validator_sync_committee_indices: vec![], + })), + ); + + let mock = BeaconMock::builder().build().await.expect("build mock"); + let mut fetch = Fetcher::new( + mock.client().clone(), + None, + true, + GraffitiBuilder::default(), + 5, + false, + ); + + fetch.register_agg_sig_db(Arc::new(move |_duty: Duty, _pubkey: PubKey| { + Box::pin(async move { Err("error".into()) }) + })); + + let duty = Duty::new_sync_contribution_duty(SlotNumber::new(SLOT)); + let err = fetch + .fetch(duty, def_set) + .await + .expect_err("expected error"); + let msg = err.to_string(); + assert!(msg.contains("fetch contribution data"), "got: {msg}"); + assert!(msg.contains("error"), "got: {msg}"); + } +} diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 5e69855f..2c7d89cb 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -32,6 +32,9 @@ pub mod dutydb; /// SigAgg — threshold BLS signature aggregation. pub mod sigagg; +/// Fetcher — fetches unsigned duty data from the beacon node. +pub mod fetcher; + mod parsigex_codec; // SSZ codec operates on compile-time-constant byte sizes and offsets. // Arithmetic is bounded and casts from `usize` to `u32` are safe because all diff --git a/crates/core/src/types.rs b/crates/core/src/types.rs index 2707e5ef..48098b9e 100644 --- a/crates/core/src/types.rs +++ b/crates/core/src/types.rs @@ -8,11 +8,16 @@ use dyn_eq::DynEq; use serde::{Deserialize, Serialize}; use std::fmt::Debug as StdDebug; +use pluto_eth2api::spec::phase0; + use crate::{ ParSigExCodecError, corepb::v1::core as pbcore, parsigex_codec::{deserialize_signed_data, serialize_signed_data}, - signeddata::SignedDataError, + signeddata::{ + AttestationData, AttesterDuty, SignedDataError, SyncContribution, + VersionedAggregatedAttestation, VersionedProposal, + }, }; /// The type of duty. @@ -426,134 +431,137 @@ impl AsRef<[u8]> for PubKey { // todo: add toEth2Format for the pub key // https://github.com/ObolNetwork/charon/blob/b3008103c5429b031b63518195f4c49db4e9a68d/core/types.go#L311 -/// Duty definition type. +/// A block proposer duty, mirroring eth2 `v1.ProposerDuty`. #[derive(Debug, Clone, PartialEq, Eq)] -pub struct DutyDefinition(T); - -impl DutyDefinition -where - T: Clone + Serialize + StdDebug, -{ - /// Create a new duty definition. - pub fn new(duty_definition: T) -> Self { - Self(duty_definition) - } - - /// Inner value. - pub fn inner(&self) -> &T { - &self.0 - } +pub struct ProposerDuty { + /// Public key of the validator that should propose. + pub pubkey: phase0::BLSPubKey, + /// Slot in which the validator should propose. + pub slot: phase0::Slot, + /// Index of the validator that should propose. + pub validator_index: phase0::ValidatorIndex, } -/// One duty definition per validator, matching Go's `core.DutyDefinitionSet`. -#[derive(Debug, Default, Clone, PartialEq, Eq)] -pub struct DutyDefinitionSet(HashMap>) -where - T: Clone + Serialize + StdDebug; +/// A sync committee duty, mirroring eth2 `v1.SyncCommitteeDuty`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct SyncCommitteeDuty { + /// Public key of the validator that should contribute. + pub pubkey: phase0::BLSPubKey, + /// Index of the validator that should contribute. + pub validator_index: phase0::ValidatorIndex, + /// Indices of the validator in the list of validators in the committee. + pub validator_sync_committee_indices: Vec, +} -impl DutyDefinitionSet -where - T: Clone + Serialize + StdDebug, -{ - /// Create a new duty definition set. - pub fn new() -> Self { - Self(HashMap::default()) - } +/// Attester duty definition. Mirrors Go's `core.AttesterDefinition`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct AttesterDefinition(pub AttesterDuty); - /// Get a duty definition by public key. - pub fn get(&self, pubkey: &PubKey) -> Option<&DutyDefinition> { - self.0.get(pubkey) +impl AttesterDefinition { + /// Create a new attester definition. + pub fn new(duty: AttesterDuty) -> Self { + Self(duty) } - /// Insert a duty definition. - pub fn insert(&mut self, pubkey: PubKey, duty_definition: DutyDefinition) { - self.0.insert(pubkey, duty_definition); + /// The wrapped attester duty. + pub fn duty(&self) -> &AttesterDuty { + &self.0 } +} - /// Remove a duty definition by public key. - pub fn remove(&mut self, pubkey: &PubKey) -> Option> { - self.0.remove(pubkey) - } +/// Block proposer duty definition. Mirrors Go's `core.ProposerDefinition`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ProposerDefinition(pub ProposerDuty); - /// Iterate over all public keys in the set. - pub fn keys(&self) -> impl Iterator { - self.0.keys() +impl ProposerDefinition { + /// Create a new proposer definition. + pub fn new(duty: ProposerDuty) -> Self { + Self(duty) } - /// Inner map. - pub fn inner(&self) -> &HashMap> { + /// The wrapped proposer duty. + pub fn duty(&self) -> &ProposerDuty { &self.0 } - - /// Inner map (mutable). - pub fn inner_mut(&mut self) -> &mut HashMap> { - &mut self.0 - } } -/// Unsigned data type +/// Sync committee duty definition. Mirrors Go's `core.SyncCommitteeDefinition`. #[derive(Debug, Clone, PartialEq, Eq)] -pub struct UnsignedData(T); +pub struct SyncCommitteeDefinition(pub SyncCommitteeDuty); -impl UnsignedData -where - T: Clone + Serialize + StdDebug, -{ - /// Create a new unsigned data. - pub fn new(unsigned_data: T) -> Self { - Self(unsigned_data) +impl SyncCommitteeDefinition { + /// Create a new sync committee definition. + pub fn new(duty: SyncCommitteeDuty) -> Self { + Self(duty) } -} -/// Unsigned data set -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct UnsignedDataSet(HashMap>) -where - T: Clone + Serialize + StdDebug; -impl Default for UnsignedDataSet -where - T: Clone + Serialize + StdDebug, -{ - fn default() -> Self { - Self(HashMap::default()) + /// The wrapped sync committee duty. + pub fn duty(&self) -> &SyncCommitteeDuty { + &self.0 } } -impl UnsignedDataSet -where - T: Clone + Serialize + StdDebug, -{ - /// Create a new unsigned data set. - pub fn new() -> Self { - Self::default() - } - - /// Get an unsigned data by duty type. - pub fn get(&self, duty_type: &DutyType) -> Option<&UnsignedData> { - self.0.get(duty_type) - } +/// Per-validator duty definition. The Rust equivalent of Go's +/// `core.DutyDefinition` interface: a closed set of concrete duty definitions. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum DutyDefinition { + /// Attester duty definition. + Attester(AttesterDefinition), + /// Block proposer duty definition. + Proposer(ProposerDefinition), + /// Sync committee duty definition. + SyncCommittee(SyncCommitteeDefinition), +} - /// Insert an unsigned data. - pub fn insert(&mut self, duty_type: DutyType, unsigned_data: UnsignedData) { - self.0.insert(duty_type, unsigned_data); +impl DutyDefinition { + /// Returns the attester definition, or `None` if this is a different duty. + pub fn as_attester(&self) -> Option<&AttesterDefinition> { + match self { + Self::Attester(d) => Some(d), + _ => None, + } } - /// Remove an unsigned data by duty type. - pub fn remove(&mut self, duty_type: &DutyType) -> Option> { - self.0.remove(duty_type) + /// Returns the proposer definition, or `None` if this is a different duty. + pub fn as_proposer(&self) -> Option<&ProposerDefinition> { + match self { + Self::Proposer(d) => Some(d), + _ => None, + } } - /// Inner unsigned data set. - pub fn inner(&self) -> &HashMap> { - &self.0 + /// Returns the sync committee definition, or `None` if this is a different + /// duty. + pub fn as_sync_committee(&self) -> Option<&SyncCommitteeDefinition> { + match self { + Self::SyncCommittee(d) => Some(d), + _ => None, + } } +} - /// Inner unsigned data set. - pub fn inner_mut(&mut self) -> &mut HashMap> { - &mut self.0 - } +/// One duty definition per validator, matching Go's `core.DutyDefinitionSet` +/// (`map[core.PubKey]core.DutyDefinition`). +pub type DutyDefinitionSet = HashMap; + +/// Unsigned duty data variant — the Rust equivalent of Go's +/// `core.UnsignedData` interface. +#[derive(Debug, Clone, PartialEq)] +pub enum UnsignedDutyData { + /// Unsigned proposal (DutyProposer). + Proposal(Box), + /// Unsigned attestation data (DutyAttester). + Attestation(AttestationData), + /// Unsigned aggregated attestation (DutyAggregator). + AggAttestation(VersionedAggregatedAttestation), + /// Unsigned sync contribution (DutySyncContribution). + SyncContribution(SyncContribution), } +/// Map from public key to unsigned duty data, the Rust equivalent of Go's +/// `core.UnsignedDataSet` (`map[core.PubKey]core.UnsignedData`). +pub type UnsignedDataSet = HashMap; + /// Signed data type pub trait SignedData: Any + DynClone + DynEq + StdDebug + Send + Sync { /// signature returns the signed duty data's signature. @@ -1034,23 +1042,25 @@ mod tests { #[test] fn duty_definition_set() { let pubkey = PubKey::new([1u8; PK_LEN]); + let duty = AttesterDuty { + slot: 1, + validator_index: 2, + committee_index: 3, + committee_length: 4, + committees_at_slot: 5, + validator_committee_index: 6, + }; + let mut set = DutyDefinitionSet::new(); - set.insert(pubkey, DutyDefinition::new(DutyType::Proposer)); - assert_eq!( - set.get(&pubkey), - Some(&DutyDefinition::new(DutyType::Proposer)) + set.insert( + pubkey, + DutyDefinition::Attester(AttesterDefinition::new(duty.clone())), ); - assert_eq!(set.keys().count(), 1); - } - #[test] - fn unsigned_data_set() { - let mut unsigned_data_set = UnsignedDataSet::new(); - unsigned_data_set.insert(DutyType::Proposer, UnsignedData::new(DutyType::Proposer)); - assert_eq!( - unsigned_data_set.get(&DutyType::Proposer), - Some(&UnsignedData::new(DutyType::Proposer)) - ); + let got = set.get(&pubkey).expect("definition present"); + assert_eq!(got.as_attester().map(|d| d.duty()), Some(&duty)); + assert!(got.as_proposer().is_none()); + assert!(got.as_sync_committee().is_none()); } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]