From e3f6b0fbf87c6636bf70798fdab3a7d0c8548301 Mon Sep 17 00:00:00 2001 From: Quang Le Date: Fri, 12 Jun 2026 16:50:52 +0700 Subject: [PATCH 1/3] feat(eth2api): add validator_duty --- crates/eth2api/src/extensions.rs | 46 +- crates/eth2api/src/lib.rs | 5 + crates/eth2api/src/spec/version.rs | 35 ++ crates/eth2api/src/validator_duty.rs | 742 +++++++++++++++++++++++++++ 4 files changed, 821 insertions(+), 7 deletions(-) create mode 100644 crates/eth2api/src/validator_duty.rs diff --git a/crates/eth2api/src/extensions.rs b/crates/eth2api/src/extensions.rs index fc08e939..f29480a8 100644 --- a/crates/eth2api/src/extensions.rs +++ b/crates/eth2api/src/extensions.rs @@ -36,6 +36,8 @@ pub enum EthBeaconNodeApiClientError { DomainTypeNotFound(String), } +// Ordered oldest-to-newest. `resolve_fork_version` relies on this order to +// break equal-epoch ties (the latest fork wins), so keep it chronological. const FORKS: [ConsensusVersion; 6] = [ ConsensusVersion::Altair, ConsensusVersion::Bellatrix, @@ -74,7 +76,7 @@ fn parse_u64_field( .map_err(|_| EthBeaconNodeApiClientError::ParseError(format!("parse {field}"))) } -fn decode_fixed_hex String>( +pub(crate) fn decode_fixed_hex String>( value: &str, step: F, ) -> Result<[u8; N], EthBeaconNodeApiClientError> { @@ -186,12 +188,17 @@ pub fn resolve_fork_version( genesis_fork_version: phase0::Version, fork_schedule: &HashMap, ) -> phase0::Version { - fork_schedule - .values() - .filter(|fork| fork.epoch <= epoch) - .max_by_key(|fork| fork.epoch) - .map(|fork| fork.version) - .unwrap_or(genesis_fork_version) + let mut active_version = genesis_fork_version; + for fork in FORKS { + let Some(schedule) = fork_schedule.get(&fork) else { + continue; + }; + if schedule.epoch <= epoch { + active_version = schedule.version; + } + } + + active_version } fn resolve_domain( @@ -411,6 +418,31 @@ mod tests { ); } + #[test] + fn resolve_fork_version_breaks_equal_epoch_ties_by_fork_order() { + let spec = json!({ + "ALTAIR_FORK_VERSION": "0x01020304", + "ALTAIR_FORK_EPOCH": "0", + "BELLATRIX_FORK_VERSION": "0x02030405", + "BELLATRIX_FORK_EPOCH": "0", + "CAPELLA_FORK_VERSION": "0x03040506", + "CAPELLA_FORK_EPOCH": "0", + "DENEB_FORK_VERSION": "0x04050607", + "DENEB_FORK_EPOCH": "0", + "ELECTRA_FORK_VERSION": "0x05060708", + "ELECTRA_FORK_EPOCH": "2048", + "FULU_FORK_VERSION": "0x06070809", + "FULU_FORK_EPOCH": u64::MAX.to_string(), + }); + let fork_schedule = fork_schedule_from_spec(&spec).unwrap(); + let genesis_fork_version = [0x11, 0x22, 0x33, 0x44]; + + assert_eq!( + resolve_fork_version(0, genesis_fork_version, &fork_schedule), + [0x04, 0x05, 0x06, 0x07] + ); + } + #[test] fn compute_builder_domain_stays_constant() { let genesis_fork_version = [0x01, 0x01, 0x70, 0x00]; diff --git a/crates/eth2api/src/lib.rs b/crates/eth2api/src/lib.rs index 4ae1f7c3..ee14e70a 100644 --- a/crates/eth2api/src/lib.rs +++ b/crates/eth2api/src/lib.rs @@ -39,6 +39,11 @@ pub mod versioned; /// Cache of Validators retrieved from the Beacon node. pub mod valcache; +/// Beacon API helpers used by validator duty flows. +pub mod validator_duty; + +pub use validator_duty::*; + #[cfg(test)] pub(crate) mod test_fixtures; diff --git a/crates/eth2api/src/spec/version.rs b/crates/eth2api/src/spec/version.rs index 25b47e6e..d8ed7511 100644 --- a/crates/eth2api/src/spec/version.rs +++ b/crates/eth2api/src/spec/version.rs @@ -78,6 +78,21 @@ impl DataVersion { _ => Err(VersionError::UnknownDataVersion), } } + + /// Maps to the equivalent beacon API [`ConsensusVersion`](crate::ConsensusVersion). + pub const fn to_consensus_version(self) -> Result { + use crate::ConsensusVersion; + match self { + DataVersion::Phase0 => Ok(ConsensusVersion::Phase0), + DataVersion::Altair => Ok(ConsensusVersion::Altair), + DataVersion::Bellatrix => Ok(ConsensusVersion::Bellatrix), + DataVersion::Capella => Ok(ConsensusVersion::Capella), + DataVersion::Deneb => Ok(ConsensusVersion::Deneb), + DataVersion::Electra => Ok(ConsensusVersion::Electra), + DataVersion::Fulu => Ok(ConsensusVersion::Fulu), + DataVersion::Unknown => Err(VersionError::UnknownDataVersion), + } + } } impl fmt::Display for DataVersion { @@ -272,6 +287,26 @@ mod tests { } } + #[test_case(DataVersion::Unknown, None, Some(VersionError::UnknownDataVersion); "unknown")] + #[test_case(DataVersion::Phase0, Some(crate::ConsensusVersion::Phase0), None; "phase0")] + #[test_case(DataVersion::Altair, Some(crate::ConsensusVersion::Altair), None; "altair")] + #[test_case(DataVersion::Bellatrix, Some(crate::ConsensusVersion::Bellatrix), None; "bellatrix")] + #[test_case(DataVersion::Capella, Some(crate::ConsensusVersion::Capella), None; "capella")] + #[test_case(DataVersion::Deneb, Some(crate::ConsensusVersion::Deneb), None; "deneb")] + #[test_case(DataVersion::Electra, Some(crate::ConsensusVersion::Electra), None; "electra")] + #[test_case(DataVersion::Fulu, Some(crate::ConsensusVersion::Fulu), None; "fulu")] + fn data_version_to_consensus_version( + version: DataVersion, + expected: Option, + expected_err: Option, + ) { + match (version.to_consensus_version(), expected, expected_err) { + (Ok(actual), Some(expected), None) => assert_eq!(actual, expected), + (Err(err), None, Some(expected_err)) => assert_eq!(err, expected_err), + _ => panic!("unexpected conversion result"), + } + } + #[test_case(99, None, Some(VersionError::UnknownDataVersion); "unknown")] #[test_case(0, Some(DataVersion::Phase0), None; "phase0")] #[test_case(1, Some(DataVersion::Altair), None; "altair")] diff --git a/crates/eth2api/src/validator_duty.rs b/crates/eth2api/src/validator_duty.rs new file mode 100644 index 00000000..0690981e --- /dev/null +++ b/crates/eth2api/src/validator_duty.rs @@ -0,0 +1,742 @@ +//! Beacon API helpers used by validator duty flows. + +use serde::Serialize; +use serde_json::Value; + +use crate::{ + EthBeaconNodeApiClient, + spec::{altair, phase0}, + versioned, +}; + +type Result = std::result::Result; + +/// Error returned by validator duty beacon API helpers. +#[derive(Debug, thiserror::Error)] +#[error("{0}")] +pub struct ValidatorDutyError(String); + +/// Attester duty data needed by validator duty flows. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct AttesterDuty { + /// Duty slot. + pub slot: phase0::Slot, + /// Validator index. + pub validator_index: phase0::ValidatorIndex, + /// Validator public key. + pub pubkey: phase0::BLSPubKey, +} + +impl EthBeaconNodeApiClient { + /// Fetches attester duties for the provided validator indices. + pub async fn fetch_attester_duties_for_indices( + &self, + epoch: phase0::Epoch, + indices: Vec, + ) -> Result> { + let request = crate::GetAttesterDutiesRequest { + path: crate::GetAttesterDutiesRequestPath { + epoch: epoch.to_string(), + }, + body: indices.into_iter().map(|index| index.to_string()).collect(), + }; + + match self + .get_attester_duties(request) + .await + .map_err(error_message)? + { + crate::GetAttesterDutiesResponse::Ok(response) => response + .data + .into_iter() + .map(|duty| { + Ok(AttesterDuty { + slot: parse_u64(&duty.slot, "attester duty slot")?, + validator_index: parse_u64( + &duty.validator_index, + "attester duty validator_index", + )?, + pubkey: crate::extensions::decode_fixed_hex(&duty.pubkey, || { + "decode attester duty pubkey".to_string() + }) + .map_err(error_message)?, + }) + }) + .collect(), + other => Err(unexpected_response( + "get attester duties", + format!("{other:?}"), + )), + } + } + + /// Fetches the beacon attester signing domain. + pub async fn fetch_beacon_attester_domain( + &self, + epoch: phase0::Epoch, + ) -> Result { + let domain_type = self + .fetch_domain_type("DOMAIN_BEACON_ATTESTER") + .await + .map_err(error_message)?; + + self.fetch_domain(domain_type, epoch) + .await + .map_err(error_message) + } + + /// Submits signed attestations to the beacon node. + pub async fn submit_attestations( + &self, + attestations: Vec, + ) -> Result<()> { + let version = attestations + .first() + .map(|attestation| attestation.version) + .unwrap_or(versioned::DataVersion::Phase0) + .to_consensus_version() + .map_err(error_message)?; + let body = attestations_request_body(attestations)?; + let request = crate::SubmitPoolAttestationsV2Request { + header: crate::SubmitPoolAttestationsV2RequestHeader { + eth_consensus_version: version, + }, + body, + }; + + match self + .submit_pool_attestations_v2(request) + .await + .map_err(error_message)? + { + crate::SubmitPoolAttestationsV2Response::Ok => Ok(()), + crate::SubmitPoolAttestationsV2Response::BadRequest(response) => Err(failure_response( + "submit attestations", + response.message, + response.failures, + )), + other => Err(unexpected_response( + "submit attestations", + format!("{other:?}"), + )), + } + } + + /// Submits a signed block proposal to the beacon node. + pub async fn submit_signed_proposal( + &self, + proposal: versioned::VersionedSignedProposal, + ) -> Result<()> { + let version = proposal.version.to_consensus_version().map_err(error_message)?; + let body = proposal_request_body(proposal)?; + let request = crate::PublishBlockV2Request { + query: crate::PublishBlockV2RequestQuery { + broadcast_validation: None, + }, + header: crate::PublishBlockV2RequestHeader { + eth_consensus_version: version, + }, + body, + }; + + match self + .publish_block_v2(request) + .await + .map_err(error_message)? + { + crate::PublishBlockV2Response::Ok | crate::PublishBlockV2Response::Accepted => Ok(()), + other => Err(unexpected_response("submit proposal", format!("{other:?}"))), + } + } + + /// Submits a signed blinded block proposal to the beacon node. + pub async fn submit_signed_blinded_proposal( + &self, + proposal: versioned::VersionedSignedBlindedProposal, + ) -> Result<()> { + let version = proposal.version.to_consensus_version().map_err(error_message)?; + let body = blinded_proposal_request_body(proposal)?; + let request = crate::PublishBlindedBlockV2Request { + query: crate::PublishBlindedBlockV2RequestQuery { + broadcast_validation: None, + }, + header: crate::PublishBlindedBlockV2RequestHeader { + eth_consensus_version: version, + }, + body, + }; + + match self + .publish_blinded_block_v2(request) + .await + .map_err(error_message)? + { + crate::PublishBlockV2Response::Ok | crate::PublishBlockV2Response::Accepted => Ok(()), + other => Err(unexpected_response( + "submit blinded proposal", + format!("{other:?}"), + )), + } + } + + /// Submits signed validator registrations to the beacon node. + pub async fn submit_validator_registrations( + &self, + registrations: Vec, + ) -> Result<()> { + let body = registrations + .into_iter() + .map(registration_request_item) + .collect::>>()?; + let request = crate::RegisterValidatorRequest { body }; + + match self + .register_validator(request) + .await + .map_err(error_message)? + { + crate::RegisterValidatorResponse::Ok => Ok(()), + other => Err(unexpected_response( + "submit validator registrations", + format!("{other:?}"), + )), + } + } + + /// Submits a signed voluntary exit to the beacon node. + pub async fn submit_voluntary_exit(&self, exit: phase0::SignedVoluntaryExit) -> Result<()> { + let request = crate::SubmitPoolVoluntaryExitRequest { + body: voluntary_exit_request_body(exit), + }; + + match self + .submit_pool_voluntary_exit(request) + .await + .map_err(error_message)? + { + crate::SubmitPoolVoluntaryExitResponse::Ok => Ok(()), + other => Err(unexpected_response( + "submit voluntary exit", + format!("{other:?}"), + )), + } + } + + /// Submits signed aggregate-and-proof messages to the beacon node. + pub async fn submit_aggregate_attestations( + &self, + aggregate_and_proofs: Vec, + ) -> Result<()> { + let version = aggregate_and_proofs + .first() + .map(|aggregate| aggregate.version) + .unwrap_or(versioned::DataVersion::Phase0) + .to_consensus_version() + .map_err(error_message)?; + let body = aggregate_and_proofs_request_body(aggregate_and_proofs)?; + let request = crate::PublishAggregateAndProofsV2Request { + header: crate::PublishAggregateAndProofsV2RequestHeader { + eth_consensus_version: version, + }, + body, + }; + + match self + .publish_aggregate_and_proofs_v2(request) + .await + .map_err(error_message)? + { + crate::SubmitPoolAttestationsV2Response::Ok => Ok(()), + crate::SubmitPoolAttestationsV2Response::BadRequest(response) => Err(failure_response( + "submit aggregate attestations", + response.message, + response.failures, + )), + other => Err(unexpected_response( + "submit aggregate attestations", + format!("{other:?}"), + )), + } + } + + /// Submits sync committee messages to the beacon node. + pub async fn submit_sync_committee_messages( + &self, + messages: Vec, + ) -> Result<()> { + let body = messages + .into_iter() + .map(sync_committee_message_request_item) + .collect(); + let request = crate::SubmitPoolSyncCommitteeSignaturesRequest { body }; + + match self + .submit_pool_sync_committee_signatures(request) + .await + .map_err(error_message)? + { + crate::PublishContributionAndProofsResponse::Ok => Ok(()), + other => Err(unexpected_response( + "submit sync committee messages", + format!("{other:?}"), + )), + } + } + + /// Submits sync committee contributions to the beacon node. + pub async fn submit_sync_committee_contributions( + &self, + contributions: Vec, + ) -> Result<()> { + let body = contributions + .into_iter() + .map(sync_contribution_request_item) + .collect(); + let request = crate::PublishContributionAndProofsRequest { body }; + + match self + .publish_contribution_and_proofs(request) + .await + .map_err(error_message)? + { + crate::PublishContributionAndProofsResponse::Ok => Ok(()), + other => Err(unexpected_response( + "submit sync committee contributions", + format!("{other:?}"), + )), + } + } +} + +/// Returns true for data versions that use pre-Electra attestation wire shape. +pub fn data_version_is_before_electra(version: versioned::DataVersion) -> bool { + matches!( + version, + versioned::DataVersion::Unknown + | versioned::DataVersion::Phase0 + | versioned::DataVersion::Altair + | versioned::DataVersion::Bellatrix + | versioned::DataVersion::Capella + | versioned::DataVersion::Deneb + ) +} + +fn attestations_request_body( + attestations: Vec, +) -> Result { + if attestations + .first() + .is_some_and(|attestation| !data_version_is_before_electra(attestation.version)) + { + let mut items = Vec::with_capacity(attestations.len()); + for attestation in attestations { + let Some(payload) = attestation.attestation else { + return Err(unexpected_response( + "attestation request body", + "missing payload", + )); + }; + let validator_index = attestation.validator_index.ok_or_else(|| { + unexpected_response("attestation request body", "missing validator index") + })?; + + match payload { + versioned::AttestationPayload::Electra(attestation) + | versioned::AttestationPayload::Fulu(attestation) => { + items.push(crate::AttestationRequestBody2Array { + attester_index: validator_index.to_string(), + committee_index: first_set_bit(&attestation.committee_bits.bytes) + .ok_or_else(|| { + unexpected_response( + "attestation request body", + "missing committee index", + ) + })? + .to_string(), + data: data_request_body(&attestation.data)?, + signature: hex0x(attestation.signature), + }); + } + _ => { + return Err(unexpected_response( + "attestation request body", + "pre-electra payload in electra request", + )); + } + } + } + + return Ok(crate::AttestationRequestBody2::Array(items)); + } + + let mut items = Vec::with_capacity(attestations.len()); + for attestation in attestations { + let Some(payload) = attestation.attestation else { + return Err(unexpected_response( + "attestation request body", + "missing payload", + )); + }; + let value = match payload { + versioned::AttestationPayload::Phase0(attestation) + | versioned::AttestationPayload::Altair(attestation) + | versioned::AttestationPayload::Bellatrix(attestation) + | versioned::AttestationPayload::Capella(attestation) + | versioned::AttestationPayload::Deneb(attestation) => { + serde_json::to_value(attestation).map_err(error_message)? + } + versioned::AttestationPayload::Electra(_) | versioned::AttestationPayload::Fulu(_) => { + return Err(unexpected_response( + "attestation request body", + "electra payload in pre-electra request", + )); + } + }; + items.push(serde_json::from_value(value).map_err(error_message)?); + } + + Ok(crate::AttestationRequestBody2::Array2(items)) +} + +fn proposal_request_body( + proposal: versioned::VersionedSignedProposal, +) -> Result { + match proposal.block { + versioned::SignedProposalBlock::Phase0(block) => { + let (message, signature) = signed_envelope(block)?; + Ok(crate::BlockRequestBody::Object7( + crate::GetBlindedBlockResponseResponseDataObject6 { message, signature }, + )) + } + versioned::SignedProposalBlock::Altair(block) => { + let (message, signature) = signed_envelope(block)?; + Ok(crate::BlockRequestBody::Object6( + crate::GetBlindedBlockResponseResponseDataObject5 { message, signature }, + )) + } + versioned::SignedProposalBlock::Bellatrix(block) => { + let (message, signature) = signed_envelope(block)?; + Ok(crate::BlockRequestBody::Object5( + crate::BlockRequestBodyObject5 { message, signature }, + )) + } + versioned::SignedProposalBlock::Capella(block) => { + let (message, signature) = signed_envelope(block)?; + Ok(crate::BlockRequestBody::Object4( + crate::BlockRequestBodyObject4 { message, signature }, + )) + } + versioned::SignedProposalBlock::Deneb(contents) => { + let (message, signature) = signed_envelope(contents.signed_block)?; + Ok(crate::BlockRequestBody::Object3( + crate::BlockRequestBodyObject3 { + blobs: hex_values(contents.blobs)?, + kzg_proofs: hex_values(contents.kzg_proofs)?, + signed_block: crate::DenebSignedBlockContentsSignedBlock { message, signature }, + }, + )) + } + versioned::SignedProposalBlock::Electra(contents) => { + let (message, signature) = signed_envelope(contents.signed_block)?; + Ok(crate::BlockRequestBody::Object2( + crate::BlockRequestBodyObject2 { + blobs: hex_values(contents.blobs)?, + kzg_proofs: hex_values(contents.kzg_proofs)?, + signed_block: crate::SignedBlockContentsSignedBlock { message, signature }, + }, + )) + } + versioned::SignedProposalBlock::Fulu(contents) => { + let (message, signature) = signed_envelope(contents.signed_block)?; + Ok(crate::BlockRequestBody::Object( + crate::BlockRequestBodyObject { + blobs: hex_values(contents.blobs)?, + kzg_proofs: hex_values(contents.kzg_proofs)?, + signed_block: crate::SignedBlockContentsSignedBlock { message, signature }, + }, + )) + } + versioned::SignedProposalBlock::BellatrixBlinded(_) + | versioned::SignedProposalBlock::CapellaBlinded(_) + | versioned::SignedProposalBlock::DenebBlinded(_) + | versioned::SignedProposalBlock::ElectraBlinded(_) + | versioned::SignedProposalBlock::FuluBlinded(_) => Err(unexpected_response( + "proposal request body", + "blinded proposal on unblinded endpoint", + )), + } +} + +fn blinded_proposal_request_body( + proposal: versioned::VersionedSignedBlindedProposal, +) -> Result { + match proposal.block { + versioned::SignedBlindedProposalBlock::Bellatrix(block) => { + let (message, signature) = signed_envelope(block)?; + Ok(crate::GetBlindedBlockResponseResponseData::Object4( + crate::GetBlindedBlockResponseResponseDataObject4 { message, signature }, + )) + } + versioned::SignedBlindedProposalBlock::Capella(block) => { + let (message, signature) = signed_envelope(block)?; + Ok(crate::GetBlindedBlockResponseResponseData::Object3( + crate::GetBlindedBlockResponseResponseDataObject3 { message, signature }, + )) + } + versioned::SignedBlindedProposalBlock::Deneb(block) => { + let (message, signature) = signed_envelope(block)?; + Ok(crate::GetBlindedBlockResponseResponseData::Object2( + crate::GetBlindedBlockResponseResponseDataObject2 { message, signature }, + )) + } + versioned::SignedBlindedProposalBlock::Electra(block) + | versioned::SignedBlindedProposalBlock::Fulu(block) => { + let (message, signature) = signed_envelope(block)?; + Ok(crate::GetBlindedBlockResponseResponseData::Object( + crate::GetBlindedBlockResponseResponseDataObject { message, signature }, + )) + } + } +} + +fn registration_request_item( + registration: versioned::VersionedSignedValidatorRegistration, +) -> Result { + match (registration.version, registration.v1) { + (versioned::BuilderVersion::V1, Some(registration)) => { + Ok(crate::RegisterValidatorRequestBodyItem { + message: crate::SignedValidatorRegistrationMessage { + fee_recipient: hex0x(registration.message.fee_recipient), + gas_limit: registration.message.gas_limit.to_string(), + pubkey: hex0x(registration.message.pubkey), + timestamp: registration.message.timestamp.to_string(), + }, + signature: hex0x(registration.signature), + }) + } + _ => Err(unexpected_response( + "validator registration request body", + "unsupported builder registration version", + )), + } +} + +fn voluntary_exit_request_body( + exit: phase0::SignedVoluntaryExit, +) -> crate::GetPoolVoluntaryExitsResponseResponseDatum { + crate::GetPoolVoluntaryExitsResponseResponseDatum { + message: crate::Phase0SignedVoluntaryExitMessage { + epoch: exit.message.epoch.to_string(), + validator_index: exit.message.validator_index.to_string(), + }, + signature: hex0x(exit.signature), + } +} + +fn aggregate_and_proofs_request_body( + aggregate_and_proofs: Vec, +) -> Result { + let is_electra = aggregate_and_proofs + .first() + .is_some_and(|aggregate| !data_version_is_before_electra(aggregate.version)); + + let envelopes = aggregate_and_proofs + .into_iter() + .map(signed_aggregate_envelope) + .collect::>>()?; + + Ok(if is_electra { + crate::AggregateAndProofRequestBody::Array( + envelopes + .into_iter() + .map(|(message, signature)| crate::AggregateAndProofRequestBodyArray { + message, + signature, + }) + .collect(), + ) + } else { + crate::AggregateAndProofRequestBody::Array2( + envelopes + .into_iter() + .map(|(message, signature)| crate::AggregateAndProofRequestBodyArray2 { + message, + signature, + }) + .collect(), + ) + }) +} + +fn signed_aggregate_envelope( + aggregate: versioned::VersionedSignedAggregateAndProof, +) -> Result<(Value, String)> { + match aggregate.aggregate_and_proof { + versioned::SignedAggregateAndProofPayload::Phase0(payload) + | versioned::SignedAggregateAndProofPayload::Altair(payload) + | versioned::SignedAggregateAndProofPayload::Bellatrix(payload) + | versioned::SignedAggregateAndProofPayload::Capella(payload) + | versioned::SignedAggregateAndProofPayload::Deneb(payload) => signed_envelope(payload), + versioned::SignedAggregateAndProofPayload::Electra(payload) + | versioned::SignedAggregateAndProofPayload::Fulu(payload) => signed_envelope(payload), + } +} + +fn sync_committee_message_request_item( + message: altair::SyncCommitteeMessage, +) -> crate::SyncCommitteeRequestBodyItem { + crate::SyncCommitteeRequestBodyItem { + beacon_block_root: hex0x(message.beacon_block_root), + signature: hex0x(message.signature), + slot: message.slot.to_string(), + validator_index: message.validator_index.to_string(), + } +} + +fn sync_contribution_request_item( + contribution: altair::SignedContributionAndProof, +) -> crate::ContributionAndProofRequestBodyItem { + let message = contribution.message; + let contribution_data = message.contribution; + + crate::ContributionAndProofRequestBodyItem { + message: crate::AltairSignedContributionAndProofMessage { + aggregator_index: message.aggregator_index.to_string(), + contribution: crate::Contribution { + aggregation_bits: hex0x(contribution_data.aggregation_bits.bytes), + beacon_block_root: hex0x(contribution_data.beacon_block_root), + signature: hex0x(contribution_data.signature), + slot: contribution_data.slot.to_string(), + subcommittee_index: contribution_data.subcommittee_index.to_string(), + }, + selection_proof: hex0x(message.selection_proof), + }, + signature: hex0x(contribution.signature), + } +} + +fn data_request_body(data: &phase0::AttestationData) -> Result { + serde_json::from_value(serde_json::to_value(data).map_err(error_message)?) + .map_err(error_message) +} + +fn signed_envelope(value: T) -> Result<(Value, String)> { + let mut value = serde_json::to_value(value).map_err(error_message)?; + let object = value.as_object_mut().ok_or_else(|| { + unexpected_response("signed envelope", "serialized signed data is not an object") + })?; + let message = object + .remove("message") + .ok_or_else(|| unexpected_response("signed envelope", "missing message"))?; + let signature = object + .remove("signature") + .and_then(|value| value.as_str().map(str::to_owned)) + .ok_or_else(|| unexpected_response("signed envelope", "missing signature"))?; + + Ok((message, signature)) +} + +fn hex_values(values: Vec) -> Result> { + values + .into_iter() + .map(|value| { + serde_json::to_value(value) + .map_err(error_message)? + .as_str() + .map(str::to_owned) + .ok_or_else(|| unexpected_response("hex value", "serialized value is not hex")) + }) + .collect() +} + +fn first_set_bit(bytes: &[u8]) -> Option { + bytes.iter().enumerate().find_map(|(byte_index, byte)| { + if *byte == 0 { + None + } else { + byte_index.checked_mul(8).and_then(|offset| { + usize::try_from(byte.trailing_zeros()) + .ok() + .and_then(|bit| offset.checked_add(bit)) + }) + } + }) +} + +fn parse_u64(value: &str, field: &'static str) -> Result { + value + .parse() + .map_err(|_| unexpected_response(field, format!("invalid u64 {value}"))) +} + +fn hex0x(bytes: impl AsRef<[u8]>) -> String { + pluto_ssz::to_0x_hex(bytes.as_ref()) +} + +fn error_message(source: impl ToString) -> ValidatorDutyError { + ValidatorDutyError(source.to_string()) +} + +fn unexpected_response(context: &'static str, response: impl Into) -> ValidatorDutyError { + ValidatorDutyError(format!("{context}: {}", response.into())) +} + +fn failure_response( + context: &'static str, + message: String, + failures: Vec, +) -> ValidatorDutyError { + let details = failures + .into_iter() + .map(|failure| failure.message) + .collect::>() + .join("; "); + if details.is_empty() { + unexpected_response(context, message) + } else { + unexpected_response(context, format!("{message}: {details}")) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::spec::{electra, phase0}; + + fn attestation_data(slot: u64, epoch: u64) -> phase0::AttestationData { + phase0::AttestationData { + slot, + index: 3, + beacon_block_root: [1; 32], + source: phase0::Checkpoint { + epoch: epoch.saturating_sub(1), + root: [2; 32], + }, + target: phase0::Checkpoint { + epoch, + root: [3; 32], + }, + } + } + + #[test] + fn electra_attestation_request_body_requires_validator_and_committee_index() { + let body = attestations_request_body(vec![versioned::VersionedAttestation { + version: versioned::DataVersion::Electra, + validator_index: Some(99), + attestation: Some(versioned::AttestationPayload::Electra( + electra::Attestation { + aggregation_bits: phase0::BitList::with_bits(8, &[0]), + data: attestation_data(12, 3), + signature: [4; 96], + committee_bits: pluto_ssz::BitVector::with_bits(&[3]), + }, + )), + }]) + .expect("body"); + let value = serde_json::to_value(body).expect("json body"); + + assert_eq!(value[0]["attester_index"], "99"); + assert_eq!(value[0]["committee_index"], "3"); + } +} From bc30a1294b6bade1f4b46072e10cbc6524a3ae09 Mon Sep 17 00:00:00 2001 From: Quang Le Date: Fri, 12 Jun 2026 18:20:31 +0700 Subject: [PATCH 2/3] feat(core): implement core/bcast --- Cargo.lock | 1 + crates/core/Cargo.toml | 1 + crates/core/src/bcast/metrics.rs | 76 ++ crates/core/src/bcast/mod.rs | 1540 ++++++++++++++++++++++++++++++ crates/core/src/bcast/recast.rs | 152 +++ crates/core/src/lib.rs | 3 + 6 files changed, 1773 insertions(+) create mode 100644 crates/core/src/bcast/metrics.rs create mode 100644 crates/core/src/bcast/mod.rs create mode 100644 crates/core/src/bcast/recast.rs diff --git a/Cargo.lock b/Cargo.lock index fb88bd1a..5068183b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5635,6 +5635,7 @@ dependencies = [ "dyn-clone", "dyn-eq", "ethereum_ssz", + "futures", "hex", "pluto-build-proto", "pluto-cluster", diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 741f1482..7c927478 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -15,6 +15,7 @@ chrono.workspace = true crossbeam.workspace = true dyn-clone.workspace = true dyn-eq.workspace = true +futures.workspace = true hex.workspace = true vise.workspace = true pluto-crypto.workspace = true diff --git a/crates/core/src/bcast/metrics.rs b/crates/core/src/bcast/metrics.rs new file mode 100644 index 00000000..81cc309b --- /dev/null +++ b/crates/core/src/bcast/metrics.rs @@ -0,0 +1,76 @@ +use chrono::Duration; +use vise::*; + +use crate::types::{Duty, DutyType, PubKey}; + +const BROADCAST_DELAY_BUCKETS: [f64; 11] = + [0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 20.0, 30.0, 60.0]; + +const SOURCE_PREGEN: &str = "pregen"; +const SOURCE_DOWNSTREAM: &str = "downstream"; + +/// Metrics for the core broadcaster. +#[derive(Debug, Metrics)] +#[metrics(prefix = "core_bcast")] +pub struct BcastMetrics { + /// Successfully broadcast duties by type. + #[metrics(labels = ["duty"])] + pub broadcast_total: LabeledFamily, + + /// Duty broadcast delay since expected duty submission, by type. + #[metrics(buckets = &BROADCAST_DELAY_BUCKETS, labels = ["duty"])] + pub broadcast_delay_seconds: LabeledFamily, + + /// Unique validator registrations stored in the recaster, by pubkey. + #[metrics(labels = ["pubkey"])] + pub recast_registration_total: LabeledFamily, + + /// Recast registrations by source. + #[metrics(labels = ["source"])] + pub recast_total: LabeledFamily, + + /// Failed recast registrations by source. + #[metrics(labels = ["source"])] + pub recast_errors_total: LabeledFamily, +} + +#[vise::register] +pub static BCAST_METRICS: Global = Global::new(); + +pub(crate) fn instrument_duty(duty: &Duty, delay: Duration) { + let duty_type = duty.duty_type.to_string(); + BCAST_METRICS.broadcast_total[&duty_type].inc(); + + let Some(delay) = delay.to_std().ok() else { + return; + }; + BCAST_METRICS.broadcast_delay_seconds[&duty_type].observe(delay.as_secs_f64()); +} + +pub(crate) fn instrument_recast_registration(pubkey: PubKey) { + BCAST_METRICS.recast_registration_total[&pubkey.to_string()].inc(); +} + +pub(crate) fn instrument_recast(duty: &Duty) { + if duty.duty_type != DutyType::BuilderRegistration { + return; + } + + BCAST_METRICS.recast_total[&source(duty)].inc(); +} + +pub(crate) fn instrument_recast_error(duty: &Duty) { + if duty.duty_type != DutyType::BuilderRegistration { + return; + } + + BCAST_METRICS.recast_errors_total[&source(duty)].inc(); +} + +fn source(duty: &Duty) -> String { + if duty.slot.inner() > 0 { + SOURCE_DOWNSTREAM.to_string() + } else { + SOURCE_PREGEN.to_string() + } +} diff --git a/crates/core/src/bcast/mod.rs b/crates/core/src/bcast/mod.rs new file mode 100644 index 00000000..66961ce3 --- /dev/null +++ b/crates/core/src/bcast/mod.rs @@ -0,0 +1,1540 @@ +//! Broadcasts aggregated signed duty data to the beacon node. + +mod metrics; +mod recast; + +use std::{any::Any, error::Error as StdError, time::Duration as StdDuration}; + +use chrono::{DateTime, Duration, Utc}; +use pluto_crypto::{blst_impl::BlstImpl, tbls::Tbls}; +use pluto_eth2api::{ + AttesterDuty, BeaconNodeClient, EthBeaconNodeApiClient, GetStateValidatorsResponseResponseDatum, + ValidatorStatus, data_version_is_before_electra, + spec::{altair, phase0}, + versioned, +}; +use tree_hash::TreeHash; + +pub use recast::Recaster; + +use crate::{ + bcast::metrics::instrument_duty, + signeddata::{ + SignedSyncContributionAndProof, SignedSyncMessage, SignedVoluntaryExit, + VersionedAttestation, VersionedSignedAggregateAndProof, VersionedSignedProposal, + VersionedSignedValidatorRegistration, + }, + types::{Duty, DutyType, PubKey, SignedData, SignedDataSet}, +}; + +/// Boxed client/provider error. +pub type BoxError = Box; + +/// Broadcaster result. +pub type Result = std::result::Result; + +/// Broadcaster error. +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// Beacon client/provider error. + #[error("{context}: {source}")] + Client { + /// Operation context. + context: &'static str, + /// Underlying error. + #[source] + source: BoxError, + }, + + /// Signed-data conversion error. + #[error("{context}: {source}")] + SignedData { + /// Operation context. + context: &'static str, + /// Underlying error. + #[source] + source: crate::signeddata::SignedDataError, + }, + + /// Crypto operation error. + #[error("{context}: {source}")] + Crypto { + /// Operation context. + context: &'static str, + /// Underlying error. + #[source] + source: pluto_crypto::types::Error, + }, + + /// Invalid time value. + #[error("{context}: invalid time value {value}")] + InvalidTime { + /// Operation context. + context: &'static str, + /// Invalid value. + value: i64, + }, + + /// Arithmetic overflow. + #[error("{context}: arithmetic overflow")] + ArithmeticOverflow { + /// Operation context. + context: &'static str, + }, + + /// Mutex poisoned. + #[error("{0}: mutex poisoned")] + MutexPoisoned(&'static str), + + /// `DutyBuilderProposer` is deprecated. + #[error("deprecated duty DutyBuilderProposer")] + DeprecatedDutyBuilderProposer, + + /// Expected one item in set. + #[error("expected one item in set")] + ExpectedOneItemInSet, + + /// Invalid proposal data. + #[error("invalid proposal")] + InvalidProposal, + + /// Invalid registration data. + #[error("invalid registration")] + InvalidRegistration, + + /// Invalid exit data. + #[error("invalid exit")] + InvalidExit, + + /// Invalid aggregate-and-proof data. + #[error("invalid aggregate and proof")] + InvalidAggregateAndProof, + + /// Invalid sync committee message. + #[error("invalid sync committee message")] + InvalidSyncCommitteeMessage, + + /// Invalid sync committee contribution. + #[error("invalid sync committee contribution")] + InvalidSyncCommitteeContribution, + + /// Invalid attestation data. + #[error("invalid attestation")] + InvalidAttestation, + + /// No attestations available. + #[error("no attestations")] + NoAttestations, + + /// Validator field could not be parsed. + #[error("{context}: invalid validator field")] + InvalidValidatorField { + /// Operation context. + context: &'static str, + }, + + /// Unsupported duty type. + #[error("unsupported duty type")] + UnsupportedDutyType, +} + +/// Complete validator data needed for Electra attestation repair. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct CompleteValidator { + /// Validator status. + pub status: ValidatorStatus, + /// Activation epoch. + pub activation_epoch: phase0::Epoch, +} + +impl TryFrom<&GetStateValidatorsResponseResponseDatum> for CompleteValidator { + type Error = std::num::ParseIntError; + + fn try_from( + datum: &GetStateValidatorsResponseResponseDatum, + ) -> std::result::Result { + Ok(Self { + status: datum.status.clone(), + activation_epoch: datum.validator.activation_epoch.parse()?, + }) + } +} + +#[derive(Debug, Clone)] +struct DelayCalculator { + genesis_time: DateTime, + slot_duration: Duration, +} + +impl DelayCalculator { + fn delay(&self, slot: u64, duty_type: &DutyType) -> Result { + let slot_duration = duration_to_std(self.slot_duration, "slot duration")?; + let slot_count = + u32::try_from(slot).map_err(|_| Error::ArithmeticOverflow { context: "slot" })?; + let elapsed = slot_duration + .checked_mul(slot_count) + .ok_or(Error::ArithmeticOverflow { + context: "slot elapsed", + })?; + let slot_start = self + .genesis_time + .checked_add_signed(duration_from_std(elapsed, "slot elapsed")?) + .ok_or(Error::ArithmeticOverflow { + context: "slot start", + })?; + + let expected_submission = if duty_type == &DutyType::Attester { + slot_start + .checked_add_signed(div_duration(self.slot_duration, 3, "attester delay")?) + .ok_or(Error::ArithmeticOverflow { + context: "attester delay", + })? + } else if matches!(duty_type, DutyType::Aggregator | DutyType::SyncContribution) { + let third = div_duration(self.slot_duration, 3, "aggregation delay")?; + slot_start + .checked_add_signed(mul_duration(third, 2, "aggregation delay")?) + .ok_or(Error::ArithmeticOverflow { + context: "aggregation delay", + })? + } else { + slot_start + }; + + Ok(Utc::now().signed_duration_since(expected_submission)) + } +} + +/// Broadcasts aggregated signed duty data to the beacon node. +pub struct Broadcaster { + client: BeaconNodeClient, + delay_calculator: DelayCalculator, +} + +impl Broadcaster { + /// Creates a new broadcaster. + pub async fn new(client: BeaconNodeClient) -> Result { + let genesis_time = + client + .api() + .fetch_genesis_time() + .await + .map_err(|source| Error::Client { + context: "fetch genesis time", + source: boxed(source), + })?; + let (slot_duration, _) = + client + .api() + .fetch_slots_config() + .await + .map_err(|source| Error::Client { + context: "fetch slots config", + source: boxed(source), + })?; + let slot_duration = + Duration::from_std(slot_duration).map_err(|_| Error::ArithmeticOverflow { + context: "slot duration", + })?; + + Ok(Self { + client, + delay_calculator: DelayCalculator { + genesis_time, + slot_duration, + }, + }) + } + + /// Broadcasts aggregated signed duty data to the beacon node. + pub async fn broadcast(&self, mut duty: Duty, set: SignedDataSet) -> Result<()> { + match duty.duty_type { + DutyType::Attester => self.broadcast_attester(&duty, &set).await?, + DutyType::Proposer => self.broadcast_proposer(&duty, &set).await?, + DutyType::BuilderProposer => return Err(Error::DeprecatedDutyBuilderProposer), + DutyType::BuilderRegistration => { + duty.slot = first_slot_in_current_epoch(self.client.api()).await?; + self.broadcast_builder_registration(&duty, &set).await?; + } + DutyType::Exit => self.broadcast_exits(&duty, &set).await?, + DutyType::Randao | DutyType::PrepareAggregator | DutyType::PrepareSyncContribution => {} + DutyType::Aggregator => self.broadcast_aggregator(&duty, &set).await?, + DutyType::SyncMessage => self.broadcast_sync_messages(&duty, &set).await?, + DutyType::SyncContribution => self.broadcast_sync_contributions(&duty, &set).await?, + DutyType::Unknown + | DutyType::Signature + | DutyType::InfoSync + | DutyType::DutySentinel(_) => { + return Err(Error::UnsupportedDutyType); + } + } + + if let Ok(delay) = self + .delay_calculator + .delay(duty.slot.inner(), &duty.duty_type) + { + instrument_duty(&duty, delay); + } + + Ok(()) + } + + async fn broadcast_attester(&self, duty: &Duty, set: &SignedDataSet) -> Result<()> { + let mut attestations = set_to_attestations(set)?; + + if attestations_need_validator_indices(&attestations) { + tracing::warn!( + error = "peer version causes slowdown", + "There is a charon node in the cluster at one of the following versions: v1.3.0, v1.3.1, v1.4.0 or v1.4.1. Please update, as it causes performance degradation." + ); + + if attestations.is_empty() { + return Err(Error::NoAttestations); + } + + self.populate_missing_validator_indices(&mut attestations) + .await?; + } + + match self.client.api().submit_attestations(attestations).await { + Ok(()) => Ok(()), + Err(source) if source.to_string().contains("PriorAttestationKnown") => Ok(()), + Err(source) => Err(Error::Client { + context: "submit attestations", + source: boxed(source), + }), + }?; + + tracing::info!(%duty, "Successfully submitted v2 attestations to beacon node"); + Ok(()) + } + + async fn broadcast_proposer(&self, duty: &Duty, set: &SignedDataSet) -> Result<()> { + let (pubkey, agg_data) = set_to_one(set)?; + let block = + downcast_signed_data::(agg_data, Error::InvalidProposal)?; + let blinded = block.0.blinded; + + if blinded { + let proposal = block.to_blinded().map_err(|source| Error::SignedData { + context: "cannot broadcast, expected blinded proposal", + source, + })?; + self.client + .api() + .submit_signed_blinded_proposal(proposal) + .await + .map_err(|source| Error::Client { + context: "submit blinded proposal", + source: boxed(source), + })?; + } else { + self.client + .api() + .submit_signed_proposal(block.0) + .await + .map_err(|source| Error::Client { + context: "submit proposal", + source: boxed(source), + })?; + } + + tracing::info!(%duty, %pubkey, blinded, "Successfully submitted block proposal to beacon node"); + Ok(()) + } + + async fn broadcast_builder_registration(&self, duty: &Duty, set: &SignedDataSet) -> Result<()> { + let registrations = set_to_registrations(set)?; + self.client + .api() + .submit_validator_registrations(registrations) + .await + .map_err(|source| Error::Client { + context: "submit validator registrations", + source: boxed(source), + })?; + + tracing::info!(%duty, "Successfully submitted validator registrations to beacon node"); + Ok(()) + } + + async fn broadcast_exits(&self, duty: &Duty, set: &SignedDataSet) -> Result<()> { + let mut last_error = None; + for (pubkey, exit) in set_to_exits(set)? { + match self.client.api().submit_voluntary_exit(exit).await { + Ok(()) => { + tracing::info!(%duty, %pubkey, "Successfully submitted voluntary exit to beacon node") + } + Err(source) => last_error = Some(source), + } + } + + if let Some(source) = last_error { + return Err(Error::Client { + context: "submit voluntary exit", + source: boxed(source), + }); + } + + Ok(()) + } + + async fn broadcast_aggregator(&self, duty: &Duty, set: &SignedDataSet) -> Result<()> { + let aggregate_and_proofs = set_to_agg_and_proof(set)?; + self.client + .api() + .submit_aggregate_attestations(aggregate_and_proofs) + .await + .map_err(|source| Error::Client { + context: "submit aggregate attestations", + source: boxed(source), + })?; + + tracing::info!(%duty, "Successfully submitted v2 attestation aggregations to beacon node"); + Ok(()) + } + + async fn broadcast_sync_messages(&self, duty: &Duty, set: &SignedDataSet) -> Result<()> { + let messages = set_to_sync_messages(set)?; + self.client + .api() + .submit_sync_committee_messages(messages) + .await + .map_err(|source| Error::Client { + context: "submit sync committee messages", + source: boxed(source), + })?; + + tracing::info!(%duty, "Successfully submitted sync committee messages to beacon node"); + Ok(()) + } + + async fn broadcast_sync_contributions(&self, duty: &Duty, set: &SignedDataSet) -> Result<()> { + let contributions = set_to_sync_contributions(set)?; + self.client + .api() + .submit_sync_committee_contributions(contributions) + .await + .map_err(|source| Error::Client { + context: "submit sync committee contributions", + source: boxed(source), + })?; + + tracing::info!(%duty, "Successfully submitted sync committee contributions to beacon node"); + Ok(()) + } + + async fn populate_missing_validator_indices( + &self, + attestations: &mut [versioned::VersionedAttestation], + ) -> Result<()> { + let att0_data = attestations + .first() + .and_then(|attestation| attestation.attestation.as_ref()) + .map(versioned::AttestationPayload::data) + .ok_or(Error::InvalidAttestation)?; + let epoch = att0_data.target.epoch; + let slot = att0_data.slot; + + let val_idxs = resolve_active_validators_indices(&self.client, epoch).await?; + let duties = self + .client + .api() + .fetch_attester_duties_for_indices(epoch, val_idxs) + .await + .map_err(|source| Error::Client { + context: "fetch attester duties", + source: boxed(source), + })?; + let domain = self + .client + .api() + .fetch_beacon_attester_domain(epoch) + .await + .map_err(|source| Error::Client { + context: "fetch beacon attester domain", + source: boxed(source), + })?; + + for attester_duty in duties { + if attester_duty.slot != slot { + continue; + } + + for attestation in attestations.iter_mut() { + if attestation_matches_duty(attestation, &attester_duty, domain)? { + attestation.validator_index = Some(attester_duty.validator_index); + break; + } + } + } + + Ok(()) + } +} + +fn downcast_signed_data(data: &dyn SignedData, error: Error) -> Result +where + T: SignedData + Clone + 'static, +{ + let any = data as &dyn Any; + any.downcast_ref::().cloned().ok_or(error) +} + +fn set_to_one(set: &SignedDataSet) -> Result<(PubKey, &dyn SignedData)> { + if set.len() != 1 { + return Err(Error::ExpectedOneItemInSet); + } + + let Some((pubkey, data)) = set.iter().next() else { + unreachable!("set length checked") + }; + + Ok((*pubkey, data.as_ref())) +} + +fn set_values_to(set: &SignedDataSet, error: E, map: M) -> Result> +where + T: SignedData + Clone + 'static, + E: Fn() -> Error, + M: Fn(T) -> U, +{ + set.values() + .map(|data| { + let value = downcast_signed_data::(data.as_ref(), error())?; + Ok(map(value)) + }) + .collect() +} + +fn set_to_attestations(set: &SignedDataSet) -> Result> { + set_values_to( + set, + || Error::InvalidAttestation, + |attestation: VersionedAttestation| attestation.0, + ) +} + +fn set_to_registrations( + set: &SignedDataSet, +) -> Result> { + set_values_to( + set, + || Error::InvalidRegistration, + |registration: VersionedSignedValidatorRegistration| registration.0, + ) +} + +fn set_to_exits(set: &SignedDataSet) -> Result> { + set.iter() + .map(|(pubkey, data)| { + downcast_signed_data::(data.as_ref(), Error::InvalidExit) + .map(|exit| (*pubkey, exit.0)) + }) + .collect() +} + +fn set_to_agg_and_proof( + set: &SignedDataSet, +) -> Result> { + set_values_to( + set, + || Error::InvalidAggregateAndProof, + |aggregate_and_proof: VersionedSignedAggregateAndProof| aggregate_and_proof.0, + ) +} + +fn set_to_sync_messages(set: &SignedDataSet) -> Result> { + set_values_to( + set, + || Error::InvalidSyncCommitteeMessage, + |message: SignedSyncMessage| message.0, + ) +} + +fn set_to_sync_contributions( + set: &SignedDataSet, +) -> Result> { + set_values_to( + set, + || Error::InvalidSyncCommitteeContribution, + |contribution: SignedSyncContributionAndProof| contribution.0, + ) +} + +fn attestations_need_validator_indices(attestations: &[versioned::VersionedAttestation]) -> bool { + for attestation in attestations { + if data_version_is_before_electra(attestation.version) { + break; + } + + if attestation.validator_index.is_none() { + return true; + } + } + + false +} + +async fn resolve_active_validators_indices( + client: &BeaconNodeClient, + epoch: phase0::Epoch, +) -> Result> { + let validators = client + .complete_validators() + .await + .map_err(|source| Error::Client { + context: "complete validators", + source: boxed(source), + })?; + let mut indices = Vec::new(); + + for (index, datum) in validators.iter() { + let validator = + CompleteValidator::try_from(datum).map_err(|_| Error::InvalidValidatorField { + context: "activation epoch", + })?; + if !validator.status.is_active() && validator.activation_epoch != epoch { + continue; + } + + indices.push(*index); + } + + Ok(indices) +} + +fn attestation_matches_duty( + attestation: &versioned::VersionedAttestation, + attester_duty: &AttesterDuty, + domain: phase0::Domain, +) -> Result { + let payload = attestation + .attestation + .as_ref() + .ok_or(Error::InvalidAttestation)?; + let object_root = payload.data().tree_hash_root().0; + let signing_root = phase0::SigningData { + object_root, + domain, + } + .tree_hash_root() + .0; + let signature = payload.signature(); + + match BlstImpl.verify(&attester_duty.pubkey, &signing_root, &signature) { + Ok(()) => Ok(true), + Err(pluto_crypto::types::Error::VerificationFailed(_)) => Ok(false), + Err(source) => Err(Error::Crypto { + context: "sig verification", + source, + }), + } +} + +async fn first_slot_in_current_epoch( + client: &EthBeaconNodeApiClient, +) -> Result { + let genesis_time = client + .fetch_genesis_time() + .await + .map_err(|source| Error::Client { + context: "fetch genesis time", + source: boxed(source), + })?; + let (slot_duration, slots_per_epoch) = + client + .fetch_slots_config() + .await + .map_err(|source| Error::Client { + context: "fetch slots config", + source: boxed(source), + })?; + let slot_duration = + Duration::from_std(slot_duration).map_err(|_| Error::ArithmeticOverflow { + context: "slot duration", + })?; + + let chain_age = Utc::now().signed_duration_since(genesis_time); + let chain_age_ms = chain_age.num_milliseconds(); + let slot_duration_ms = slot_duration.num_milliseconds(); + if slot_duration_ms <= 0 { + return Err(Error::InvalidTime { + context: "slot duration", + value: slot_duration_ms, + }); + } + let current_slot_i64 = + chain_age_ms + .checked_div(slot_duration_ms) + .ok_or(Error::ArithmeticOverflow { + context: "current slot", + })?; + let current_slot = u64::try_from(current_slot_i64).map_err(|_| Error::InvalidTime { + context: "current slot", + value: current_slot_i64, + })?; + let current_epoch = + current_slot + .checked_div(slots_per_epoch) + .ok_or(Error::ArithmeticOverflow { + context: "current epoch", + })?; + let first_slot = + current_epoch + .checked_mul(slots_per_epoch) + .ok_or(Error::ArithmeticOverflow { + context: "first slot in epoch", + })?; + + Ok(crate::types::SlotNumber::new(first_slot)) +} + +fn boxed(source: impl StdError + Send + Sync + 'static) -> BoxError { + Box::new(source) +} + +fn duration_to_std(duration: Duration, context: &'static str) -> Result { + duration.to_std().map_err(|_| Error::InvalidTime { + context, + value: duration.num_milliseconds(), + }) +} + +fn duration_from_std(duration: StdDuration, context: &'static str) -> Result { + Duration::from_std(duration).map_err(|_| Error::ArithmeticOverflow { context }) +} + +fn div_duration(duration: Duration, divisor: i32, context: &'static str) -> Result { + let divisor = i64::from(divisor); + let nanos = duration + .num_nanoseconds() + .ok_or(Error::ArithmeticOverflow { context })?; + Ok(Duration::nanoseconds( + nanos + .checked_div(divisor) + .ok_or(Error::ArithmeticOverflow { context })?, + )) +} + +fn mul_duration(duration: Duration, multiplier: i32, context: &'static str) -> Result { + duration + .checked_mul(multiplier) + .ok_or(Error::ArithmeticOverflow { context }) +} + +#[cfg(test)] +mod tests { + use std::{ + collections::HashMap, + sync::{Arc, Mutex}, + }; + + use pluto_crypto::{blst_impl::BlstImpl, tbls::Tbls}; + use pluto_eth2api::{ + GetStateValidatorsResponseResponse, ValidatorResponseValidator, + spec::{bellatrix, electra, phase0}, + v1, + valcache::ValidatorCache, + versioned::{self, AttestationPayload}, + }; + use pluto_testutil::BeaconMock; + use rand::{SeedableRng, rngs::StdRng}; + use serde_json::{Value, json}; + use tree_hash::TreeHash; + use wiremock::{ + Mock, MockServer, ResponseTemplate, + matchers::{method, path}, + }; + + use super::*; + use crate::{ + signeddata::{ + SignedSyncContributionAndProof, SignedSyncMessage, SignedVoluntaryExit, + VersionedAttestation, VersionedSignedAggregateAndProof, VersionedSignedProposal, + VersionedSignedValidatorRegistration, + }, + types::{Duty, Slot, SlotNumber}, + }; + + fn validator_datum( + index: u64, + pubkey: &phase0::BLSPubKey, + status: ValidatorStatus, + activation_epoch: u64, + ) -> GetStateValidatorsResponseResponseDatum { + GetStateValidatorsResponseResponseDatum { + index: index.to_string(), + balance: "32000000000".to_string(), + status, + validator: ValidatorResponseValidator { + pubkey: hex0x(pubkey), + withdrawal_credentials: + "0x0000000000000000000000000000000000000000000000000000000000000000".to_string(), + effective_balance: "32000000000".to_string(), + slashed: false, + activation_eligibility_epoch: "0".to_string(), + activation_epoch: activation_epoch.to_string(), + exit_epoch: "18446744073709551615".to_string(), + withdrawable_epoch: "18446744073709551615".to_string(), + }, + } + } + + /// Builds a [`BeaconNodeClient`] whose validator cache is backed by the + /// `head` validators endpoint returning `datums`. + async fn cached_client( + beacon: &BeaconMock, + datums: Vec, + ) -> BeaconNodeClient { + Mock::given(method("POST")) + .and(path("/eth/v1/beacon/states/head/validators")) + .respond_with(ResponseTemplate::new(200).set_body_json( + GetStateValidatorsResponseResponse { + execution_optimistic: false, + finalized: true, + data: datums, + }, + )) + .with_priority(1) + .mount(beacon.server()) + .await; + + let client = BeaconNodeClient::new(beacon.client().clone()); + client + .set_validator_cache(ValidatorCache::new(beacon.client().clone(), vec![])) + .await; + client + } + + fn pubkey(byte: u8) -> PubKey { + PubKey::from([byte; 48]) + } + + fn signed_set(pubkey: PubKey, data: impl SignedData + 'static) -> SignedDataSet { + HashMap::from([(pubkey, Box::new(data) as Box)]) + } + + fn hex0x(bytes: impl AsRef<[u8]>) -> String { + format!("0x{}", hex::encode(bytes)) + } + + async fn new_broadcaster() -> (BeaconMock, Broadcaster) { + let beacon = BeaconMock::builder().build().await.expect("beacon mock"); + mount_submit_successes(beacon.server()).await; + let broadcaster = Broadcaster::new(BeaconNodeClient::new(beacon.client().clone())) + .await + .expect("broadcaster"); + + (beacon, broadcaster) + } + + async fn mount_submit_successes(server: &MockServer) { + for endpoint in [ + "/eth/v2/beacon/pool/attestations", + "/eth/v2/beacon/blocks", + "/eth/v2/beacon/blinded_blocks", + "/eth/v1/validator/register_validator", + "/eth/v1/beacon/pool/voluntary_exits", + "/eth/v2/validator/aggregate_and_proofs", + "/eth/v1/beacon/pool/sync_committees", + "/eth/v1/validator/contribution_and_proofs", + ] { + Mock::given(method("POST")) + .and(path(endpoint)) + .respond_with(ResponseTemplate::new(200)) + .with_priority(1) + .mount(server) + .await; + } + } + + async fn mount_prior_attestation_known(server: &MockServer) { + Mock::given(method("POST")) + .and(path("/eth/v2/beacon/pool/attestations")) + .respond_with(ResponseTemplate::new(400).set_body_json(json!({ + "code": 400, + "message": "invalid attestation", + "failures": [ + { "index": 0, "message": "Verification: PriorAttestationKnown" } + ] + }))) + .with_priority(1) + .mount(server) + .await; + } + + fn attester_duties_body( + slot: phase0::Slot, + validator_index: phase0::ValidatorIndex, + pubkey: phase0::BLSPubKey, + ) -> Value { + json!({ + "data": [{ + "pubkey": hex0x(pubkey), + "validator_index": validator_index.to_string(), + "committee_index": "0", + "committee_length": "1", + "committees_at_slot": "1", + "validator_committee_index": "0", + "slot": slot.to_string() + }], + "dependent_root": hex0x([0u8; 32]), + "execution_optimistic": false + }) + } + + fn deterministic_electra_spec() -> Value { + json!({ + "SECONDS_PER_SLOT": "12", + "SLOTS_PER_EPOCH": "16", + "GENESIS_FORK_VERSION": "0x01017000", + "ALTAIR_FORK_VERSION": "0x20000910", + "ALTAIR_FORK_EPOCH": "0", + "BELLATRIX_FORK_VERSION": "0x30000910", + "BELLATRIX_FORK_EPOCH": "1", + "CAPELLA_FORK_VERSION": "0x40000910", + "CAPELLA_FORK_EPOCH": "2", + "DENEB_FORK_VERSION": "0x50000910", + "DENEB_FORK_EPOCH": "2", + "ELECTRA_FORK_VERSION": "0x60000910", + "ELECTRA_FORK_EPOCH": "3", + "FULU_FORK_VERSION": "0x70000910", + "FULU_FORK_EPOCH": u64::MAX.to_string(), + "DOMAIN_BEACON_ATTESTER": "0x01000000", + "DOMAIN_VOLUNTARY_EXIT": "0x04000000", + }) + } + + fn attestation_data(slot: u64, epoch: u64) -> phase0::AttestationData { + phase0::AttestationData { + slot, + index: 3, + beacon_block_root: [1; 32], + source: phase0::Checkpoint { + epoch: epoch.saturating_sub(1), + root: [2; 32], + }, + target: phase0::Checkpoint { + epoch, + root: [3; 32], + }, + } + } + + fn deneb_attestation() -> versioned::VersionedAttestation { + versioned::VersionedAttestation { + version: versioned::DataVersion::Deneb, + validator_index: Some(7), + attestation: Some(AttestationPayload::Deneb(phase0::Attestation { + aggregation_bits: phase0::BitList::with_bits(8, &[0]), + data: attestation_data(4, 1), + signature: [4; 96], + })), + } + } + + fn signed_electra_attestation( + secret: &pluto_crypto::types::PrivateKey, + domain: phase0::Domain, + slot: u64, + epoch: u64, + ) -> versioned::VersionedAttestation { + let data = attestation_data(slot, epoch); + let signing_root = phase0::SigningData { + object_root: data.tree_hash_root().0, + domain, + } + .tree_hash_root() + .0; + let signature = BlstImpl.sign(secret, &signing_root).expect("sign"); + + versioned::VersionedAttestation { + version: versioned::DataVersion::Electra, + validator_index: None, + attestation: Some(AttestationPayload::Electra(electra::Attestation { + aggregation_bits: phase0::BitList::with_bits(8, &[0]), + data, + signature, + committee_bits: pluto_ssz::BitVector::with_bits(&[3]), + })), + } + } + + fn phase0_body() -> phase0::BeaconBlockBody { + phase0::BeaconBlockBody { + randao_reveal: [0; 96], + eth1_data: phase0::ETH1Data { + deposit_root: [0; 32], + deposit_count: 0, + block_hash: [0; 32], + }, + graffiti: [0; 32], + proposer_slashings: phase0::SszList::from(vec![]), + attester_slashings: phase0::SszList::from(vec![]), + attestations: phase0::SszList::from(vec![]), + deposits: phase0::SszList::from(vec![]), + voluntary_exits: phase0::SszList::from(vec![]), + } + } + + fn phase0_signed_proposal() -> versioned::VersionedSignedProposal { + versioned::VersionedSignedProposal { + version: versioned::DataVersion::Phase0, + blinded: false, + block: versioned::SignedProposalBlock::Phase0(phase0::SignedBeaconBlock { + message: phase0::BeaconBlock { + slot: 1, + proposer_index: 2, + parent_root: [3; 32], + state_root: [4; 32], + body: phase0_body(), + }, + signature: [5; 96], + }), + } + } + + fn bellatrix_blinded_proposal() -> versioned::VersionedSignedProposal { + versioned::VersionedSignedProposal { + version: versioned::DataVersion::Bellatrix, + blinded: true, + block: versioned::SignedProposalBlock::BellatrixBlinded( + bellatrix::SignedBlindedBeaconBlock { + message: bellatrix::BlindedBeaconBlock { + slot: 1, + proposer_index: 2, + parent_root: [3; 32], + state_root: [4; 32], + body: bellatrix::BlindedBeaconBlockBody { + randao_reveal: [0; 96], + eth1_data: phase0::ETH1Data { + deposit_root: [0; 32], + deposit_count: 0, + block_hash: [0; 32], + }, + graffiti: [0; 32], + proposer_slashings: phase0::SszList::from(vec![]), + attester_slashings: phase0::SszList::from(vec![]), + attestations: phase0::SszList::from(vec![]), + deposits: phase0::SszList::from(vec![]), + voluntary_exits: phase0::SszList::from(vec![]), + sync_aggregate: altair::SyncAggregate { + sync_committee_bits: Default::default(), + sync_committee_signature: [0; 96], + }, + execution_payload_header: bellatrix::ExecutionPayloadHeader { + parent_hash: [0; 32], + fee_recipient: [0; 20], + state_root: [0; 32], + receipts_root: [0; 32], + logs_bloom: [0; 256], + prev_randao: [0; 32], + block_number: 0, + gas_limit: 0, + gas_used: 0, + timestamp: 0, + extra_data: phase0::SszList::from(vec![]), + base_fee_per_gas: alloy::primitives::U256::ZERO, + block_hash: [0; 32], + transactions_root: [0; 32], + }, + }, + }, + signature: [9; 96], + }, + ), + } + } + + fn registration() -> versioned::VersionedSignedValidatorRegistration { + versioned::VersionedSignedValidatorRegistration { + version: versioned::BuilderVersion::V1, + v1: Some(v1::SignedValidatorRegistration { + message: v1::ValidatorRegistration { + fee_recipient: [1; 20], + gas_limit: 30_000_000, + timestamp: 42, + pubkey: [2; 48], + }, + signature: [3; 96], + }), + } + } + + fn signed_exit(index: u64) -> phase0::SignedVoluntaryExit { + phase0::SignedVoluntaryExit { + message: phase0::VoluntaryExit { + epoch: 1, + validator_index: index, + }, + signature: [4; 96], + } + } + + fn signed_aggregate() -> versioned::VersionedSignedAggregateAndProof { + versioned::VersionedSignedAggregateAndProof { + version: versioned::DataVersion::Deneb, + aggregate_and_proof: versioned::SignedAggregateAndProofPayload::Deneb( + phase0::SignedAggregateAndProof { + message: phase0::AggregateAndProof { + aggregator_index: 1, + aggregate: phase0::Attestation { + aggregation_bits: phase0::BitList::with_bits(8, &[0]), + data: attestation_data(4, 1), + signature: [5; 96], + }, + selection_proof: [6; 96], + }, + signature: [7; 96], + }, + ), + } + } + + fn sync_message() -> altair::SyncCommitteeMessage { + altair::SyncCommitteeMessage { + slot: 1, + beacon_block_root: [2; 32], + validator_index: 3, + signature: [4; 96], + } + } + + fn sync_contribution() -> altair::SignedContributionAndProof { + altair::SignedContributionAndProof { + message: altair::ContributionAndProof { + aggregator_index: 1, + contribution: altair::SyncCommitteeContribution { + slot: 2, + beacon_block_root: [3; 32], + subcommittee_index: 4, + aggregation_bits: Default::default(), + signature: [5; 96], + }, + selection_proof: [6; 96], + }, + signature: [7; 96], + } + } + + #[tokio::test] + async fn broadcast_attester_submits_and_swallows_prior_known() { + let beacon = BeaconMock::builder().build().await.expect("beacon mock"); + mount_prior_attestation_known(beacon.server()).await; + let broadcaster = Broadcaster::new(BeaconNodeClient::new(beacon.client().clone())) + .await + .expect("broadcaster"); + let set = signed_set( + pubkey(1), + VersionedAttestation::new(deneb_attestation()).expect("attestation"), + ); + + broadcaster + .broadcast(Duty::new_attester_duty(SlotNumber::new(1)), set) + .await + .expect("prior known swallowed"); + + let post_paths = beacon + .server() + .received_requests() + .await + .expect("requests") + .into_iter() + .filter(|request| request.method.as_str() == "POST") + .map(|request| request.url.path().to_string()) + .collect::>(); + assert_eq!(post_paths, vec!["/eth/v2/beacon/pool/attestations"]); + } + + #[tokio::test] + async fn broadcast_attester_backfills_electra_validator_index() { + let secret = BlstImpl + .generate_insecure_secret(StdRng::seed_from_u64(42)) + .expect("secret"); + let public_key = BlstImpl.secret_to_public_key(&secret).expect("pubkey"); + let beacon = BeaconMock::builder() + .spec(deterministic_electra_spec()) + .endpoint_overrides(vec![( + "/eth/v1/validator/duties/attester/3".to_string(), + attester_duties_body(12, 99, public_key), + )]) + .build() + .await + .expect("beacon mock"); + let domain = beacon + .client() + .fetch_beacon_attester_domain(3) + .await + .expect("domain"); + let client = cached_client( + &beacon, + vec![validator_datum( + 99, + &public_key, + ValidatorStatus::ActiveOngoing, + 0, + )], + ) + .await; + let attestation = signed_electra_attestation(&secret, domain, 12, 3); + assert!( + attestation_matches_duty( + &attestation, + &AttesterDuty { + slot: 12, + validator_index: 99, + pubkey: public_key, + }, + domain, + ) + .expect("matching attestation") + ); + assert_eq!( + beacon + .client() + .fetch_attester_duties_for_indices(3, vec![99]) + .await + .expect("duties"), + vec![AttesterDuty { + slot: 12, + validator_index: 99, + pubkey: public_key, + }] + ); + assert_eq!( + beacon + .client() + .fetch_beacon_attester_domain(3) + .await + .expect("domain"), + domain + ); + let broadcaster = Broadcaster::new(client).await.expect("broadcaster"); + + let mut attestations = vec![attestation]; + broadcaster + .populate_missing_validator_indices(&mut attestations) + .await + .expect("populate validator index"); + + assert_eq!(attestations[0].validator_index, Some(99)); + } + + #[tokio::test] + async fn broadcast_routes_all_submit_duties() { + let (beacon, broadcaster) = new_broadcaster().await; + + let proposal = phase0_signed_proposal(); + broadcaster + .broadcast( + Duty::new_proposer_duty(SlotNumber::new(1)), + signed_set( + pubkey(1), + VersionedSignedProposal::new(proposal.clone()).expect("proposal"), + ), + ) + .await + .expect("proposal"); + + let blinded = bellatrix_blinded_proposal(); + broadcaster + .broadcast( + Duty::new_proposer_duty(SlotNumber::new(1)), + signed_set( + pubkey(1), + VersionedSignedProposal::new(blinded.clone()).expect("proposal"), + ), + ) + .await + .expect("blinded"); + + let registration = registration(); + broadcaster + .broadcast( + Duty::new_builder_registration_duty(SlotNumber::new(0)), + signed_set( + pubkey(2), + VersionedSignedValidatorRegistration::new(registration.clone()) + .expect("registration"), + ), + ) + .await + .expect("registration"); + + let exit = signed_exit(3); + broadcaster + .broadcast( + Duty::new_voluntary_exit_duty(SlotNumber::new(1)), + signed_set(pubkey(3), SignedVoluntaryExit::new(exit.clone())), + ) + .await + .expect("exit"); + + let aggregate = signed_aggregate(); + broadcaster + .broadcast( + Duty::new_aggregator_duty(SlotNumber::new(1)), + signed_set( + pubkey(4), + VersionedSignedAggregateAndProof::new(aggregate.clone()), + ), + ) + .await + .expect("aggregate"); + + let message = sync_message(); + broadcaster + .broadcast( + Duty::new_sync_message_duty(SlotNumber::new(1)), + signed_set(pubkey(5), SignedSyncMessage::new(message.clone())), + ) + .await + .expect("sync message"); + + let contribution = sync_contribution(); + broadcaster + .broadcast( + Duty::new_sync_contribution_duty(SlotNumber::new(1)), + signed_set( + pubkey(6), + SignedSyncContributionAndProof::new(contribution.clone()), + ), + ) + .await + .expect("sync contribution"); + + let mut post_paths = beacon + .server() + .received_requests() + .await + .expect("requests") + .into_iter() + .filter(|request| request.method.as_str() == "POST") + .map(|request| request.url.path().to_string()) + .collect::>(); + post_paths.sort(); + assert_eq!( + post_paths, + vec![ + "/eth/v1/beacon/pool/sync_committees", + "/eth/v1/beacon/pool/voluntary_exits", + "/eth/v1/validator/contribution_and_proofs", + "/eth/v1/validator/register_validator", + "/eth/v2/beacon/blinded_blocks", + "/eth/v2/beacon/blocks", + "/eth/v2/validator/aggregate_and_proofs", + ] + ); + } + + #[tokio::test] + async fn broadcast_other_duties_match_go_behavior() { + let (_beacon, broadcaster) = new_broadcaster().await; + + assert!(matches!( + broadcaster + .broadcast( + Duty::new_builder_proposer_duty(SlotNumber::new(1)), + HashMap::new() + ) + .await, + Err(Error::DeprecatedDutyBuilderProposer) + )); + broadcaster + .broadcast(Duty::new_randao_duty(SlotNumber::new(1)), HashMap::new()) + .await + .expect("randao"); + broadcaster + .broadcast( + Duty::new_prepare_aggregator_duty(SlotNumber::new(1)), + HashMap::new(), + ) + .await + .expect("prepare aggregator"); + broadcaster + .broadcast( + Duty::new_prepare_sync_contribution_duty(SlotNumber::new(1)), + HashMap::new(), + ) + .await + .expect("prepare sync contribution"); + assert!(matches!( + broadcaster + .broadcast(Duty::new_info_sync_duty(SlotNumber::new(1)), HashMap::new()) + .await, + Err(Error::UnsupportedDutyType) + )); + } + + #[test] + fn set_conversion_errors_match_go_strings() { + let bad = signed_set(pubkey(1), SignedVoluntaryExit::new(signed_exit(1))); + + assert_eq!( + set_to_attestations(&bad).unwrap_err().to_string(), + "invalid attestation" + ); + assert_eq!( + set_to_registrations(&bad).unwrap_err().to_string(), + "invalid registration" + ); + assert_eq!( + set_to_agg_and_proof(&bad).unwrap_err().to_string(), + "invalid aggregate and proof" + ); + assert_eq!( + set_to_sync_messages(&bad).unwrap_err().to_string(), + "invalid sync committee message" + ); + assert_eq!( + set_to_sync_contributions(&bad).unwrap_err().to_string(), + "invalid sync committee contribution" + ); + assert_eq!( + set_to_one(&HashMap::new()).unwrap_err().to_string(), + "expected one item in set" + ); + } + + #[tokio::test] + async fn resolve_active_validators_indices_filters_active_and_activation_epoch() { + let beacon = BeaconMock::builder().build().await.expect("beacon mock"); + let client = cached_client( + &beacon, + vec![ + // active -> included + validator_datum(1, &[1u8; 48], ValidatorStatus::ActiveExiting, 0), + // inactive but activates at the queried epoch -> included + validator_datum(2, &[2u8; 48], ValidatorStatus::PendingQueued, 9), + // inactive, activates later -> excluded + validator_datum(3, &[3u8; 48], ValidatorStatus::PendingQueued, 10), + ], + ) + .await; + + let mut indices = resolve_active_validators_indices(&client, 9) + .await + .expect("indices"); + indices.sort_unstable(); + assert_eq!(indices, vec![1, 2]); + } + + /// Builds a recaster whose active-validator set resolves to `pubkey(1)`. + async fn active_recaster(beacon: &BeaconMock) -> Recaster { + let client = cached_client( + beacon, + vec![validator_datum( + 1, + &[1u8; 48], + ValidatorStatus::ActiveOngoing, + 0, + )], + ) + .await; + Recaster::new(client) + } + + #[tokio::test] + async fn recaster_recasts_only_first_epoch_active_latest_registration() { + let beacon = BeaconMock::builder().build().await.expect("beacon mock"); + let recaster = active_recaster(&beacon).await; + let seen = Arc::new(Mutex::new(Vec::new())); + let seen_clone = Arc::clone(&seen); + recaster + .subscribe(move |duty, set| { + let seen = Arc::clone(&seen_clone); + async move { + seen.lock().expect("seen").push((duty, set)); + Ok(()) + } + }) + .expect("subscribe"); + + recaster + .store( + Duty::new_builder_registration_duty(SlotNumber::new(4)), + &signed_set( + pubkey(1), + VersionedSignedValidatorRegistration::new(registration()) + .expect("registration"), + ), + ) + .expect("store"); + recaster + .store( + Duty::new_builder_registration_duty(SlotNumber::new(2)), + &signed_set( + pubkey(1), + VersionedSignedValidatorRegistration::new(registration()) + .expect("registration"), + ), + ) + .expect("older ignored"); + recaster + .store( + Duty::new_builder_registration_duty(SlotNumber::new(5)), + &signed_set( + pubkey(2), + VersionedSignedValidatorRegistration::new(registration()) + .expect("registration"), + ), + ) + .expect("inactive stored"); + + recaster + .slot_ticked(Slot { + slot: SlotNumber::new(5), + time: Utc::now(), + slot_duration: Duration::seconds(12), + slots_per_epoch: 4, + }) + .await + .expect("not first"); + assert!(seen.lock().expect("seen").is_empty()); + + recaster + .slot_ticked(Slot { + slot: SlotNumber::new(8), + time: Utc::now(), + slot_duration: Duration::seconds(12), + slots_per_epoch: 4, + }) + .await + .expect("first"); + + let seen = seen.lock().expect("seen"); + assert_eq!(seen.len(), 1); + assert_eq!(seen[0].0.slot, SlotNumber::new(4)); + assert!(seen[0].1.contains_key(&pubkey(1))); + assert!(!seen[0].1.contains_key(&pubkey(2))); + } + + #[tokio::test] + async fn recaster_subscriber_error_is_not_returned() { + let beacon = BeaconMock::builder().build().await.expect("beacon mock"); + let recaster = active_recaster(&beacon).await; + recaster + .subscribe(|_, _| async { Err(Error::UnsupportedDutyType) }) + .expect("subscribe"); + recaster + .store( + Duty::new_builder_registration_duty(SlotNumber::new(4)), + &signed_set( + pubkey(1), + VersionedSignedValidatorRegistration::new(registration()) + .expect("registration"), + ), + ) + .expect("store"); + + recaster + .slot_ticked(Slot { + slot: SlotNumber::new(8), + time: Utc::now(), + slot_duration: Duration::seconds(12), + slots_per_epoch: 4, + }) + .await + .expect("subscriber error logged only"); + } +} diff --git a/crates/core/src/bcast/recast.rs b/crates/core/src/bcast/recast.rs new file mode 100644 index 00000000..624ea92e --- /dev/null +++ b/crates/core/src/bcast/recast.rs @@ -0,0 +1,152 @@ +use std::{ + collections::{HashMap, HashSet}, + future::Future, + sync::{Arc, Mutex}, +}; + +use futures::future::BoxFuture; +use pluto_eth2api::BeaconNodeClient; + +use crate::{ + bcast::{ + Error, Result, boxed, + metrics::{instrument_recast, instrument_recast_error, instrument_recast_registration}, + }, + types::{Duty, DutyType, PubKey, SignedData, SignedDataSet, Slot}, +}; + +type RecastFuture = BoxFuture<'static, Result<()>>; +type RecastSubscriber = Arc RecastFuture + Send + Sync>; + +#[derive(Clone)] +struct RecastTuple { + duty: Duty, + agg_data: Box, +} + +#[derive(Default)] +struct RecastState { + tuples: HashMap, + subs: Vec, +} + +/// Rebroadcasts builder registrations every epoch. +pub struct Recaster { + client: BeaconNodeClient, + state: Mutex, +} + +impl Recaster { + /// Creates a new recaster. + pub fn new(client: BeaconNodeClient) -> Self { + Self { + client, + state: Mutex::new(RecastState::default()), + } + } + + /// Subscribes to rebroadcasted duties. + pub fn subscribe(&self, sub: F) -> Result<()> + where + F: Fn(Duty, SignedDataSet) -> Fut + Send + Sync + 'static, + Fut: Future> + Send + 'static, + { + self.state + .lock() + .map_err(|_| Error::MutexPoisoned("recaster state"))? + .subs + .push(Arc::new(move |duty, set| Box::pin(sub(duty, set)))); + Ok(()) + } + + /// Stores aggregate signed builder registrations for rebroadcasting. + pub fn store(&self, duty: Duty, set: &SignedDataSet) -> Result<()> { + if duty.duty_type != DutyType::BuilderRegistration { + return Ok(()); + } + + for (pubkey, agg_data) in set { + self.store_one(duty.clone(), *pubkey, agg_data.as_ref())?; + } + + Ok(()) + } + + fn store_one(&self, duty: Duty, pubkey: PubKey, agg_data: &dyn SignedData) -> Result<()> { + let mut state = self + .state + .lock() + .map_err(|_| Error::MutexPoisoned("recaster state"))?; + + if let Some(tuple) = state.tuples.get(&pubkey) + && tuple.duty.slot.inner() >= duty.slot.inner() + { + return Ok(()); + } + + let agg_data = dyn_clone::clone_box(agg_data); + state.tuples.insert(pubkey, RecastTuple { duty, agg_data }); + instrument_recast_registration(pubkey); + + Ok(()) + } + + /// Called when new slots tick. + pub async fn slot_ticked(&self, slot: Slot) -> Result<()> { + if !slot.first_in_epoch() { + return Ok(()); + } + + let active_validators: HashSet = self + .client + .active_validators() + .await + .map_err(|source| Error::Client { + context: "get active validator", + source: boxed(source), + })? + .pubkeys() + .map(|pubkey| PubKey::from(*pubkey)) + .collect(); + + let (sets, subs) = { + let state = self + .state + .lock() + .map_err(|_| Error::MutexPoisoned("recaster state"))?; + + let mut sets: HashMap = HashMap::new(); + for (pubkey, tuple) in &state.tuples { + if !active_validators.contains(pubkey) { + continue; + } + + sets.entry(tuple.duty.clone()) + .or_default() + .insert(*pubkey, tuple.agg_data.clone()); + } + + (sets, state.subs.clone()) + }; + + for (duty, mut set) in sets { + let last_sub_idx = subs.len().saturating_sub(1); + for (idx, sub) in subs.iter().enumerate() { + let set_for_sub = if idx == last_sub_idx { + std::mem::take(&mut set) + } else { + set.clone() + }; + + if let Err(error) = sub(duty.clone(), set_for_sub).await { + tracing::error!(%error, %duty, "Rebroadcast duty error (will retry next epoch)"); + instrument_recast_error(&duty); + } + + instrument_recast(&duty); + } + } + + Ok(()) + } +} diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index a53341bd..a030c203 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -42,6 +42,9 @@ pub mod sigagg; /// Implementations of AggSigDB. pub mod aggsigdb; +/// Broadcaster for aggregate signed duty data. +pub mod bcast; + mod parsigex_codec; // SSZ codec operates on compile-time-constant byte sizes and offsets. From 13047bfb7d1a6bd3231f421887caef0895e292c2 Mon Sep 17 00:00:00 2001 From: Quang Le Date: Fri, 12 Jun 2026 18:29:37 +0700 Subject: [PATCH 3/3] fix: lint --- crates/core/src/bcast/mod.rs | 4 ++-- crates/eth2api/src/spec/version.rs | 3 ++- crates/eth2api/src/validator_duty.rs | 30 ++++++++++++++++++---------- 3 files changed, 24 insertions(+), 13 deletions(-) diff --git a/crates/core/src/bcast/mod.rs b/crates/core/src/bcast/mod.rs index 66961ce3..c7855557 100644 --- a/crates/core/src/bcast/mod.rs +++ b/crates/core/src/bcast/mod.rs @@ -8,8 +8,8 @@ use std::{any::Any, error::Error as StdError, time::Duration as StdDuration}; use chrono::{DateTime, Duration, Utc}; use pluto_crypto::{blst_impl::BlstImpl, tbls::Tbls}; use pluto_eth2api::{ - AttesterDuty, BeaconNodeClient, EthBeaconNodeApiClient, GetStateValidatorsResponseResponseDatum, - ValidatorStatus, data_version_is_before_electra, + AttesterDuty, BeaconNodeClient, EthBeaconNodeApiClient, + GetStateValidatorsResponseResponseDatum, ValidatorStatus, data_version_is_before_electra, spec::{altair, phase0}, versioned, }; diff --git a/crates/eth2api/src/spec/version.rs b/crates/eth2api/src/spec/version.rs index d8ed7511..7f14de83 100644 --- a/crates/eth2api/src/spec/version.rs +++ b/crates/eth2api/src/spec/version.rs @@ -79,7 +79,8 @@ impl DataVersion { } } - /// Maps to the equivalent beacon API [`ConsensusVersion`](crate::ConsensusVersion). + /// Maps to the equivalent beacon API + /// [`ConsensusVersion`](crate::ConsensusVersion). pub const fn to_consensus_version(self) -> Result { use crate::ConsensusVersion; match self { diff --git a/crates/eth2api/src/validator_duty.rs b/crates/eth2api/src/validator_duty.rs index 0690981e..dde92fbf 100644 --- a/crates/eth2api/src/validator_duty.rs +++ b/crates/eth2api/src/validator_duty.rs @@ -127,7 +127,10 @@ impl EthBeaconNodeApiClient { &self, proposal: versioned::VersionedSignedProposal, ) -> Result<()> { - let version = proposal.version.to_consensus_version().map_err(error_message)?; + let version = proposal + .version + .to_consensus_version() + .map_err(error_message)?; let body = proposal_request_body(proposal)?; let request = crate::PublishBlockV2Request { query: crate::PublishBlockV2RequestQuery { @@ -154,7 +157,10 @@ impl EthBeaconNodeApiClient { &self, proposal: versioned::VersionedSignedBlindedProposal, ) -> Result<()> { - let version = proposal.version.to_consensus_version().map_err(error_message)?; + let version = proposal + .version + .to_consensus_version() + .map_err(error_message)?; let body = blinded_proposal_request_body(proposal)?; let request = crate::PublishBlindedBlockV2Request { query: crate::PublishBlindedBlockV2RequestQuery { @@ -549,20 +555,24 @@ fn aggregate_and_proofs_request_body( crate::AggregateAndProofRequestBody::Array( envelopes .into_iter() - .map(|(message, signature)| crate::AggregateAndProofRequestBodyArray { - message, - signature, - }) + .map( + |(message, signature)| crate::AggregateAndProofRequestBodyArray { + message, + signature, + }, + ) .collect(), ) } else { crate::AggregateAndProofRequestBody::Array2( envelopes .into_iter() - .map(|(message, signature)| crate::AggregateAndProofRequestBodyArray2 { - message, - signature, - }) + .map( + |(message, signature)| crate::AggregateAndProofRequestBodyArray2 { + message, + signature, + }, + ) .collect(), ) })