From 23f7fcc47cd6f247f17f3a7304bf4d36a0b63c5d Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Mon, 30 Mar 2026 14:39:00 -0300 Subject: [PATCH 1/5] Refactor store mappings from per-validator to per-attestation-data keying Aligns the storage architecture with the leanSpec Python reference implementation. Three fundamental shifts: 1. Key inversion: SignatureKey = (validator_id, data_root) removed. PayloadBuffer now groups proofs by attestation data (via data_root), storing one proof entry that covers all participants instead of N duplicated entries per validator. 2. AttestationDataByRoot table eliminated. AttestationData is stored directly alongside proofs in the in-memory PayloadBuffer and gossip signature map. 3. Proof selection uses participation bits (proof.participants[validator_id]) instead of HashMap key membership. Additional changes: - Gossip signatures moved from RocksDB to in-memory (transient data consumed at interval 2 aggregation) - RocksDB tables reduced from 8 to 6 (removed GossipSignatures and AttestationDataByRoot) - StoredSignature, StoredAggregatedPayload, and SignatureKey types removed - Added Hash derive to AttestationData and Checkpoint for use as map keys --- crates/blockchain/src/store.rs | 225 +++---- .../blockchain/tests/forkchoice_spectests.rs | 11 +- crates/common/types/src/attestation.rs | 2 +- crates/common/types/src/checkpoint.rs | 13 +- crates/storage/src/api/tables.rs | 10 +- crates/storage/src/backend/rocksdb.rs | 2 - crates/storage/src/lib.rs | 4 +- crates/storage/src/store.rs | 617 ++++++++---------- crates/storage/src/types.rs | 36 - 9 files changed, 387 insertions(+), 533 deletions(-) delete mode 100644 crates/storage/src/types.rs diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index 094600a9..aa112dbb 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -4,7 +4,7 @@ use ethlambda_crypto::aggregate_signatures; use ethlambda_state_transition::{ is_proposer, process_block, process_slots, slot_is_justifiable_after, }; -use ethlambda_storage::{ForkCheckpoints, SignatureKey, Store, StoredAggregatedPayload}; +use ethlambda_storage::{ForkCheckpoints, Store}; use ethlambda_types::{ ShortRoot, attestation::{ @@ -97,15 +97,11 @@ fn update_safe_target(store: &mut Store) { let min_target_score = (num_validators * 2).div_ceil(3); let blocks = store.get_live_chain(); - // Merge both attestation pools (keys only — skip payload deserialization). + // Merge both attestation pools (known + new). // At interval 3 the migration (interval 4) hasn't run yet, so attestations // that entered "known" directly (proposer's own attestation in block body, // node's self-attestation) would be invisible without this merge. - let all_keys: HashSet = store - .iter_known_aggregated_payload_keys() - .chain(store.iter_new_aggregated_payload_keys()) - .collect(); - let attestations = store.extract_latest_attestations(all_keys.into_iter()); + let attestations = store.extract_latest_all_attestations(); let (safe_target, _weights) = ethlambda_fork_choice::compute_lmd_ghost_head( store.latest_justified().root, &blocks, @@ -120,8 +116,8 @@ fn update_safe_target(store: &mut Store) { /// Collects individual gossip signatures, aggregates them by attestation data, /// and stores the resulting proofs in the new aggregated payloads buffer. fn aggregate_committee_signatures(store: &mut Store) -> Vec { - let gossip_sigs: Vec<(SignatureKey, _)> = store.iter_gossip_signatures().collect(); - if gossip_sigs.is_empty() { + let gossip_groups = store.iter_gossip_signatures(); + if gossip_groups.is_empty() { return Vec::new(); } let _timing = metrics::time_committee_signatures_aggregation(); @@ -131,32 +127,17 @@ fn aggregate_committee_signatures(store: &mut Store) -> Vec> = HashMap::new(); - let mut keys_to_delete: Vec = Vec::new(); - let mut payload_entries: Vec<(SignatureKey, StoredAggregatedPayload)> = Vec::new(); - - for ((validator_id, data_root), stored_sig) in &gossip_sigs { - if let Ok(sig) = stored_sig.to_validator_signature() { - groups - .entry(*data_root) - .or_default() - .push((*validator_id, sig)); - } - } - - for (data_root, validators_and_sigs) in groups { - let Some(data) = store.get_attestation_data_by_root(&data_root) else { - continue; - }; + let mut keys_to_delete: Vec<(u64, H256)> = Vec::new(); + let mut payload_entries: Vec<(H256, AttestationData, AggregatedSignatureProof)> = Vec::new(); + for (data_root, data, validator_sigs) in &gossip_groups { let slot = data.slot; let mut sigs = vec![]; let mut pubkeys = vec![]; let mut ids = vec![]; - for (vid, sig) in &validators_and_sigs { + for (vid, sig) in validator_sigs { let Some(validator) = validators.get(*vid as usize) else { continue; }; @@ -175,7 +156,7 @@ fn aggregate_committee_signatures(store: &mut Store) -> Vec Vec = aggregated - .proof - .participant_indices() - .map(|validator_id| { - let payload = StoredAggregatedPayload { - slot: aggregated.data.slot, - proof: aggregated.proof.clone(), - }; - ((validator_id, data_root), payload) - }) - .collect(); - store.insert_new_aggregated_payloads_batch(entries); + // Store one proof per attestation data (not per validator) + store.insert_new_aggregated_payload( + data_root, + aggregated.data.clone(), + aggregated.proof.clone(), + ); metrics::update_latest_new_aggregated_payloads(store.new_aggregated_payloads_count()); let slot = aggregated.data.slot; @@ -592,25 +556,16 @@ fn on_block_core( let aggregated_attestations = &block.body.attestations; let attestation_signatures = &signed_block.signature.attestation_signatures; - // Process block body attestations. - // Store attestation data by root and proofs in known aggregated payloads. - let mut att_data_entries: Vec<(H256, AttestationData)> = Vec::new(); - let mut known_entries: Vec<(SignatureKey, StoredAggregatedPayload)> = Vec::new(); + // Store one proof per attestation data in known aggregated payloads. + let mut known_entries: Vec<(H256, AttestationData, AggregatedSignatureProof)> = Vec::new(); for (att, proof) in aggregated_attestations .iter() .zip(attestation_signatures.iter()) { let data_root = att.data.tree_hash_root(); - att_data_entries.push((data_root, att.data.clone())); - - let validator_ids: Vec<_> = validator_indices(&att.aggregation_bits).collect(); - let payload = StoredAggregatedPayload { - slot: att.data.slot, - proof: proof.clone(), - }; - - for validator_id in &validator_ids { - known_entries.push(((*validator_id, data_root), payload.clone())); + known_entries.push((data_root, att.data.clone(), proof.clone())); + // Count each participating validator as a valid attestation + for _ in validator_indices(&att.aggregation_bits) { metrics::inc_attestations_valid(); } } @@ -619,10 +574,7 @@ fn on_block_core( // The proposer's attestation should NOT affect this block's fork choice position. let proposer_vid = proposer_attestation.validator_id; let proposer_data_root = proposer_attestation.data.tree_hash_root(); - att_data_entries.push((proposer_data_root, proposer_attestation.data.clone())); - // Batch-insert all attestation data (body + proposer) in a single commit - store.insert_attestation_data_by_root_batch(att_data_entries); store.insert_known_aggregated_payloads_batch(known_entries); // Update forkchoice head based on new block and attestations @@ -633,11 +585,12 @@ fn on_block_core( if !verify { // Without sig verification, insert directly with a dummy proof let participants = aggregation_bits_from_validator_indices(&[proposer_vid]); - let payload = StoredAggregatedPayload { - slot: proposer_attestation.data.slot, - proof: AggregatedSignatureProof::empty(participants), - }; - store.insert_new_aggregated_payload((proposer_vid, proposer_data_root), payload); + let proof = AggregatedSignatureProof::empty(participants); + store.insert_new_aggregated_payload( + proposer_data_root, + proposer_attestation.data.clone(), + proof, + ); } else { // Store the proposer's signature unconditionally for future block building. // Subnet filtering is handled at the P2P subscription layer. @@ -646,7 +599,7 @@ fn on_block_core( .map_err(|_| StoreError::SignatureDecodingFailed)?; store.insert_gossip_signature( proposer_data_root, - proposer_attestation.data.slot, + proposer_attestation.data.clone(), proposer_vid, proposer_sig, ); @@ -805,11 +758,24 @@ pub fn produce_block_with_signatures( }); } - // Single pass over known aggregated payloads: extract both attestation data and proofs - let known_payloads: Vec<_> = store.iter_known_aggregated_payloads().collect(); - - let known_attestations = - store.extract_latest_attestations(known_payloads.iter().map(|(key, _)| *key)); + // Get known aggregated payloads: data_root -> (AttestationData, Vec) + let aggregated_payloads = store.known_aggregated_payloads(); + + // Extract per-validator attestations from the already-fetched payloads + // (avoids re-locking and re-iterating the same data) + let mut known_attestations: HashMap = HashMap::new(); + for (data, proofs) in aggregated_payloads.values() { + for proof in proofs { + for vid in proof.participant_indices() { + let should_update = known_attestations + .get(&vid) + .is_none_or(|existing| existing.slot < data.slot); + if should_update { + known_attestations.insert(vid, data.clone()); + } + } + } + } let available_attestations: Vec = known_attestations .into_iter() .map(|(validator_id, data)| Attestation { validator_id, data }) @@ -818,15 +784,6 @@ pub fn produce_block_with_signatures( // Get known block roots for attestation validation let known_block_roots = store.get_block_roots(); - // Collect existing proofs for block building from the already-fetched payloads - let aggregated_payloads: HashMap> = known_payloads - .into_iter() - .map(|(key, stored_payloads)| { - let proofs = stored_payloads.into_iter().map(|sp| sp.proof).collect(); - (key, proofs) - }) - .collect(); - // Build the block using fixed-point attestation collection let (block, _post_state, signatures) = build_block( &head_state, @@ -982,21 +939,9 @@ fn aggregate_attestations_by_data(attestations: &[Attestation]) -> Vec, - aggregated_payloads: &HashMap>, + aggregated_payloads: &HashMap)>, ) -> Result<(Block, State, Vec), StoreError> { // Start with empty attestation set let mut included_attestations: Vec = Vec::new(); // Track which attestations we've already considered (by validator_id, data_root) - let mut included_keys: HashSet = HashSet::new(); + let mut included_keys: HashSet<(u64, H256)> = HashSet::new(); + + // Pre-compute data_roots to avoid recomputing tree_hash_root per iteration + let attestation_roots: Vec = available_attestations + .iter() + .map(|a| a.data.tree_hash_root()) + .collect(); // Fixed-point loop: collect attestations until no new ones can be added loop { @@ -1052,9 +1003,8 @@ fn build_block( // Find new valid attestations matching post-state requirements let mut new_attestations: Vec = Vec::new(); - for attestation in available_attestations { - let data_root = attestation.data.tree_hash_root(); - let sig_key: SignatureKey = (attestation.validator_id, data_root); + for (attestation, data_root) in available_attestations.iter().zip(&attestation_roots) { + let data_root = *data_root; // Skip if target block is unknown if !known_block_roots.contains(&attestation.data.head.root) { @@ -1067,14 +1017,24 @@ fn build_block( } // Avoid adding duplicates of attestations already in the candidate set - if included_keys.contains(&sig_key) { + let key = (attestation.validator_id, data_root); + if included_keys.contains(&key) { continue; } - // Only include if we have a proof for this attestation - if aggregated_payloads.contains_key(&sig_key) { + // Only include if we have a proof covering this validator + let has_proof = aggregated_payloads + .get(&data_root) + .is_some_and(|(_, proofs)| { + proofs.iter().any(|p| { + let vid = attestation.validator_id as usize; + vid < p.participants.len() && p.participants.get(vid).unwrap_or(false) + }) + }); + + if has_proof { new_attestations.push(attestation.clone()); - included_keys.insert(sig_key); + included_keys.insert(key); } } @@ -1122,30 +1082,24 @@ fn build_block( /// Returns a list of (attestation, proof) pairs ready for block inclusion. fn select_aggregated_proofs( attestations: &[Attestation], - aggregated_payloads: &HashMap>, + aggregated_payloads: &HashMap)>, ) -> Result<(Vec, Vec), StoreError> { let mut results = vec![]; for aggregated in aggregate_attestations_by_data(attestations) { let data = &aggregated.data; - let message = data.tree_hash_root(); + let data_root = data.tree_hash_root(); let mut remaining: HashSet = validator_indices(&aggregated.aggregation_bits).collect(); - // Select existing proofs that cover the most remaining validators - while !remaining.is_empty() { - let Some(&target_id) = remaining.iter().next() else { - break; - }; - - let Some(candidates) = aggregated_payloads - .get(&(target_id, message)) - .filter(|v| !v.is_empty()) - else { - break; - }; + // Get all proofs for this attestation data + let Some((_, candidates)) = aggregated_payloads.get(&data_root) else { + continue; + }; - let (proof, covered) = candidates + // Greedy set-cover: select proofs covering the most remaining validators + while !remaining.is_empty() { + let best = candidates .iter() .map(|p| { let covered: Vec<_> = validator_indices(&p.participants) @@ -1153,8 +1107,11 @@ fn select_aggregated_proofs( .collect(); (p, covered) }) - .max_by_key(|(_, covered)| covered.len()) - .expect("candidates is not empty"); + .max_by_key(|(_, covered)| covered.len()); + + let Some((proof, covered)) = best else { + break; + }; // No proof covers any remaining validator if covered.is_empty() { diff --git a/crates/blockchain/tests/forkchoice_spectests.rs b/crates/blockchain/tests/forkchoice_spectests.rs index 9d1c0285..5958cf5e 100644 --- a/crates/blockchain/tests/forkchoice_spectests.rs +++ b/crates/blockchain/tests/forkchoice_spectests.rs @@ -284,12 +284,8 @@ fn validate_attestation_check( let location = check.location.as_str(); let attestations: HashMap = match location { - "new" => { - st.extract_latest_attestations(st.iter_new_aggregated_payloads().map(|(key, _)| key)) - } - "known" => { - st.extract_latest_attestations(st.iter_known_aggregated_payloads().map(|(key, _)| key)) - } + "new" => st.extract_latest_new_attestations(), + "known" => st.extract_latest_known_attestations(), other => { return Err( format!("Step {}: unknown attestation location: {}", step_idx, other).into(), @@ -369,8 +365,7 @@ fn validate_lexicographic_head_among( } let blocks = st.get_live_chain(); - let known_attestations: HashMap = - st.extract_latest_attestations(st.iter_known_aggregated_payloads().map(|(key, _)| key)); + let known_attestations: HashMap = st.extract_latest_known_attestations(); // Resolve all fork labels to roots and compute their weights // Map: label -> (root, slot, weight) diff --git a/crates/common/types/src/attestation.rs b/crates/common/types/src/attestation.rs index e33db5cc..15c4331d 100644 --- a/crates/common/types/src/attestation.rs +++ b/crates/common/types/src/attestation.rs @@ -17,7 +17,7 @@ pub struct Attestation { } /// Attestation content describing the validator's observed chain view. -#[derive(Debug, Clone, Encode, Decode, TreeHash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Encode, Decode, TreeHash)] pub struct AttestationData { /// The slot for which the attestation is made. pub slot: u64, diff --git a/crates/common/types/src/checkpoint.rs b/crates/common/types/src/checkpoint.rs index 328a7189..00b9f065 100644 --- a/crates/common/types/src/checkpoint.rs +++ b/crates/common/types/src/checkpoint.rs @@ -7,7 +7,18 @@ use crate::primitives::{ /// Represents a checkpoint in the chain's history. #[derive( - Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize, Encode, Decode, TreeHash, + Debug, + Clone, + Copy, + Default, + PartialEq, + Eq, + Hash, + Serialize, + Deserialize, + Encode, + Decode, + TreeHash, )] pub struct Checkpoint { /// The root hash of the checkpoint's block. diff --git a/crates/storage/src/api/tables.rs b/crates/storage/src/api/tables.rs index 35fb687d..6fd972c4 100644 --- a/crates/storage/src/api/tables.rs +++ b/crates/storage/src/api/tables.rs @@ -12,10 +12,6 @@ pub enum Table { BlockSignatures, /// State storage: H256 -> State States, - /// Gossip signatures: SignatureKey -> ValidatorSignature - GossipSignatures, - /// Attestation data indexed by tree hash root: H256 -> AttestationData - AttestationDataByRoot, /// Metadata: string keys -> various scalar values Metadata, /// Live chain index: (slot || root) -> parent_root @@ -27,13 +23,11 @@ pub enum Table { } /// All table variants. -pub const ALL_TABLES: [Table; 8] = [ +pub const ALL_TABLES: [Table; 6] = [ Table::BlockHeaders, Table::BlockBodies, Table::BlockSignatures, Table::States, - Table::GossipSignatures, - Table::AttestationDataByRoot, Table::Metadata, Table::LiveChain, ]; @@ -46,8 +40,6 @@ impl Table { Table::BlockBodies => "block_bodies", Table::BlockSignatures => "block_signatures", Table::States => "states", - Table::GossipSignatures => "gossip_signatures", - Table::AttestationDataByRoot => "attestation_data_by_root", Table::Metadata => "metadata", Table::LiveChain => "live_chain", } diff --git a/crates/storage/src/backend/rocksdb.rs b/crates/storage/src/backend/rocksdb.rs index 160ea4cc..4853873e 100644 --- a/crates/storage/src/backend/rocksdb.rs +++ b/crates/storage/src/backend/rocksdb.rs @@ -16,8 +16,6 @@ fn cf_name(table: Table) -> &'static str { Table::BlockBodies => "block_bodies", Table::BlockSignatures => "block_signatures", Table::States => "states", - Table::GossipSignatures => "gossip_signatures", - Table::AttestationDataByRoot => "attestation_data_by_root", Table::Metadata => "metadata", Table::LiveChain => "live_chain", } diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs index 01ac010f..9c30f9c8 100644 --- a/crates/storage/src/lib.rs +++ b/crates/storage/src/lib.rs @@ -1,8 +1,6 @@ mod api; pub mod backend; mod store; -mod types; pub use api::{ALL_TABLES, StorageBackend, StorageReadView, StorageWriteBatch, Table}; -pub use store::{ForkCheckpoints, SignatureKey, Store}; -pub use types::{StoredAggregatedPayload, StoredSignature}; +pub use store::{ForkCheckpoints, Store}; diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index f2c380ff..5d2dc5a1 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -1,5 +1,4 @@ use std::collections::{HashMap, HashSet, VecDeque}; -use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, LazyLock, Mutex}; /// The tree hash root of an empty block body. @@ -9,13 +8,12 @@ use std::sync::{Arc, LazyLock, Mutex}; static EMPTY_BODY_ROOT: LazyLock = LazyLock::new(|| BlockBody::default().tree_hash_root()); use crate::api::{StorageBackend, StorageWriteBatch, Table}; -use crate::types::{StoredAggregatedPayload, StoredSignature}; use ethlambda_types::{ attestation::AttestationData, block::{ - Block, BlockBody, BlockHeader, BlockSignaturesWithAttestation, BlockWithAttestation, - SignedBlockWithAttestation, + AggregatedSignatureProof, Block, BlockBody, BlockHeader, BlockSignaturesWithAttestation, + BlockWithAttestation, SignedBlockWithAttestation, }, checkpoint::Checkpoint, primitives::{ @@ -27,12 +25,6 @@ use ethlambda_types::{ }; use tracing::info; -/// Key for looking up individual validator signatures. -/// Used to index signature caches by (validator, message) pairs. -/// -/// Values are (validator_index, attestation_data_root). -pub type SignatureKey = (u64, H256); - /// Checkpoints to update in the forkchoice store. /// /// Used with `Store::update_checkpoints` to update head and optionally @@ -91,91 +83,137 @@ const _: () = assert!( "BLOCKS_TO_KEEP must be >= STATES_TO_KEEP" ); -/// Hard cap for the known aggregated payload buffer. -/// Matches Lantern's approach. With 9 validators, this holds -/// ~455 unique attestation messages (~30 min at 1/slot). -const AGGREGATED_PAYLOAD_CAP: usize = 4096; +/// Hard cap for the known aggregated payload buffer (number of distinct attestation messages). +/// With 1 attestation/slot, this holds ~500 messages (~33 min at 4s/slot). +const AGGREGATED_PAYLOAD_CAP: usize = 512; /// Hard cap for the new (pending) aggregated payload buffer. /// Smaller than known since new payloads are drained every interval (~4s). -/// With 9 validators at 1 attestation/slot, one interval holds ~9 entries. -const NEW_PAYLOAD_CAP: usize = 512; +const NEW_PAYLOAD_CAP: usize = 64; + +/// An entry in the payload buffer: attestation data + set of proofs. +#[derive(Clone)] +struct PayloadEntry { + data: AttestationData, + proofs: Vec, +} /// Fixed-size circular buffer for aggregated payloads. /// -/// Entries are evicted FIFO when the buffer reaches capacity. -/// This prevents unbounded memory growth when finalization stalls. +/// Groups proofs by attestation data (via data_root). Each distinct +/// attestation message stores the full `AttestationData` plus all +/// `AggregatedSignatureProof`s covering that message. +/// +/// Entries are evicted FIFO (by insertion order of the data_root) +/// when the buffer reaches capacity. #[derive(Clone)] struct PayloadBuffer { - entries: VecDeque<(SignatureKey, StoredAggregatedPayload)>, + data: HashMap, + order: VecDeque, capacity: usize, } impl PayloadBuffer { fn new(capacity: usize) -> Self { Self { - entries: VecDeque::with_capacity(capacity), + data: HashMap::with_capacity(capacity), + order: VecDeque::with_capacity(capacity), capacity, } } - /// Insert one entry, FIFO-evicting the oldest if at capacity. - fn push(&mut self, key: SignatureKey, payload: StoredAggregatedPayload) { - if self.entries.len() >= self.capacity { - self.entries.pop_front(); + /// Insert proofs for an attestation, FIFO-evicting oldest data_root if at capacity. + fn push( + &mut self, + data_root: H256, + att_data: AttestationData, + proof: AggregatedSignatureProof, + ) { + if let Some(entry) = self.data.get_mut(&data_root) { + entry.proofs.push(proof); + } else { + // New data_root — check capacity before inserting + if self.order.len() >= self.capacity + && let Some(evicted) = self.order.pop_front() + { + self.data.remove(&evicted); + } + self.data.insert( + data_root, + PayloadEntry { + data: att_data, + proofs: vec![proof], + }, + ); + self.order.push_back(data_root); } - self.entries.push_back((key, payload)); } - /// Insert multiple entries, FIFO-evicting as needed. - fn push_batch(&mut self, entries: Vec<(SignatureKey, StoredAggregatedPayload)>) { - for (key, payload) in entries { - self.push(key, payload); + /// Insert a batch of (data_root, attestation_data, proof) entries. + fn push_batch(&mut self, entries: Vec<(H256, AttestationData, AggregatedSignatureProof)>) { + for (data_root, att_data, proof) in entries { + self.push(data_root, att_data, proof); } } /// Take all entries, leaving the buffer empty. - fn drain(&mut self) -> Vec<(SignatureKey, StoredAggregatedPayload)> { - self.entries.drain(..).collect() - } - - /// Group entries by key, preserving insertion order within each group. - fn grouped(&self) -> HashMap> { - let mut map: HashMap> = HashMap::new(); - for (key, payload) in &self.entries { - map.entry(*key).or_default().push(payload.clone()); - } - map + fn drain(&mut self) -> Vec<(H256, AttestationData, AggregatedSignatureProof)> { + self.order.clear(); + self.data + .drain() + .flat_map(|(root, entry)| { + let data = entry.data; + entry + .proofs + .into_iter() + .map(move |proof| (root, data.clone(), proof)) + }) + .collect() } - /// Return deduplicated keys. - fn unique_keys(&self) -> HashSet { - self.entries.iter().map(|(key, _)| *key).collect() + /// Return the number of distinct attestation messages in the buffer. + fn len(&self) -> usize { + self.data.len() } - /// Return the number of entries in the buffer. - fn len(&self) -> usize { - self.entries.len() + /// Extract per-validator latest attestations from proofs' participation bits. + fn extract_latest_attestations(&self) -> HashMap { + let mut result: HashMap = HashMap::new(); + for entry in self.data.values() { + for proof in &entry.proofs { + for vid in proof.participant_indices() { + let should_update = result + .get(&vid) + .is_none_or(|existing| existing.slot < entry.data.slot); + if should_update { + result.insert(vid, entry.data.clone()); + } + } + } + } + result } } -// ============ Key Encoding Helpers ============ - -/// Encode a SignatureKey (validator_id, root) to bytes. -/// Layout: validator_id (8 bytes SSZ) || root (32 bytes SSZ) -fn encode_signature_key(key: &SignatureKey) -> Vec { - let mut result = key.0.as_ssz_bytes(); - result.extend(key.1.as_ssz_bytes()); - result +/// Individual validator signature received via gossip. +#[derive(Clone)] +struct GossipSignatureEntry { + validator_id: u64, + signature: ValidatorSignature, } -/// Decode a SignatureKey from bytes. -fn decode_signature_key(bytes: &[u8]) -> SignatureKey { - let validator_id = u64::from_ssz_bytes(&bytes[..8]).expect("valid validator_id"); - let root = H256::from_ssz_bytes(&bytes[8..]).expect("valid root"); - (validator_id, root) +/// Gossip signatures grouped by attestation data. +struct GossipDataEntry { + data: AttestationData, + signatures: Vec, } +/// Gossip signatures grouped by attestation data (via data_root). +type GossipSignatureMap = HashMap; + +/// Gossip signatures snapshot: (data_root, attestation_data, Vec<(validator_id, signature)>). +pub type GossipSignatureSnapshot = Vec<(H256, AttestationData, Vec<(u64, ValidatorSignature)>)>; + /// Encode a LiveChain key (slot, root) to bytes. /// Layout: slot (8 bytes big-endian) || root (32 bytes) /// Big-endian ensures lexicographic ordering matches numeric ordering. @@ -212,7 +250,9 @@ pub struct Store { backend: Arc, new_payloads: Arc>, known_payloads: Arc>, - gossip_signatures_count: Arc, + /// In-memory gossip signatures: data_root → (AttestationData, Vec). + /// Transient data consumed at interval 2 aggregation. + gossip_signatures: Arc>, } impl Store { @@ -348,28 +388,14 @@ impl Store { info!(%anchor_state_root, %anchor_block_root, "Initialized store"); - let initial_gossip_count = Self::count_gossip_signatures(&*backend); Self { backend, new_payloads: Arc::new(Mutex::new(PayloadBuffer::new(NEW_PAYLOAD_CAP))), known_payloads: Arc::new(Mutex::new(PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP))), - gossip_signatures_count: Arc::new(AtomicUsize::new(initial_gossip_count)), + gossip_signatures: Arc::new(Mutex::new(HashMap::new())), } } - /// Count existing gossip signatures in the database. - /// - /// Used once at startup to seed the in-memory counter. - fn count_gossip_signatures(backend: &dyn StorageBackend) -> usize { - backend - .begin_read() - .expect("read view") - .prefix_iterator(Table::GossipSignatures, &[]) - .expect("iterator") - .filter_map(|r| r.ok()) - .count() - } - // ============ Metadata Helpers ============ fn get_metadata(&self, key: &[u8]) -> T { @@ -479,12 +505,11 @@ impl Store { { let pruned_chain = self.prune_live_chain(finalized.slot); let pruned_sigs = self.prune_gossip_signatures(finalized.slot); - let pruned_att_data = self.prune_attestation_data_by_root(finalized.slot); - if pruned_chain > 0 || pruned_sigs > 0 || pruned_att_data > 0 { + if pruned_chain > 0 || pruned_sigs > 0 { info!( finalized_slot = finalized.slot, - pruned_chain, pruned_sigs, pruned_att_data, "Pruned finalized data" + pruned_chain, pruned_sigs, "Pruned finalized data" ); } } @@ -577,26 +602,12 @@ impl Store { /// Prune gossip signatures for slots <= finalized_slot. /// - /// Returns the number of signatures pruned. - pub fn prune_gossip_signatures(&mut self, finalized_slot: u64) -> usize { - let pruned = self.prune_by_slot(Table::GossipSignatures, finalized_slot, |bytes| { - StoredSignature::from_ssz_bytes(bytes).ok().map(|s| s.slot) - }); - self.gossip_signatures_count - .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| { - Some(current.saturating_sub(pruned)) - }) - .unwrap(); - pruned - } - - /// Prune attestation data by root for slots <= finalized_slot. - /// /// Returns the number of entries pruned. - pub fn prune_attestation_data_by_root(&mut self, finalized_slot: u64) -> usize { - self.prune_by_slot(Table::AttestationDataByRoot, finalized_slot, |bytes| { - AttestationData::from_ssz_bytes(bytes).ok().map(|d| d.slot) - }) + pub fn prune_gossip_signatures(&mut self, finalized_slot: u64) -> usize { + let mut gossip = self.gossip_signatures.lock().unwrap(); + let before = gossip.len(); + gossip.retain(|_, entry| entry.data.slot > finalized_slot); + before - gossip.len() } /// Prune old states beyond the retention window. @@ -805,119 +816,80 @@ impl Store { batch.commit().expect("commit"); } - // ============ Attestation Data By Root ============ - // - // Content-addressed attestation data storage. Used to reconstruct - // per-validator attestation maps from aggregated payloads. - - /// Stores attestation data indexed by its tree hash root. - pub fn insert_attestation_data_by_root(&mut self, root: H256, data: AttestationData) { - let mut batch = self.backend.begin_write().expect("write batch"); - let entries = vec![(root.as_ssz_bytes(), data.as_ssz_bytes())]; - batch - .put_batch(Table::AttestationDataByRoot, entries) - .expect("put attestation data"); - batch.commit().expect("commit"); - } + // ============ Attestation Extraction ============ - /// Batch-insert multiple attestation data entries in a single commit. - pub fn insert_attestation_data_by_root_batch(&mut self, entries: Vec<(H256, AttestationData)>) { - if entries.is_empty() { - return; - } - let mut batch = self.backend.begin_write().expect("write batch"); - let ssz_entries = entries - .into_iter() - .map(|(root, data)| (root.as_ssz_bytes(), data.as_ssz_bytes())) - .collect(); - batch - .put_batch(Table::AttestationDataByRoot, ssz_entries) - .expect("put attestation data batch"); - batch.commit().expect("commit"); + /// Extract per-validator latest attestations from known (fork-choice-active) payloads. + pub fn extract_latest_known_attestations(&self) -> HashMap { + self.known_payloads + .lock() + .unwrap() + .extract_latest_attestations() } - /// Returns attestation data for the given root hash. - pub fn get_attestation_data_by_root(&self, root: &H256) -> Option { - let view = self.backend.begin_read().expect("read view"); - view.get(Table::AttestationDataByRoot, &root.as_ssz_bytes()) - .expect("get") - .map(|bytes| AttestationData::from_ssz_bytes(&bytes).expect("valid attestation data")) + /// Extract per-validator latest attestations from new (pending) payloads. + pub fn extract_latest_new_attestations(&self) -> HashMap { + self.new_payloads + .lock() + .unwrap() + .extract_latest_attestations() } - /// Reconstruct per-validator attestation data from aggregated payloads. - /// - /// For each (validator_id, data_root) key in the payloads, looks up the - /// attestation data by root. Returns the latest attestation per validator - /// (by slot). - pub fn extract_latest_attestations( - &self, - keys: impl Iterator, - ) -> HashMap { - let mut result: HashMap = HashMap::new(); - let mut data_cache: HashMap> = HashMap::new(); - - for (validator_id, data_root) in keys { - let data = data_cache - .entry(data_root) - .or_insert_with(|| self.get_attestation_data_by_root(&data_root)); - - let Some(data) = data else { - continue; - }; - + /// Extract per-validator latest attestations from both known and new payloads. + pub fn extract_latest_all_attestations(&self) -> HashMap { + let mut result = self + .known_payloads + .lock() + .unwrap() + .extract_latest_attestations(); + for (vid, data) in self + .new_payloads + .lock() + .unwrap() + .extract_latest_attestations() + { let should_update = result - .get(&validator_id) + .get(&vid) .is_none_or(|existing| existing.slot < data.slot); - if should_update { - result.insert(validator_id, data.clone()); + result.insert(vid, data); } } - result } - /// Convenience: extract latest attestation per validator from known - /// (fork-choice-active) aggregated payloads only. - pub fn extract_latest_known_attestations(&self) -> HashMap { - let keys = self.known_payloads.lock().unwrap().unique_keys(); - self.extract_latest_attestations(keys.into_iter()) - } - // ============ Known Aggregated Payloads ============ // // "Known" aggregated payloads are active in fork choice weight calculations. // Promoted from "new" payloads at specific intervals (0 with proposal, 4). - /// Iterates over all known aggregated payloads, grouped by key. - pub fn iter_known_aggregated_payloads( + /// Returns a snapshot of known payloads as (AttestationData, Vec) pairs. + pub fn known_aggregated_payloads( &self, - ) -> impl Iterator)> { - self.known_payloads.lock().unwrap().grouped().into_iter() - } - - /// Iterates over deduplicated keys from the known aggregated payloads. - pub fn iter_known_aggregated_payload_keys(&self) -> impl Iterator { - self.known_payloads - .lock() - .unwrap() - .unique_keys() - .into_iter() + ) -> HashMap)> { + let buf = self.known_payloads.lock().unwrap(); + buf.data + .iter() + .map(|(root, entry)| (*root, (entry.data.clone(), entry.proofs.clone()))) + .collect() } - /// Insert an aggregated payload into the known (fork-choice-active) buffer. + /// Insert a single proof into the known (fork-choice-active) buffer. pub fn insert_known_aggregated_payload( &mut self, - key: SignatureKey, - payload: StoredAggregatedPayload, + data_root: H256, + att_data: AttestationData, + proof: AggregatedSignatureProof, ) { - self.known_payloads.lock().unwrap().push(key, payload); + self.known_payloads + .lock() + .unwrap() + .push(data_root, att_data, proof); } - /// Batch-insert multiple aggregated payloads into the known buffer. + /// Batch-insert proofs into the known buffer. pub fn insert_known_aggregated_payloads_batch( &mut self, - entries: Vec<(SignatureKey, StoredAggregatedPayload)>, + entries: Vec<(H256, AttestationData, AggregatedSignatureProof)>, ) { self.known_payloads.lock().unwrap().push_batch(entries); } @@ -927,70 +899,29 @@ impl Store { // "New" aggregated payloads are pending — not yet counted in fork choice. // Promoted to "known" via `promote_new_aggregated_payloads`. - /// Iterates over all new (pending) aggregated payloads, grouped by key. - pub fn iter_new_aggregated_payloads( - &self, - ) -> impl Iterator)> { - self.new_payloads.lock().unwrap().grouped().into_iter() - } - - /// Iterates over deduplicated keys from the new aggregated payloads. - pub fn iter_new_aggregated_payload_keys(&self) -> impl Iterator { - self.new_payloads.lock().unwrap().unique_keys().into_iter() - } - - /// Insert an aggregated payload into the new (pending) buffer. + /// Insert a single proof into the new (pending) buffer. pub fn insert_new_aggregated_payload( &mut self, - key: SignatureKey, - payload: StoredAggregatedPayload, + data_root: H256, + att_data: AttestationData, + proof: AggregatedSignatureProof, ) { - self.new_payloads.lock().unwrap().push(key, payload); + self.new_payloads + .lock() + .unwrap() + .push(data_root, att_data, proof); } - /// Batch-insert multiple aggregated payloads into the new buffer. + /// Batch-insert proofs into the new buffer. pub fn insert_new_aggregated_payloads_batch( &mut self, - entries: Vec<(SignatureKey, StoredAggregatedPayload)>, + entries: Vec<(H256, AttestationData, AggregatedSignatureProof)>, ) { self.new_payloads.lock().unwrap().push_batch(entries); } // ============ Pruning Helpers ============ - /// Prune entries from a table where the slot (extracted via `get_slot`) is <= `finalized_slot`. - /// Returns the number of entries pruned. - fn prune_by_slot( - &mut self, - table: Table, - finalized_slot: u64, - get_slot: impl Fn(&[u8]) -> Option, - ) -> usize { - let view = self.backend.begin_read().expect("read view"); - let mut to_delete = vec![]; - - for (key_bytes, value_bytes) in view - .prefix_iterator(table, &[]) - .expect("iter") - .filter_map(|r| r.ok()) - { - if let Some(slot) = get_slot(&value_bytes) - && slot <= finalized_slot - { - to_delete.push(key_bytes.to_vec()); - } - } - drop(view); - - let count = to_delete.len(); - if !to_delete.is_empty() { - let mut batch = self.backend.begin_write().expect("write batch"); - batch.delete_batch(table, to_delete).expect("delete"); - batch.commit().expect("commit"); - } - count - } - /// Promotes all new aggregated payloads to known, making them active in fork choice. /// /// Drains the new buffer and pushes all entries into the known buffer. @@ -1009,9 +940,10 @@ impl Store { self.known_payloads.lock().unwrap().len() } - /// Returns the number of gossip signatures stored. + /// Returns the number of gossip signature entries stored. pub fn gossip_signatures_count(&self) -> usize { - self.gossip_signatures_count.load(Ordering::Relaxed) + let gossip = self.gossip_signatures.lock().unwrap(); + gossip.values().map(|entry| entry.signatures.len()).sum() } /// Estimated live data size in bytes for a table, as reported by the backend. @@ -1019,79 +951,68 @@ impl Store { self.backend.estimate_table_bytes(table) } - /// Delete specific gossip signatures by key. - pub fn delete_gossip_signatures(&mut self, keys: &[SignatureKey]) { + // ============ Gossip Signatures ============ + // + // Gossip signatures are individual validator signatures received via P2P. + // They're transient (consumed at interval 2 aggregation) so stored in-memory. + // Keyed by AttestationData (via data_root) matching the leanSpec structure: + // gossip_signatures: dict[AttestationData, set[GossipSignatureEntry]] + + /// Delete gossip entries for the given (validator_id, data_root) pairs. + pub fn delete_gossip_signatures(&mut self, keys: &[(u64, H256)]) { if keys.is_empty() { return; } - let count = keys.len(); - let encoded_keys: Vec<_> = keys.iter().map(encode_signature_key).collect(); - let mut batch = self.backend.begin_write().expect("write batch"); - batch - .delete_batch(Table::GossipSignatures, encoded_keys) - .expect("delete gossip signatures"); - batch.commit().expect("commit"); - self.gossip_signatures_count - .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| { - Some(current.saturating_sub(count)) - }) - .unwrap(); + let mut gossip = self.gossip_signatures.lock().unwrap(); + for &(vid, data_root) in keys { + if let Some(entry) = gossip.get_mut(&data_root) { + entry.signatures.retain(|e| e.validator_id != vid); + if entry.signatures.is_empty() { + gossip.remove(&data_root); + } + } + } } - // ============ Gossip Signatures ============ - // - // Gossip signatures are individual validator signatures received via P2P. - // They're aggregated into proofs for block signature verification. - - /// Iterates over all gossip signatures. - pub fn iter_gossip_signatures( - &self, - ) -> impl Iterator + '_ { - let view = self.backend.begin_read().expect("read view"); - let entries: Vec<_> = view - .prefix_iterator(Table::GossipSignatures, &[]) - .expect("iterator") - .filter_map(|res| res.ok()) - .filter_map(|(k, v)| { - let key = decode_signature_key(&k); - StoredSignature::from_ssz_bytes(&v) - .ok() - .map(|stored| (key, stored)) + /// Returns a snapshot of gossip signatures grouped by attestation data. + pub fn iter_gossip_signatures(&self) -> GossipSignatureSnapshot { + let gossip = self.gossip_signatures.lock().unwrap(); + gossip + .iter() + .map(|(root, entry)| { + let sigs: Vec<_> = entry + .signatures + .iter() + .map(|e| (e.validator_id, e.signature.clone())) + .collect(); + (*root, entry.data.clone(), sigs) }) - .collect(); - entries.into_iter() + .collect() } /// Stores a gossip signature for later aggregation. pub fn insert_gossip_signature( &mut self, data_root: H256, - slot: u64, + att_data: AttestationData, validator_id: u64, signature: ValidatorSignature, ) { - let key = (validator_id, data_root); - let encoded_key = encode_signature_key(&key); - - // Check if key already exists to avoid inflating the counter on upsert - let is_new = self - .backend - .begin_read() - .expect("read view") - .get(Table::GossipSignatures, &encoded_key) - .expect("get") - .is_none(); - - let stored = StoredSignature::new(slot, signature); - let mut batch = self.backend.begin_write().expect("write batch"); - let entries = vec![(encoded_key, stored.as_ssz_bytes())]; - batch - .put_batch(Table::GossipSignatures, entries) - .expect("put signature"); - batch.commit().expect("commit"); - - if is_new { - self.gossip_signatures_count.fetch_add(1, Ordering::Relaxed); + let mut gossip = self.gossip_signatures.lock().unwrap(); + let entry = gossip.entry(data_root).or_insert_with(|| GossipDataEntry { + data: att_data, + signatures: Vec::new(), + }); + // Avoid duplicates for same validator + if !entry + .signatures + .iter() + .any(|e| e.validator_id == validator_id) + { + entry.signatures.push(GossipSignatureEntry { + validator_id, + signature, + }); } } @@ -1237,7 +1158,7 @@ mod tests { backend, new_payloads: Arc::new(Mutex::new(PayloadBuffer::new(NEW_PAYLOAD_CAP))), known_payloads: Arc::new(Mutex::new(PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP))), - gossip_signatures_count: Arc::new(AtomicUsize::new(0)), + gossip_signatures: Arc::new(Mutex::new(HashMap::new())), } } @@ -1248,7 +1169,7 @@ mod tests { backend, new_payloads: Arc::new(Mutex::new(PayloadBuffer::new(NEW_PAYLOAD_CAP))), known_payloads: Arc::new(Mutex::new(PayloadBuffer::new(AGGREGATED_PAYLOAD_CAP))), - gossip_signatures_count: Arc::new(AtomicUsize::new(0)), + gossip_signatures: Arc::new(Mutex::new(HashMap::new())), } } } @@ -1543,96 +1464,114 @@ mod tests { // ============ PayloadBuffer Tests ============ - fn make_payload(slot: u64) -> StoredAggregatedPayload { + fn make_proof() -> AggregatedSignatureProof { use ethlambda_types::attestation::AggregationBits; - use ethlambda_types::block::AggregatedSignatureProof; + AggregatedSignatureProof::empty(AggregationBits::with_capacity(0).unwrap()) + } - StoredAggregatedPayload { + fn make_att_data(slot: u64) -> AttestationData { + AttestationData { slot, - proof: AggregatedSignatureProof::empty(AggregationBits::with_capacity(0).unwrap()), + head: Checkpoint::default(), + target: Checkpoint::default(), + source: Checkpoint::default(), } } #[test] fn payload_buffer_fifo_eviction() { let mut buf = PayloadBuffer::new(3); - let key = (0u64, H256::ZERO); - buf.push(key, make_payload(1)); - buf.push(key, make_payload(2)); - buf.push(key, make_payload(3)); - assert_eq!(buf.entries.len(), 3); + // Insert 3 distinct attestation data entries (different slots → different roots) + for slot in 1..=3u64 { + let data = make_att_data(slot); + let data_root = data.tree_hash_root(); + buf.push(data_root, data, make_proof()); + } + assert_eq!(buf.len(), 3); + + // Pushing a 4th should evict the oldest (slot 1) + let data = make_att_data(4); + let data_root = data.tree_hash_root(); + buf.push(data_root, data, make_proof()); + assert_eq!(buf.len(), 3); - // Pushing a 4th entry should evict the oldest (slot 1) - buf.push(key, make_payload(4)); - assert_eq!(buf.entries.len(), 3); - let slots: Vec = buf.entries.iter().map(|(_, p)| p.slot).collect(); - assert_eq!(slots, vec![2, 3, 4]); + // The oldest (slot 1) should be gone + let att_data_1 = make_att_data(1); + assert!(!buf.data.contains_key(&att_data_1.tree_hash_root())); } #[test] - fn payload_buffer_grouped_returns_correct_groups() { + fn payload_buffer_multiple_proofs_per_data() { let mut buf = PayloadBuffer::new(10); - let key_a = (0u64, H256::ZERO); - let key_b = (1u64, H256::ZERO); + let data = make_att_data(1); + let data_root = data.tree_hash_root(); - buf.push(key_a, make_payload(1)); - buf.push(key_b, make_payload(2)); - buf.push(key_a, make_payload(3)); + // Insert 3 proofs for the same attestation data + buf.push(data_root, data.clone(), make_proof()); + buf.push(data_root, data.clone(), make_proof()); + buf.push(data_root, data, make_proof()); - let grouped = buf.grouped(); - assert_eq!(grouped.len(), 2); - assert_eq!(grouped[&key_a].len(), 2); - assert_eq!(grouped[&key_a][0].slot, 1); - assert_eq!(grouped[&key_a][1].slot, 3); - assert_eq!(grouped[&key_b].len(), 1); - assert_eq!(grouped[&key_b][0].slot, 2); + // Should be 1 distinct data entry with 3 proofs + assert_eq!(buf.len(), 1); + assert_eq!(buf.data[&data_root].proofs.len(), 3); } #[test] fn payload_buffer_drain_empties_buffer() { let mut buf = PayloadBuffer::new(10); - let key = (0u64, H256::ZERO); + let data = make_att_data(1); + let data_root = data.tree_hash_root(); - buf.push(key, make_payload(1)); - buf.push(key, make_payload(2)); + buf.push(data_root, data.clone(), make_proof()); + buf.push(data_root, data, make_proof()); let drained = buf.drain(); - assert_eq!(drained.len(), 2); - assert!(buf.entries.is_empty()); + assert_eq!(drained.len(), 2); // 2 proofs flattened + assert!(buf.data.is_empty()); + assert!(buf.order.is_empty()); } #[test] fn promote_moves_new_to_known() { let mut store = Store::test_store(); + let data = make_att_data(1); + let data_root = data.tree_hash_root(); - let key = (0u64, H256::ZERO); - store.insert_new_aggregated_payload(key, make_payload(1)); - store.insert_new_aggregated_payload(key, make_payload(2)); + store.insert_new_aggregated_payload(data_root, data.clone(), make_proof()); + store.insert_new_aggregated_payload(data_root, data, make_proof()); - assert_eq!(store.new_payloads.lock().unwrap().entries.len(), 2); - assert_eq!(store.known_payloads.lock().unwrap().entries.len(), 0); + assert_eq!(store.new_payloads.lock().unwrap().len(), 1); + assert_eq!(store.known_payloads.lock().unwrap().len(), 0); store.promote_new_aggregated_payloads(); - assert_eq!(store.new_payloads.lock().unwrap().entries.len(), 0); - assert_eq!(store.known_payloads.lock().unwrap().entries.len(), 2); + assert_eq!(store.new_payloads.lock().unwrap().len(), 0); + assert_eq!(store.known_payloads.lock().unwrap().len(), 1); + // The known buffer should have 2 proofs for this data + assert_eq!( + store.known_payloads.lock().unwrap().data[&data_root] + .proofs + .len(), + 2 + ); } #[test] fn cloned_store_shares_payload_buffers() { let mut store = Store::test_store(); let cloned = store.clone(); + let data = make_att_data(1); + let data_root = data.tree_hash_root(); - let key = (0u64, H256::ZERO); - store.insert_new_aggregated_payload(key, make_payload(1)); + store.insert_new_aggregated_payload(data_root, data, make_proof()); // Modification on original should be visible in clone - assert_eq!(cloned.new_payloads.lock().unwrap().entries.len(), 1); + assert_eq!(cloned.new_payloads.lock().unwrap().len(), 1); store.promote_new_aggregated_payloads(); - assert_eq!(cloned.new_payloads.lock().unwrap().entries.len(), 0); - assert_eq!(cloned.known_payloads.lock().unwrap().entries.len(), 1); + assert_eq!(cloned.new_payloads.lock().unwrap().len(), 0); + assert_eq!(cloned.known_payloads.lock().unwrap().len(), 1); } } diff --git a/crates/storage/src/types.rs b/crates/storage/src/types.rs deleted file mode 100644 index b23207f7..00000000 --- a/crates/storage/src/types.rs +++ /dev/null @@ -1,36 +0,0 @@ -use ethlambda_types::{ - block::AggregatedSignatureProof, primitives::ssz, signature::ValidatorSignature, -}; - -/// Gossip signature stored with slot for pruning. -/// -/// Signatures are stored alongside the slot they pertain to, enabling -/// simple slot-based pruning when blocks become finalized. -#[derive(Debug, Clone, ssz::Encode, ssz::Decode)] -pub struct StoredSignature { - pub slot: u64, - pub signature_bytes: Vec, -} - -impl StoredSignature { - pub fn new(slot: u64, signature: ValidatorSignature) -> Self { - Self { - slot, - signature_bytes: signature.to_bytes(), - } - } - - pub fn to_validator_signature(&self) -> Result { - ValidatorSignature::from_bytes(&self.signature_bytes) - } -} - -/// Aggregated payload stored with slot for pruning. -/// -/// Aggregated signature proofs are stored with their slot to enable -/// pruning when blocks become finalized. -#[derive(Debug, Clone, ssz::Encode, ssz::Decode)] -pub struct StoredAggregatedPayload { - pub slot: u64, - pub proof: AggregatedSignatureProof, -} From 628fb9a2c4e997ee813886f1f9dcddbc597c6e35 Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Mon, 30 Mar 2026 16:54:24 -0300 Subject: [PATCH 2/5] Handle deprecated RocksDB column families on upgrade RocksDB requires all existing column families to be listed when opening a database. Since this branch removed GossipSignatures and AttestationDataByRoot tables, nodes upgrading from a previous version would crash on startup with "You have to open all column families". Open with deprecated CFs included in descriptors, then drop them immediately after opening. --- crates/storage/src/backend/rocksdb.rs | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/crates/storage/src/backend/rocksdb.rs b/crates/storage/src/backend/rocksdb.rs index 4853873e..6859fc1e 100644 --- a/crates/storage/src/backend/rocksdb.rs +++ b/crates/storage/src/backend/rocksdb.rs @@ -27,6 +27,11 @@ pub struct RocksDBBackend { db: Arc>, } +/// Column families from previous versions that are no longer used. +/// RocksDB requires all existing CFs to be listed when opening a database, +/// so we include these to allow seamless upgrades from older versions. +const DEPRECATED_CF_NAMES: &[&str] = &["gossip_signatures", "attestation_data_by_root"]; + impl RocksDBBackend { /// Open a RocksDB database at the given path. pub fn open(path: impl AsRef) -> Result { @@ -34,14 +39,24 @@ impl RocksDBBackend { opts.create_if_missing(true); opts.create_missing_column_families(true); - let cf_descriptors: Vec<_> = ALL_TABLES + let mut cf_descriptors: Vec<_> = ALL_TABLES .iter() .map(|t| ColumnFamilyDescriptor::new(cf_name(*t), Options::default())) .collect(); + // Include deprecated CFs so RocksDB can open databases from older versions. + for name in DEPRECATED_CF_NAMES { + cf_descriptors.push(ColumnFamilyDescriptor::new(*name, Options::default())); + } + let db = DBWithThreadMode::::open_cf_descriptors(&opts, path, cf_descriptors)?; + // Drop deprecated column families (idempotent — ignored if already absent). + for name in DEPRECATED_CF_NAMES { + let _ = db.drop_cf(name); + } + Ok(Self { db: Arc::new(db) }) } } From dde3ec60a00c53d6c4395a5cf0efc66289f751aa Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Tue, 31 Mar 2026 17:32:24 -0300 Subject: [PATCH 3/5] Introduce HashedAttestationData to eliminate data_root invariant Replace separate (H256, AttestationData) parameters with a single HashedAttestationData struct that bundles both and computes the root in its only constructor. This makes it impossible to pass a mismatched data_root at compile time, addressing the concern from PR #253. --- crates/blockchain/src/store.rs | 51 +++++++--------- crates/common/types/src/attestation.rs | 43 ++++++++++++- crates/storage/src/store.rs | 83 ++++++++++---------------- 3 files changed, 95 insertions(+), 82 deletions(-) diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index d0c4f370..b2961633 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -9,7 +9,7 @@ use ethlambda_types::{ ShortRoot, attestation::{ AggregatedAttestation, AggregationBits, Attestation, AttestationData, - SignedAggregatedAttestation, SignedAttestation, validator_indices, + HashedAttestationData, SignedAggregatedAttestation, SignedAttestation, validator_indices, }, block::{ AggregatedAttestations, AggregatedSignatureProof, Block, BlockBody, @@ -128,10 +128,11 @@ fn aggregate_committee_signatures(store: &mut Store) -> Vec = Vec::new(); - let mut payload_entries: Vec<(H256, AttestationData, AggregatedSignatureProof)> = Vec::new(); + let mut payload_entries: Vec<(HashedAttestationData, AggregatedSignatureProof)> = Vec::new(); - for (data_root, data, validator_sigs) in &gossip_groups { - let slot = data.slot; + for (hashed, validator_sigs) in &gossip_groups { + let data_root = hashed.root(); + let slot = hashed.data().slot; let mut sigs = vec![]; let mut pubkeys = vec![]; @@ -156,7 +157,7 @@ fn aggregate_committee_signatures(store: &mut Store) -> Vec Vec>()?; - let data_root = aggregated.data.tree_hash_root(); + let hashed = HashedAttestationData::new(aggregated.data.clone()); + let data_root = hashed.root(); let slot: u32 = aggregated.data.slot.try_into().expect("slot exceeds u32"); { @@ -443,11 +446,7 @@ pub fn on_gossip_aggregated_attestation( .map_err(StoreError::AggregateVerificationFailed)?; // Store one proof per attestation data (not per validator) - store.insert_new_aggregated_payload( - data_root, - aggregated.data.clone(), - aggregated.proof.clone(), - ); + store.insert_new_aggregated_payload(hashed, aggregated.proof.clone()); metrics::update_latest_new_aggregated_payloads(store.new_aggregated_payloads_count()); let slot = aggregated.data.slot; @@ -559,13 +558,12 @@ fn on_block_core( let attestation_signatures = &signed_block.signature.attestation_signatures; // Store one proof per attestation data in known aggregated payloads. - let mut known_entries: Vec<(H256, AttestationData, AggregatedSignatureProof)> = Vec::new(); + let mut known_entries: Vec<(HashedAttestationData, AggregatedSignatureProof)> = Vec::new(); for (att, proof) in aggregated_attestations .iter() .zip(attestation_signatures.iter()) { - let data_root = att.data.tree_hash_root(); - known_entries.push((data_root, att.data.clone(), proof.clone())); + known_entries.push((HashedAttestationData::new(att.data.clone()), proof.clone())); // Count each participating validator as a valid attestation for _ in validator_indices(&att.aggregation_bits) { metrics::inc_attestations_valid(); @@ -575,7 +573,7 @@ fn on_block_core( // Process proposer attestation as pending (enters "new" stage via gossip path) // The proposer's attestation should NOT affect this block's fork choice position. let proposer_vid = proposer_attestation.validator_id; - let proposer_data_root = proposer_attestation.data.tree_hash_root(); + let proposer_hashed = HashedAttestationData::new(proposer_attestation.data.clone()); store.insert_known_aggregated_payloads_batch(known_entries); @@ -588,23 +586,14 @@ fn on_block_core( // Without sig verification, insert directly with a dummy proof let participants = aggregation_bits_from_validator_indices(&[proposer_vid]); let proof = AggregatedSignatureProof::empty(participants); - store.insert_new_aggregated_payload( - proposer_data_root, - proposer_attestation.data.clone(), - proof, - ); + store.insert_new_aggregated_payload(proposer_hashed, proof); } else { // Store the proposer's signature unconditionally for future block building. // Subnet filtering is handled at the P2P subscription layer. let proposer_sig = ValidatorSignature::from_bytes(&signed_block.signature.proposer_signature) .map_err(|_| StoreError::SignatureDecodingFailed)?; - store.insert_gossip_signature( - proposer_data_root, - proposer_attestation.data.clone(), - proposer_vid, - proposer_sig, - ); + store.insert_gossip_signature(proposer_hashed, proposer_vid, proposer_sig); } let block_total = block_start.elapsed(); diff --git a/crates/common/types/src/attestation.rs b/crates/common/types/src/attestation.rs index 15c4331d..2df26e72 100644 --- a/crates/common/types/src/attestation.rs +++ b/crates/common/types/src/attestation.rs @@ -1,7 +1,10 @@ use crate::{ block::AggregatedSignatureProof, checkpoint::Checkpoint, - primitives::ssz::{Decode, Encode, TreeHash}, + primitives::{ + H256, + ssz::{Decode, Encode, TreeHash}, + }, signature::SignatureSize, state::ValidatorRegistryLimit, }; @@ -77,3 +80,41 @@ pub struct SignedAggregatedAttestation { pub data: AttestationData, pub proof: AggregatedSignatureProof, } + +/// Attestation data paired with its precomputed tree hash root. +/// +/// Private fields ensure that `root == data.tree_hash_root()` is always true. +/// The only way to construct this is via [`HashedAttestationData::new`] or +/// [`From`], both of which compute the root from the data. +#[derive(Debug, Clone)] +pub struct HashedAttestationData { + root: H256, + data: AttestationData, +} + +impl HashedAttestationData { + pub fn new(data: AttestationData) -> Self { + Self { + root: data.tree_hash_root(), + data, + } + } + + pub fn root(&self) -> H256 { + self.root + } + + pub fn data(&self) -> &AttestationData { + &self.data + } + + pub fn into_parts(self) -> (H256, AttestationData) { + (self.root, self.data) + } +} + +impl From for HashedAttestationData { + fn from(data: AttestationData) -> Self { + Self::new(data) + } +} diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index 5d2dc5a1..63b0dc2f 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -10,7 +10,7 @@ static EMPTY_BODY_ROOT: LazyLock = LazyLock::new(|| BlockBody::default().t use crate::api::{StorageBackend, StorageWriteBatch, Table}; use ethlambda_types::{ - attestation::AttestationData, + attestation::{AttestationData, HashedAttestationData}, block::{ AggregatedSignatureProof, Block, BlockBody, BlockHeader, BlockSignaturesWithAttestation, BlockWithAttestation, SignedBlockWithAttestation, @@ -123,12 +123,8 @@ impl PayloadBuffer { } /// Insert proofs for an attestation, FIFO-evicting oldest data_root if at capacity. - fn push( - &mut self, - data_root: H256, - att_data: AttestationData, - proof: AggregatedSignatureProof, - ) { + fn push(&mut self, hashed: HashedAttestationData, proof: AggregatedSignatureProof) { + let (data_root, att_data) = hashed.into_parts(); if let Some(entry) = self.data.get_mut(&data_root) { entry.proofs.push(proof); } else { @@ -149,24 +145,23 @@ impl PayloadBuffer { } } - /// Insert a batch of (data_root, attestation_data, proof) entries. - fn push_batch(&mut self, entries: Vec<(H256, AttestationData, AggregatedSignatureProof)>) { - for (data_root, att_data, proof) in entries { - self.push(data_root, att_data, proof); + /// Insert a batch of (hashed_attestation_data, proof) entries. + fn push_batch(&mut self, entries: Vec<(HashedAttestationData, AggregatedSignatureProof)>) { + for (hashed, proof) in entries { + self.push(hashed, proof); } } /// Take all entries, leaving the buffer empty. - fn drain(&mut self) -> Vec<(H256, AttestationData, AggregatedSignatureProof)> { + fn drain(&mut self) -> Vec<(HashedAttestationData, AggregatedSignatureProof)> { self.order.clear(); self.data .drain() - .flat_map(|(root, entry)| { - let data = entry.data; + .flat_map(|(_, entry)| { entry .proofs .into_iter() - .map(move |proof| (root, data.clone(), proof)) + .map(move |proof| (HashedAttestationData::new(entry.data.clone()), proof)) }) .collect() } @@ -211,8 +206,8 @@ struct GossipDataEntry { /// Gossip signatures grouped by attestation data (via data_root). type GossipSignatureMap = HashMap; -/// Gossip signatures snapshot: (data_root, attestation_data, Vec<(validator_id, signature)>). -pub type GossipSignatureSnapshot = Vec<(H256, AttestationData, Vec<(u64, ValidatorSignature)>)>; +/// Gossip signatures snapshot: (hashed_attestation_data, Vec<(validator_id, signature)>). +pub type GossipSignatureSnapshot = Vec<(HashedAttestationData, Vec<(u64, ValidatorSignature)>)>; /// Encode a LiveChain key (slot, root) to bytes. /// Layout: slot (8 bytes big-endian) || root (32 bytes) @@ -876,20 +871,16 @@ impl Store { /// Insert a single proof into the known (fork-choice-active) buffer. pub fn insert_known_aggregated_payload( &mut self, - data_root: H256, - att_data: AttestationData, + hashed: HashedAttestationData, proof: AggregatedSignatureProof, ) { - self.known_payloads - .lock() - .unwrap() - .push(data_root, att_data, proof); + self.known_payloads.lock().unwrap().push(hashed, proof); } /// Batch-insert proofs into the known buffer. pub fn insert_known_aggregated_payloads_batch( &mut self, - entries: Vec<(H256, AttestationData, AggregatedSignatureProof)>, + entries: Vec<(HashedAttestationData, AggregatedSignatureProof)>, ) { self.known_payloads.lock().unwrap().push_batch(entries); } @@ -902,20 +893,16 @@ impl Store { /// Insert a single proof into the new (pending) buffer. pub fn insert_new_aggregated_payload( &mut self, - data_root: H256, - att_data: AttestationData, + hashed: HashedAttestationData, proof: AggregatedSignatureProof, ) { - self.new_payloads - .lock() - .unwrap() - .push(data_root, att_data, proof); + self.new_payloads.lock().unwrap().push(hashed, proof); } /// Batch-insert proofs into the new buffer. pub fn insert_new_aggregated_payloads_batch( &mut self, - entries: Vec<(H256, AttestationData, AggregatedSignatureProof)>, + entries: Vec<(HashedAttestationData, AggregatedSignatureProof)>, ) { self.new_payloads.lock().unwrap().push_batch(entries); } @@ -978,14 +965,14 @@ impl Store { pub fn iter_gossip_signatures(&self) -> GossipSignatureSnapshot { let gossip = self.gossip_signatures.lock().unwrap(); gossip - .iter() - .map(|(root, entry)| { + .values() + .map(|entry| { let sigs: Vec<_> = entry .signatures .iter() .map(|e| (e.validator_id, e.signature.clone())) .collect(); - (*root, entry.data.clone(), sigs) + (HashedAttestationData::new(entry.data.clone()), sigs) }) .collect() } @@ -993,11 +980,11 @@ impl Store { /// Stores a gossip signature for later aggregation. pub fn insert_gossip_signature( &mut self, - data_root: H256, - att_data: AttestationData, + hashed: HashedAttestationData, validator_id: u64, signature: ValidatorSignature, ) { + let (data_root, att_data) = hashed.into_parts(); let mut gossip = self.gossip_signatures.lock().unwrap(); let entry = gossip.entry(data_root).or_insert_with(|| GossipDataEntry { data: att_data, @@ -1485,15 +1472,13 @@ mod tests { // Insert 3 distinct attestation data entries (different slots → different roots) for slot in 1..=3u64 { let data = make_att_data(slot); - let data_root = data.tree_hash_root(); - buf.push(data_root, data, make_proof()); + buf.push(HashedAttestationData::new(data), make_proof()); } assert_eq!(buf.len(), 3); // Pushing a 4th should evict the oldest (slot 1) let data = make_att_data(4); - let data_root = data.tree_hash_root(); - buf.push(data_root, data, make_proof()); + buf.push(HashedAttestationData::new(data), make_proof()); assert_eq!(buf.len(), 3); // The oldest (slot 1) should be gone @@ -1508,9 +1493,9 @@ mod tests { let data_root = data.tree_hash_root(); // Insert 3 proofs for the same attestation data - buf.push(data_root, data.clone(), make_proof()); - buf.push(data_root, data.clone(), make_proof()); - buf.push(data_root, data, make_proof()); + buf.push(HashedAttestationData::new(data.clone()), make_proof()); + buf.push(HashedAttestationData::new(data.clone()), make_proof()); + buf.push(HashedAttestationData::new(data), make_proof()); // Should be 1 distinct data entry with 3 proofs assert_eq!(buf.len(), 1); @@ -1521,10 +1506,9 @@ mod tests { fn payload_buffer_drain_empties_buffer() { let mut buf = PayloadBuffer::new(10); let data = make_att_data(1); - let data_root = data.tree_hash_root(); - buf.push(data_root, data.clone(), make_proof()); - buf.push(data_root, data, make_proof()); + buf.push(HashedAttestationData::new(data.clone()), make_proof()); + buf.push(HashedAttestationData::new(data), make_proof()); let drained = buf.drain(); assert_eq!(drained.len(), 2); // 2 proofs flattened @@ -1538,8 +1522,8 @@ mod tests { let data = make_att_data(1); let data_root = data.tree_hash_root(); - store.insert_new_aggregated_payload(data_root, data.clone(), make_proof()); - store.insert_new_aggregated_payload(data_root, data, make_proof()); + store.insert_new_aggregated_payload(HashedAttestationData::new(data.clone()), make_proof()); + store.insert_new_aggregated_payload(HashedAttestationData::new(data), make_proof()); assert_eq!(store.new_payloads.lock().unwrap().len(), 1); assert_eq!(store.known_payloads.lock().unwrap().len(), 0); @@ -1562,9 +1546,8 @@ mod tests { let mut store = Store::test_store(); let cloned = store.clone(); let data = make_att_data(1); - let data_root = data.tree_hash_root(); - store.insert_new_aggregated_payload(data_root, data, make_proof()); + store.insert_new_aggregated_payload(HashedAttestationData::new(data), make_proof()); // Modification on original should be visible in clone assert_eq!(cloned.new_payloads.lock().unwrap().len(), 1); From 9f5c8a68da9c5223db6efe41855278f9c34c98b1 Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Tue, 31 Mar 2026 18:51:13 -0300 Subject: [PATCH 4/5] Address PR #253 review comments from MegaRedHand - Change metrics::inc_attestations_valid() to accept a count parameter and use inc_by(), replacing the per-validator loop in on_block_core - Remove deprecated RocksDB column family handling (DEPRECATED_CF_NAMES, include-on-open, drop-after-open); old databases require a resync - Add duplicate proof check in PayloadBuffer::push() by comparing participants bits before inserting - Change PayloadBuffer eviction from data_root count to total proof count, tracking total_proofs across push/drain operations --- crates/blockchain/src/metrics.rs | 4 +- crates/blockchain/src/store.rs | 9 ++-- crates/storage/src/backend/rocksdb.rs | 17 +----- crates/storage/src/store.rs | 74 ++++++++++++++++++++++----- 4 files changed, 67 insertions(+), 37 deletions(-) diff --git a/crates/blockchain/src/metrics.rs b/crates/blockchain/src/metrics.rs index 520edbea..20092cd7 100644 --- a/crates/blockchain/src/metrics.rs +++ b/crates/blockchain/src/metrics.rs @@ -356,8 +356,8 @@ pub fn set_node_start_time() { } /// Increment the valid attestations counter. -pub fn inc_attestations_valid() { - LEAN_ATTESTATIONS_VALID_TOTAL.inc(); +pub fn inc_attestations_valid(count: u64) { + LEAN_ATTESTATIONS_VALID_TOTAL.inc_by(count); } /// Increment the invalid attestations counter. diff --git a/crates/blockchain/src/store.rs b/crates/blockchain/src/store.rs index b2961633..77582a1f 100644 --- a/crates/blockchain/src/store.rs +++ b/crates/blockchain/src/store.rs @@ -379,7 +379,7 @@ pub fn on_gossip_attestation( store.insert_gossip_signature(hashed, validator_id, signature); metrics::update_gossip_signatures(store.gossip_signatures_count()); - metrics::inc_attestations_valid(); + metrics::inc_attestations_valid(1); let slot = attestation.data.slot; let target_slot = attestation.data.target.slot; @@ -460,7 +460,7 @@ pub fn on_gossip_aggregated_attestation( "Aggregated attestation processed" ); - metrics::inc_attestations_valid(); + metrics::inc_attestations_valid(1); Ok(()) } @@ -565,9 +565,8 @@ fn on_block_core( { known_entries.push((HashedAttestationData::new(att.data.clone()), proof.clone())); // Count each participating validator as a valid attestation - for _ in validator_indices(&att.aggregation_bits) { - metrics::inc_attestations_valid(); - } + let count = validator_indices(&att.aggregation_bits).count() as u64; + metrics::inc_attestations_valid(count); } // Process proposer attestation as pending (enters "new" stage via gossip path) diff --git a/crates/storage/src/backend/rocksdb.rs b/crates/storage/src/backend/rocksdb.rs index 6859fc1e..4853873e 100644 --- a/crates/storage/src/backend/rocksdb.rs +++ b/crates/storage/src/backend/rocksdb.rs @@ -27,11 +27,6 @@ pub struct RocksDBBackend { db: Arc>, } -/// Column families from previous versions that are no longer used. -/// RocksDB requires all existing CFs to be listed when opening a database, -/// so we include these to allow seamless upgrades from older versions. -const DEPRECATED_CF_NAMES: &[&str] = &["gossip_signatures", "attestation_data_by_root"]; - impl RocksDBBackend { /// Open a RocksDB database at the given path. pub fn open(path: impl AsRef) -> Result { @@ -39,24 +34,14 @@ impl RocksDBBackend { opts.create_if_missing(true); opts.create_missing_column_families(true); - let mut cf_descriptors: Vec<_> = ALL_TABLES + let cf_descriptors: Vec<_> = ALL_TABLES .iter() .map(|t| ColumnFamilyDescriptor::new(cf_name(*t), Options::default())) .collect(); - // Include deprecated CFs so RocksDB can open databases from older versions. - for name in DEPRECATED_CF_NAMES { - cf_descriptors.push(ColumnFamilyDescriptor::new(*name, Options::default())); - } - let db = DBWithThreadMode::::open_cf_descriptors(&opts, path, cf_descriptors)?; - // Drop deprecated column families (idempotent — ignored if already absent). - for name in DEPRECATED_CF_NAMES { - let _ = db.drop_cf(name); - } - Ok(Self { db: Arc::new(db) }) } } diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index 63b0dc2f..d21ccdec 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -111,6 +111,7 @@ struct PayloadBuffer { data: HashMap, order: VecDeque, capacity: usize, + total_proofs: usize, } impl PayloadBuffer { @@ -119,20 +120,34 @@ impl PayloadBuffer { data: HashMap::with_capacity(capacity), order: VecDeque::with_capacity(capacity), capacity, + total_proofs: 0, } } - /// Insert proofs for an attestation, FIFO-evicting oldest data_root if at capacity. + /// Insert proofs for an attestation, FIFO-evicting oldest data_roots when total proofs reach capacity. fn push(&mut self, hashed: HashedAttestationData, proof: AggregatedSignatureProof) { let (data_root, att_data) = hashed.into_parts(); if let Some(entry) = self.data.get_mut(&data_root) { + // Skip duplicate proofs (same participants) + if entry + .proofs + .iter() + .any(|p| p.participants == proof.participants) + { + return; + } entry.proofs.push(proof); + self.total_proofs += 1; } else { - // New data_root — check capacity before inserting - if self.order.len() >= self.capacity - && let Some(evicted) = self.order.pop_front() - { - self.data.remove(&evicted); + // Evict oldest data_roots until under capacity + while self.total_proofs >= self.capacity { + if let Some(evicted) = self.order.pop_front() { + if let Some(removed) = self.data.remove(&evicted) { + self.total_proofs -= removed.proofs.len(); + } + } else { + break; + } } self.data.insert( data_root, @@ -142,6 +157,7 @@ impl PayloadBuffer { }, ); self.order.push_back(data_root); + self.total_proofs += 1; } } @@ -155,6 +171,7 @@ impl PayloadBuffer { /// Take all entries, leaving the buffer empty. fn drain(&mut self) -> Vec<(HashedAttestationData, AggregatedSignatureProof)> { self.order.clear(); + self.total_proofs = 0; self.data .drain() .flat_map(|(_, entry)| { @@ -1456,6 +1473,14 @@ mod tests { AggregatedSignatureProof::empty(AggregationBits::with_capacity(0).unwrap()) } + /// Create a proof with a specific validator bit set (distinct participants). + fn make_proof_for_validator(vid: usize) -> AggregatedSignatureProof { + use ethlambda_types::attestation::AggregationBits; + let mut bits = AggregationBits::with_capacity(vid + 1).unwrap(); + bits.set(vid, true).unwrap(); + AggregatedSignatureProof::empty(bits) + } + fn make_att_data(slot: u64) -> AttestationData { AttestationData { slot, @@ -1492,10 +1517,19 @@ mod tests { let data = make_att_data(1); let data_root = data.tree_hash_root(); - // Insert 3 proofs for the same attestation data - buf.push(HashedAttestationData::new(data.clone()), make_proof()); - buf.push(HashedAttestationData::new(data.clone()), make_proof()); - buf.push(HashedAttestationData::new(data), make_proof()); + // Insert 3 proofs with distinct participants for the same attestation data + buf.push( + HashedAttestationData::new(data.clone()), + make_proof_for_validator(0), + ); + buf.push( + HashedAttestationData::new(data.clone()), + make_proof_for_validator(1), + ); + buf.push( + HashedAttestationData::new(data), + make_proof_for_validator(2), + ); // Should be 1 distinct data entry with 3 proofs assert_eq!(buf.len(), 1); @@ -1507,8 +1541,14 @@ mod tests { let mut buf = PayloadBuffer::new(10); let data = make_att_data(1); - buf.push(HashedAttestationData::new(data.clone()), make_proof()); - buf.push(HashedAttestationData::new(data), make_proof()); + buf.push( + HashedAttestationData::new(data.clone()), + make_proof_for_validator(0), + ); + buf.push( + HashedAttestationData::new(data), + make_proof_for_validator(1), + ); let drained = buf.drain(); assert_eq!(drained.len(), 2); // 2 proofs flattened @@ -1522,8 +1562,14 @@ mod tests { let data = make_att_data(1); let data_root = data.tree_hash_root(); - store.insert_new_aggregated_payload(HashedAttestationData::new(data.clone()), make_proof()); - store.insert_new_aggregated_payload(HashedAttestationData::new(data), make_proof()); + store.insert_new_aggregated_payload( + HashedAttestationData::new(data.clone()), + make_proof_for_validator(0), + ); + store.insert_new_aggregated_payload( + HashedAttestationData::new(data), + make_proof_for_validator(1), + ); assert_eq!(store.new_payloads.lock().unwrap().len(), 1); assert_eq!(store.known_payloads.lock().unwrap().len(), 0); From fcde0b2eb4118ca71778b8284d421ae04452626d Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Wed, 1 Apr 2026 19:15:07 -0300 Subject: [PATCH 5/5] Move PayloadBuffer eviction after both insert branches Eviction only ran when inserting a new data_root, so proofs appended to an existing entry could bypass the capacity limit. Move the eviction loop after both branches so it always enforces the bound. --- crates/storage/src/store.rs | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index d21ccdec..86bf7adc 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -139,16 +139,6 @@ impl PayloadBuffer { entry.proofs.push(proof); self.total_proofs += 1; } else { - // Evict oldest data_roots until under capacity - while self.total_proofs >= self.capacity { - if let Some(evicted) = self.order.pop_front() { - if let Some(removed) = self.data.remove(&evicted) { - self.total_proofs -= removed.proofs.len(); - } - } else { - break; - } - } self.data.insert( data_root, PayloadEntry { @@ -159,6 +149,16 @@ impl PayloadBuffer { self.order.push_back(data_root); self.total_proofs += 1; } + // Evict oldest data_roots until under capacity + while self.total_proofs > self.capacity { + if let Some(evicted) = self.order.pop_front() { + if let Some(removed) = self.data.remove(&evicted) { + self.total_proofs -= removed.proofs.len(); + } + } else { + break; + } + } } /// Insert a batch of (hashed_attestation_data, proof) entries.