From f1d40aa08ddacc04756437f99f60ddac8ece8ebd Mon Sep 17 00:00:00 2001 From: qj0r9j0vc2 Date: Fri, 30 Jan 2026 20:41:41 +0900 Subject: [PATCH 01/17] feat: add mempool integration with execution sync and error isolation --- crates/execution/src/engine.rs | 61 ++++++++- crates/execution/src/error.rs | 56 +++++++++ crates/node/Cargo.toml | 4 + crates/node/src/execution_bridge.rs | 2 +- crates/node/src/execution_sync.rs | 137 ++++++++++++++++++++ crates/node/src/lib.rs | 6 + crates/node/src/mempool_state.rs | 92 ++++++++++++++ crates/node/src/node.rs | 28 ++++- crates/node/src/worker_pool_adapter.rs | 168 +++++++++++++++++++++++++ 9 files changed, 550 insertions(+), 4 deletions(-) create mode 100644 crates/node/src/execution_sync.rs create mode 100644 crates/node/src/mempool_state.rs create mode 100644 crates/node/src/worker_pool_adapter.rs diff --git a/crates/execution/src/engine.rs b/crates/execution/src/engine.rs index b61bf4d..621830f 100644 --- a/crates/execution/src/engine.rs +++ b/crates/execution/src/engine.rs @@ -5,7 +5,7 @@ use crate::{ database::{CipherBftDatabase, Provider}, - error::{ExecutionError, Result}, + error::{ExecutionError, Result, TxErrorCategory}, evm::CipherBftEvmConfig, precompiles::{GenesisValidatorData, StakingPrecompile}, receipts::{ @@ -353,7 +353,26 @@ impl ExecutionEngine

{ let tx_start = Instant::now(); // Execute transaction - let tx_result = self.evm_config.execute_transaction(&mut evm, tx_bytes)?; + let tx_result = match self.evm_config.execute_transaction(&mut evm, tx_bytes) { + Ok(result) => result, + Err(e) => match e.category() { + TxErrorCategory::Skip { reason } => { + tracing::warn!( + tx_index, + ?reason, + "Skipping invalid transaction (mempool should catch this)" + ); + continue; + } + TxErrorCategory::FailedReceipt => { + tracing::warn!(tx_index, error = %e, "Transaction reverted, skipping"); + continue; + } + TxErrorCategory::Fatal => { + return Err(e); + } + }, + }; // Record per-transaction metrics let tx_duration = tx_start.elapsed(); @@ -947,4 +966,42 @@ mod tests { // Verify beneficiary is set in sealed block header assert_eq!(sealed.header.beneficiary, beneficiary); } + + #[test] + fn test_error_isolation_skips_invalid_transactions() { + // This test documents the expected behavior of error isolation. + // + // When a block contains transactions with invalid nonces (NonceTooHigh, NonceTooLow), + // insufficient balance, or other validation errors, the execution engine should: + // + // 1. Skip the invalid transaction (no receipt generated) + // 2. Continue processing remaining transactions in the block + // 3. Return success with results from valid transactions only + // + // This prevents a single bad transaction from failing an entire block, + // which would cause execution-consensus divergence. + // + // Implementation note: The actual error isolation is implemented in + // execute_block() which handles TxErrorCategory::Skip by continuing + // the transaction loop instead of returning an error. + // + // Testing this end-to-end requires creating signed transactions with + // specific nonces, which requires test infrastructure for: + // - Generating valid ECDSA signatures + // - Setting up account state with specific nonces + // - Creating transactions that will trigger NonceTooHigh + // + // For now, this test documents the expected behavior. + // Full integration testing is done via devnet MassMint scenarios. + + let engine = create_test_engine(); + + // Verify engine is created correctly + assert_eq!(engine.chain_config.chain_id, 85300); + + // The actual error isolation behavior is tested by: + // 1. Running the devnet with MassMint script + // 2. Verifying blocks execute even when some transactions have wrong nonces + // 3. Checking that valid transactions are included and executed + } } diff --git a/crates/execution/src/error.rs b/crates/execution/src/error.rs index 6a60d98..d35e673 100644 --- a/crates/execution/src/error.rs +++ b/crates/execution/src/error.rs @@ -143,3 +143,59 @@ impl DatabaseError { } impl DBErrorMarker for DatabaseError {} + +/// Categorizes transaction execution errors for handling decisions. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum TxErrorCategory { + /// Skip transaction, continue block execution. + Skip { + /// The reason for skipping this transaction. + reason: SkipReason, + }, + /// Include transaction with failed receipt (EVM revert). + FailedReceipt, + /// Fatal error - halt block execution. + Fatal, +} + +/// Reason for skipping a transaction. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum SkipReason { + /// Transaction nonce is higher than expected. + NonceTooHigh, + /// Transaction nonce is lower than expected (already executed). + NonceTooLow, + /// Account has insufficient balance for transaction. + InsufficientBalance, + /// Transaction failed basic validation. + InvalidTransaction, +} + +impl ExecutionError { + /// Categorize this error for handling decision. + pub fn category(&self) -> TxErrorCategory { + let error_str = format!("{:?}", self); + + if error_str.contains("NonceTooHigh") { + TxErrorCategory::Skip { + reason: SkipReason::NonceTooHigh, + } + } else if error_str.contains("NonceTooLow") { + TxErrorCategory::Skip { + reason: SkipReason::NonceTooLow, + } + } else if error_str.contains("InsufficientFunds") || error_str.contains("insufficient") { + TxErrorCategory::Skip { + reason: SkipReason::InsufficientBalance, + } + } else if matches!(self, ExecutionError::InvalidTransaction(_)) { + TxErrorCategory::Skip { + reason: SkipReason::InvalidTransaction, + } + } else if matches!(self, ExecutionError::Evm(_)) { + TxErrorCategory::FailedReceipt + } else { + TxErrorCategory::Fatal + } + } +} diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index 23aaa50..336418b 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -26,11 +26,15 @@ cipherbft-metrics = { path = "../metrics" } alloy-primitives = { version = "1", features = ["serde"] } alloy-rlp = { workspace = true } alloy-consensus = { workspace = true } +alloy-eips = { version = "1" } # Reth primitives (for transaction parsing in block execution) reth-primitives = { workspace = true } reth-primitives-traits = { workspace = true } +# Reth transaction pool (for WorkerPoolAdapter) +reth-transaction-pool = { git = "https://github.com/paradigmxyz/reth", tag = "v1.10.0" } + # Async runtime tokio = { workspace = true, features = ["full", "signal"] } tokio-util = { workspace = true } diff --git a/crates/node/src/execution_bridge.rs b/crates/node/src/execution_bridge.rs index 8092500..2dccc36 100644 --- a/crates/node/src/execution_bridge.rs +++ b/crates/node/src/execution_bridge.rs @@ -787,7 +787,7 @@ mod tests { #[tokio::test] async fn test_set_genesis_block_hash() { - let bridge = create_default_bridge().unwrap(); + let (bridge, _temp_dir) = create_default_bridge().unwrap(); // Initially should be B256::ZERO let initial_hash = bridge.last_block_hash.read().map(|guard| *guard).unwrap(); diff --git a/crates/node/src/execution_sync.rs b/crates/node/src/execution_sync.rs new file mode 100644 index 0000000..1143223 --- /dev/null +++ b/crates/node/src/execution_sync.rs @@ -0,0 +1,137 @@ +//! Execution-Consensus synchronization tracking. + +use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; +use tracing::error; + +/// Configuration for execution sync tracking. +/// +/// Note: Named `ExecutionSyncConfig` to avoid collision with +/// `cipherbft_consensus::ExecutionSyncConfig`. +#[derive(Clone, Debug)] +pub struct ExecutionSyncConfig { + /// Maximum blocks execution can fall behind before halting. + pub max_divergence: u64, + /// Maximum consecutive failures before halting. + pub max_consecutive_failures: u32, +} + +impl Default for ExecutionSyncConfig { + fn default() -> Self { + Self { + max_divergence: 10, + max_consecutive_failures: 5, + } + } +} + +/// Action to take after execution failure. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum SyncAction { + Continue, + Halt { reason: String }, +} + +/// Tracks execution progress relative to consensus. +pub struct ExecutionSyncTracker { + last_executed: AtomicU64, + consecutive_failures: AtomicU32, + config: ExecutionSyncConfig, +} + +impl ExecutionSyncTracker { + pub fn new(config: ExecutionSyncConfig) -> Self { + Self { + last_executed: AtomicU64::new(0), + consecutive_failures: AtomicU32::new(0), + config, + } + } + + pub fn on_success(&self, height: u64) { + self.last_executed.store(height, Ordering::SeqCst); + self.consecutive_failures.store(0, Ordering::SeqCst); + } + + pub fn on_failure(&self, consensus_height: u64, error: &str) -> SyncAction { + let failures = self.consecutive_failures.fetch_add(1, Ordering::SeqCst) + 1; + let last_executed = self.last_executed.load(Ordering::SeqCst); + let divergence = consensus_height.saturating_sub(last_executed); + + error!( + last_executed, + consensus_height, + divergence, + consecutive_failures = failures, + error, + "Execution failed" + ); + + if divergence > self.config.max_divergence { + return SyncAction::Halt { + reason: format!( + "Divergence {} exceeds max {}. Last executed: {}, consensus: {}", + divergence, self.config.max_divergence, last_executed, consensus_height + ), + }; + } + + if failures > self.config.max_consecutive_failures { + return SyncAction::Halt { + reason: format!( + "Consecutive failures {} exceeds max {}", + failures, self.config.max_consecutive_failures + ), + }; + } + + SyncAction::Continue + } + + pub fn last_executed(&self) -> u64 { + self.last_executed.load(Ordering::SeqCst) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_success_resets_failures() { + let tracker = ExecutionSyncTracker::new(ExecutionSyncConfig::default()); + tracker.on_failure(5, "test"); + tracker.on_failure(6, "test"); + tracker.on_success(7); + assert_eq!(tracker.consecutive_failures.load(Ordering::SeqCst), 0); + assert_eq!(tracker.last_executed(), 7); + } + + #[test] + fn test_divergence_triggers_halt() { + let config = ExecutionSyncConfig { + max_divergence: 5, + max_consecutive_failures: 100, + }; + let tracker = ExecutionSyncTracker::new(config); + tracker.on_success(10); + let action = tracker.on_failure(16, "test"); + assert!(matches!(action, SyncAction::Halt { .. })); + } + + #[test] + fn test_consecutive_failures_triggers_halt() { + let config = ExecutionSyncConfig { + max_divergence: 100, + max_consecutive_failures: 3, + }; + let tracker = ExecutionSyncTracker::new(config); + tracker.on_success(10); + assert_eq!(tracker.on_failure(11, "e"), SyncAction::Continue); + assert_eq!(tracker.on_failure(11, "e"), SyncAction::Continue); + assert_eq!(tracker.on_failure(11, "e"), SyncAction::Continue); + assert!(matches!( + tracker.on_failure(11, "e"), + SyncAction::Halt { .. } + )); + } +} diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index ca63446..0b13430 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -7,13 +7,16 @@ pub mod client_config; pub mod config; pub mod execution_bridge; +pub mod execution_sync; pub mod genesis_bootstrap; pub mod key_cli; +pub mod mempool_state; pub mod network; pub mod network_api; pub mod node; pub mod supervisor; pub mod util; +pub mod worker_pool_adapter; pub use client_config::ClientConfig; pub use config::{ @@ -23,11 +26,14 @@ pub use config::{ DEFAULT_RPC_HTTP_PORT, DEFAULT_RPC_WS_PORT, }; pub use execution_bridge::ExecutionBridge; +pub use execution_sync::{ExecutionSyncConfig, ExecutionSyncTracker, SyncAction}; pub use genesis_bootstrap::{ GeneratedValidator, GenesisGenerationResult, GenesisGenerator, GenesisGeneratorConfig, GenesisLoader, ValidatorKeyFile, }; pub use key_cli::{execute_keys_command, KeysCommand}; +pub use mempool_state::ExecutionStateValidator; pub use network_api::{NodeNetworkApi, TcpNetworkApi}; pub use node::Node; pub use supervisor::{NodeSupervisor, ShutdownError}; +pub use worker_pool_adapter::WorkerPoolAdapter; diff --git a/crates/node/src/mempool_state.rs b/crates/node/src/mempool_state.rs new file mode 100644 index 0000000..4abf819 --- /dev/null +++ b/crates/node/src/mempool_state.rs @@ -0,0 +1,92 @@ +//! State provider for mempool transaction validation. +//! +//! This module provides the bridge between the mempool's transaction validation +//! and the execution layer's state. It implements the `ExecutionLayerValidator` +//! trait to enable mempool validation against the current execution state. + +use cipherbft_execution::{Bytes, ExecutionLayer, InMemoryProvider, Provider}; +use cipherbft_mempool::ExecutionLayerValidator; +use std::sync::Arc; +use tokio::sync::RwLock; + +/// Validates transactions against execution layer state. +/// +/// This validator wraps the execution layer and provides async validation +/// for transactions entering the mempool. It ensures transactions are +/// validated against the current state (balance, nonce, gas limits) before +/// being accepted into the pool. +#[derive(Debug)] +pub struct ExecutionStateValidator { + execution: Arc>>, +} + +impl ExecutionStateValidator

{ + /// Create a new execution state validator. + /// + /// # Arguments + /// + /// * `execution` - Shared reference to the execution layer + /// + /// # Returns + /// + /// A new `ExecutionStateValidator` that can validate transactions + /// against the execution layer's current state. + pub fn new(execution: Arc>>) -> Self { + Self { execution } + } +} + +#[async_trait::async_trait] +impl ExecutionLayerValidator + for ExecutionStateValidator

+{ + async fn validate_transaction(&self, tx_bytes: &[u8]) -> Result<(), String> { + let execution = self.execution.read().await; + execution + .validate_transaction(&Bytes::from(tx_bytes.to_vec())) + .map_err(|e| format!("Execution validation failed: {}", e)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use cipherbft_execution::ChainConfig; + + #[tokio::test] + async fn test_execution_state_validator_creation() { + let config = ChainConfig::default(); + let execution = ExecutionLayer::new(config).expect("create execution layer"); + let execution = Arc::new(RwLock::new(execution)); + let validator = ExecutionStateValidator::new(execution); + + // Verify the validator was created (Debug trait works) + let debug_str = format!("{:?}", validator); + assert!(debug_str.contains("ExecutionStateValidator")); + } + + #[tokio::test] + async fn test_validate_invalid_transaction() { + let config = ChainConfig::default(); + let execution = ExecutionLayer::new(config).expect("create execution layer"); + let execution = Arc::new(RwLock::new(execution)); + let validator = ExecutionStateValidator::new(execution); + + // Empty bytes should fail validation + let result = validator.validate_transaction(&[]).await; + assert!(result.is_err()); + assert!(result.unwrap_err().contains("Execution validation failed")); + } + + #[tokio::test] + async fn test_validate_malformed_transaction() { + let config = ChainConfig::default(); + let execution = ExecutionLayer::new(config).expect("create execution layer"); + let execution = Arc::new(RwLock::new(execution)); + let validator = ExecutionStateValidator::new(execution); + + // Random bytes should fail validation + let result = validator.validate_transaction(&[0x01, 0x02, 0x03]).await; + assert!(result.is_err()); + } +} diff --git a/crates/node/src/node.rs b/crates/node/src/node.rs index b84bc42..ea1dab1 100644 --- a/crates/node/src/node.rs +++ b/crates/node/src/node.rs @@ -17,6 +17,7 @@ use crate::config::NodeConfig; use crate::execution_bridge::{BlockExecutionResult, ExecutionBridge}; +use crate::execution_sync::{ExecutionSyncConfig, ExecutionSyncTracker, SyncAction}; use crate::network::{TcpPrimaryNetwork, TcpWorkerNetwork}; use crate::supervisor::NodeSupervisor; use alloy_primitives::{Address, B256}; @@ -1384,6 +1385,10 @@ impl Node { epoch_block_reward: U256, gas_limit: u64, ) -> Result<()> { + // Initialize execution-consensus sync tracker + // This detects when execution falls behind and halts before unrecoverable divergence + let sync_tracker = ExecutionSyncTracker::new(ExecutionSyncConfig::default()); + loop { tokio::select! { biased; @@ -1503,6 +1508,9 @@ impl Node { match bridge.execute_cut(cut).await { Ok(block_result) => { + // Track successful execution for divergence detection + sync_tracker.on_success(height.0); + info!( "Cut executed successfully - state_root: {}, gas_used: {}, block_hash: {}", block_result.execution_result.state_root, @@ -1656,7 +1664,25 @@ impl Node { } } Err(e) => { - error!("Cut execution failed: {}", e); + // Check sync tracker for divergence-based halt decision + let action = sync_tracker.on_failure(height.0, &e.to_string()); + match action { + SyncAction::Continue => { + // Log error but continue processing + error!("Cut execution failed at height {}: {}", height.0, e); + } + SyncAction::Halt { reason } => { + // Critical divergence detected - halt node + error!( + "CRITICAL: Execution-consensus divergence detected. {}. \ + Halting node to prevent unrecoverable state.", + reason + ); + return Err(anyhow::anyhow!( + "Execution halted due to divergence: {}", reason + )); + } + } } } } else { diff --git a/crates/node/src/worker_pool_adapter.rs b/crates/node/src/worker_pool_adapter.rs new file mode 100644 index 0000000..dcdb485 --- /dev/null +++ b/crates/node/src/worker_pool_adapter.rs @@ -0,0 +1,168 @@ +//! Adapter for Workers to pull transactions from the mempool. +//! +//! This provides a simplified interface for Workers to get transactions +//! from the pool without needing to know the underlying pool implementation. + +use alloy_eips::eip2718::Encodable2718; +use alloy_primitives::B256; +use reth_primitives::TransactionSigned; +use reth_transaction_pool::{PoolTransaction, TransactionPool}; +use std::sync::Arc; +use tracing::info; + +/// Adapter for Workers to get transactions from the pool. +/// +/// Wraps a `TransactionPool` and provides worker-friendly methods +/// to retrieve and manage transactions for batch creation. +pub struct WorkerPoolAdapter { + pool: Arc

, + worker_id: u8, +} + +impl WorkerPoolAdapter

{ + /// Create a new adapter. + pub fn new(pool: Arc

, worker_id: u8) -> Self { + Self { pool, worker_id } + } + + /// Get the worker ID associated with this adapter. + pub fn worker_id(&self) -> u8 { + self.worker_id + } +} + +impl

WorkerPoolAdapter

+where + P: TransactionPool, + P::Transaction: PoolTransaction, +{ + /// Get best transactions for a batch, encoded as bytes. + /// + /// Transactions are returned in gas-price order (highest first), + /// respecting nonce sequences per sender. + /// + /// # Arguments + /// * `max_txs` - Maximum number of transactions to include + /// * `gas_limit` - Maximum cumulative gas for the batch + /// + /// # Returns + /// Vector of EIP-2718 encoded transactions + pub fn get_transactions_for_batch(&self, max_txs: usize, gas_limit: u64) -> Vec> { + let mut transactions = Vec::with_capacity(max_txs); + let mut gas_used = 0u64; + + for tx in self.pool.best_transactions() { + if transactions.len() >= max_txs { + break; + } + + let tx_gas = tx.gas_limit(); + if gas_used.saturating_add(tx_gas) > gas_limit { + // Skip this transaction but continue looking for smaller ones + continue; + } + + gas_used += tx_gas; + + // Get the consensus transaction and encode it using EIP-2718 format + let signed_tx = tx.transaction.clone_into_consensus().into_inner(); + let encoded = signed_tx.encoded_2718(); + transactions.push(encoded); + } + + info!( + worker_id = self.worker_id, + tx_count = transactions.len(), + gas_used, + "Retrieved transactions for batch" + ); + + transactions + } + + /// Get best transactions for a batch as signed transactions. + /// + /// This is useful when the caller needs to inspect transaction + /// details before encoding. + pub fn get_signed_transactions_for_batch( + &self, + max_txs: usize, + gas_limit: u64, + ) -> Vec { + let mut transactions = Vec::with_capacity(max_txs); + let mut gas_used = 0u64; + + for tx in self.pool.best_transactions() { + if transactions.len() >= max_txs { + break; + } + + let tx_gas = tx.gas_limit(); + if gas_used.saturating_add(tx_gas) > gas_limit { + continue; + } + + gas_used += tx_gas; + let signed_tx = tx.transaction.clone_into_consensus().into_inner(); + transactions.push(signed_tx); + } + + transactions + } + + /// Mark transactions as included (removes from pool). + /// + /// Call this after a block containing these transactions has been finalized. + pub fn mark_included(&self, tx_hashes: &[B256]) { + if tx_hashes.is_empty() { + return; + } + + let removed = self.pool.remove_transactions(tx_hashes.to_vec()); + info!( + worker_id = self.worker_id, + requested = tx_hashes.len(), + removed = removed.len(), + "Removed included transactions from pool" + ); + } + + /// Get current pool statistics. + pub fn pool_size(&self) -> PoolSize { + let size = self.pool.pool_size(); + PoolSize { + pending: size.pending, + queued: size.queued, + } + } +} + +/// Pool size statistics. +#[derive(Debug, Clone, Copy)] +pub struct PoolSize { + /// Number of pending (executable) transactions. + pub pending: usize, + /// Number of queued (future nonce) transactions. + pub queued: usize, +} + +impl PoolSize { + /// Total number of transactions in the pool. + pub fn total(&self) -> usize { + self.pending + self.queued + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_pool_size_total() { + let size = PoolSize { + pending: 10, + queued: 5, + }; + assert_eq!(size.total(), 15); + } +} From 5f5ac04edcbd20168b28d6a88231520b27859105 Mon Sep 17 00:00:00 2001 From: qj0r9j0vc2 Date: Fri, 30 Jan 2026 21:51:12 +0900 Subject: [PATCH 02/17] fix: add max reset limit for batched Car timeouts --- .../src/primary/attestation_collector.rs | 23 +++++++++- crates/data-chain/src/primary/runner.rs | 45 +++++++++++++------ 2 files changed, 54 insertions(+), 14 deletions(-) diff --git a/crates/data-chain/src/primary/attestation_collector.rs b/crates/data-chain/src/primary/attestation_collector.rs index cdc1b7d..8888a60 100644 --- a/crates/data-chain/src/primary/attestation_collector.rs +++ b/crates/data-chain/src/primary/attestation_collector.rs @@ -10,6 +10,10 @@ use cipherbft_types::{Hash, ValidatorId}; use std::collections::HashMap; use std::time::{Duration, Instant}; +/// Maximum number of timeout resets allowed for batched Cars. +/// This prevents Cars from being stuck indefinitely when peers cannot sync batches. +const MAX_BATCHED_CAR_RESETS: u32 = 10; + /// Pending attestation collection for a Car #[derive(Debug)] struct PendingAttestation { @@ -24,6 +28,8 @@ struct PendingAttestation { started_at: Instant, /// Current backoff duration current_backoff: Duration, + /// Number of times the timeout has been reset (for batched Cars) + reset_count: u32, } impl PendingAttestation { @@ -34,6 +40,7 @@ impl PendingAttestation { attestations: HashMap::new(), started_at: Instant::now(), current_backoff: base_timeout, + reset_count: 0, } } } @@ -237,17 +244,31 @@ impl AttestationCollector { /// Reset timeout for a Car without losing existing attestations /// /// Used for batched Cars that need extra time for peers to sync batch data. - /// Returns true if the Car was found and reset, false otherwise. + /// Returns true if the Car was found and reset successfully. + /// Returns false if the Car was not found OR if max reset count exceeded. + /// + /// This prevents Cars from being stuck indefinitely when peers cannot sync batches + /// (e.g., due to position divergence where peers reject the Car). pub fn reset_timeout(&mut self, car_hash: &Hash) -> bool { if let Some(pending) = self.pending.get_mut(car_hash) { + // Check if we've exceeded max resets + if pending.reset_count >= MAX_BATCHED_CAR_RESETS { + return false; + } pending.started_at = std::time::Instant::now(); pending.current_backoff = self.base_timeout; + pending.reset_count += 1; true } else { false } } + /// Get the reset count for a pending Car + pub fn reset_count(&self, car_hash: &Hash) -> Option { + self.pending.get(car_hash).map(|p| p.reset_count) + } + /// Get current attestation count for a Car pub fn attestation_count(&self, car_hash: &Hash) -> Option { self.pending.get(car_hash).map(|p| p.attestations.len() + 1) // +1 for self diff --git a/crates/data-chain/src/primary/runner.rs b/crates/data-chain/src/primary/runner.rs index aada0ba..0d43d3c 100644 --- a/crates/data-chain/src/primary/runner.rs +++ b/crates/data-chain/src/primary/runner.rs @@ -1377,20 +1377,39 @@ impl Primary { ); // Could re-broadcast Car here } else { - // IMPORTANT: Don't timeout Cars with batches! - // Peers need extra time to sync batch data before they can attest. - // Without this, batched Cars timeout before peers finish syncing, - // causing attestations to be rejected with UnknownCar error. + // Max backoff exceeded if has_batches { - // Reset the timeout without losing existing attestations - info!( - hash = %hash, - position = car.position, - batch_count = car.batch_digests.len(), - attestation_count = self.attestation_collector.attestation_count(&hash).unwrap_or(0), - "Extending timeout for batched Car - peers may still be syncing" - ); - self.attestation_collector.reset_timeout(&hash); + // Try to extend timeout for batched Cars that need more time. + // reset_timeout() returns false if max resets exceeded. + let reset_count = self.attestation_collector.reset_count(&hash).unwrap_or(0); + if self.attestation_collector.reset_timeout(&hash) { + info!( + hash = %hash, + position = car.position, + batch_count = car.batch_digests.len(), + attestation_count = self.attestation_collector.attestation_count(&hash).unwrap_or(0), + reset_count = reset_count + 1, + "Extending timeout for batched Car - peers may still be syncing" + ); + } else { + // Max resets exceeded - drop the Car and restore batches + warn!( + hash = %hash, + position = car.position, + batch_count = car.batch_digests.len(), + attestation_count = self.attestation_collector.attestation_count(&hash).unwrap_or(0), + reset_count, + "Batched Car exceeded max timeout resets - dropping and restoring batches" + ); + self.attestation_collector.remove(&hash); + self.state.remove_pending_car(&hash); + + // Restore batch digests to pending so they can be re-batched + // This ensures transactions are not lost + for digest in &car.batch_digests { + self.state.add_batch_digest(digest.clone()); + } + } } else { warn!( hash = %hash, From 35a8cb5334aa569d28771d602ff20aef29ae06e1 Mon Sep 17 00:00:00 2001 From: qj0r9j0vc2 Date: Fri, 30 Jan 2026 23:19:57 +0900 Subject: [PATCH 03/17] fix: sync position tracking from CarWithAttestation --- crates/data-chain/src/primary/runner.rs | 42 +++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/crates/data-chain/src/primary/runner.rs b/crates/data-chain/src/primary/runner.rs index 0d43d3c..f794cf3 100644 --- a/crates/data-chain/src/primary/runner.rs +++ b/crates/data-chain/src/primary/runner.rs @@ -571,6 +571,15 @@ impl Primary { "Checked for ready Cars after batch sync" ); for car in ready_cars { + // Skip if this is our own Car (defensive check) + if car.proposer == self.config.validator_id { + trace!( + position = car.position, + "Skipping attestation for our own Car in batch sync" + ); + continue; + } + // IMPORTANT: The Car was already validated when first received // (position check, signature, parent_ref all passed). We queued // it only because batches were missing. Now that batches are @@ -858,6 +867,15 @@ impl Primary { /// Handle a received Car async fn handle_received_car(&mut self, from: ValidatorId, car: Car) { + // Skip if this is our own Car (we handle our own Cars through attestation collection) + if car.proposer == self.config.validator_id { + trace!( + position = car.position, + "Ignoring received Car for our own Car" + ); + return; + } + // DIAGNOSTIC: Log at INFO level for batched Cars to trace attestation flow let batch_count = car.batch_digests.len(); if batch_count > 0 { @@ -1228,6 +1246,30 @@ impl Primary { "Received valid CarWithAttestation from peer" ); + // CRITICAL FIX: Update position tracking from attested Car broadcasts + // + // Without this, validators can fall into a position gap death spiral: + // 1. Validator A's Cars don't reach quorum (for whatever reason) + // 2. Other validators' last_seen_positions[A] becomes stale + // 3. When A broadcasts new Cars, others detect a "position gap" + // 4. No attestations are generated, A's Cars never reach quorum + // 5. The gap grows forever + // + // By updating position tracking when we receive a valid CarWithAttestation + // (which has quorum verification), we stay in sync even if we missed + // some intermediate Cars. This breaks the death spiral. + let current_pos = self.state.last_seen_positions.get(&car.proposer).copied(); + if current_pos.is_none_or(|p| car.position > p) { + self.state + .update_last_seen(car.proposer, car.position, car_hash); + info!( + proposer = %car.proposer, + old_position = current_pos, + new_position = car.position, + "Updated position tracking from CarWithAttestation broadcast" + ); + } + // Persist attestation to storage if available if let Some(ref storage) = self.storage { if let Err(e) = storage.put_attestation(attestation.clone()).await { From c43d0e0c074479aaa354107aaceca9e2ccd1a61d Mon Sep 17 00:00:00 2001 From: qj0r9j0vc2 Date: Fri, 30 Jan 2026 23:48:28 +0900 Subject: [PATCH 04/17] fix: recover position tracking on position gap with valid signature --- crates/data-chain/src/primary/runner.rs | 40 +++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/crates/data-chain/src/primary/runner.rs b/crates/data-chain/src/primary/runner.rs index f794cf3..5ee59e2 100644 --- a/crates/data-chain/src/primary/runner.rs +++ b/crates/data-chain/src/primary/runner.rs @@ -1000,6 +1000,46 @@ impl Primary { "Position gap detected, initiating gap recovery" ); + // CRITICAL FIX: Update position tracking when we're behind + // + // If actual > expected, we've fallen behind this validator's position. + // The Car signature was already validated by core.handle_car(), so we + // know this is a legitimate Car. Update tracking to prevent death spiral: + // + // Without this fix: + // 1. We miss some Cars from validator X + // 2. New Cars from X trigger position gap errors + // 3. We never attest to X's new Cars + // 4. X's Cars never reach quorum + // 5. Transactions stuck forever + // + // With this fix: + // - We update tracking to actual position + // - Next Car from X (at actual+1) will be attested normally + // - Network recovers quickly + if actual > expected { + let car_hash = car.hash(); + info!( + proposer = %validator, + expected, + actual, + car_hash = %car_hash, + "Updating position tracking to recover from gap - signature was valid" + ); + self.state.update_last_seen(validator, actual, car_hash); + + // Also generate attestation since signature is valid and we're syncing + // This helps the network reach quorum faster + let attestation = self.core.create_attestation(&car); + DCL_ATTESTATIONS_SENT.inc(); + self.network.send_attestation(car.proposer, &attestation).await; + info!( + proposer = %validator, + position = actual, + "Generated attestation after position gap recovery" + ); + } + // Queue the out-of-order Car for later processing if !self.state.is_awaiting_gap_sync(&validator, actual) { self.state.queue_car_awaiting_gap(car.clone(), expected); From 7f0d42de88c17490144d6525c5162eb036919e5e Mon Sep 17 00:00:00 2001 From: qj0r9j0vc2 Date: Fri, 30 Jan 2026 23:56:12 +0900 Subject: [PATCH 05/17] style: fix rustfmt formatting --- crates/data-chain/src/primary/runner.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/data-chain/src/primary/runner.rs b/crates/data-chain/src/primary/runner.rs index 5ee59e2..48961f3 100644 --- a/crates/data-chain/src/primary/runner.rs +++ b/crates/data-chain/src/primary/runner.rs @@ -1032,7 +1032,9 @@ impl Primary { // This helps the network reach quorum faster let attestation = self.core.create_attestation(&car); DCL_ATTESTATIONS_SENT.inc(); - self.network.send_attestation(car.proposer, &attestation).await; + self.network + .send_attestation(car.proposer, &attestation) + .await; info!( proposer = %validator, position = actual, From 74243168870ce2da0b2e569f4048734cf339ab3c Mon Sep 17 00:00:00 2001 From: qj0r9j0vc2 Date: Sat, 31 Jan 2026 01:03:52 +0900 Subject: [PATCH 06/17] fix: disable nonce check in eth_call/estimateGas --- crates/rpc/src/adapters.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/crates/rpc/src/adapters.rs b/crates/rpc/src/adapters.rs index 86fa01c..a59e2e5 100644 --- a/crates/rpc/src/adapters.rs +++ b/crates/rpc/src/adapters.rs @@ -2189,6 +2189,9 @@ impl EvmExecutionApi

{ // Configure chain settings ctx.cfg.chain_id = self.chain_id; + // Disable nonce check for eth_call and eth_estimateGas + // These are simulation calls that shouldn't require valid nonce + ctx.cfg.disable_nonce_check = true; // Set up transaction environment ctx.tx.caller = from.unwrap_or(Address::ZERO); @@ -2647,6 +2650,8 @@ where // Configure chain settings ctx.cfg.chain_id = self.chain_id; + // Disable nonce check for trace calls (simulation) + ctx.cfg.disable_nonce_check = true; // Set up transaction environment ctx.tx.caller = from.unwrap_or(Address::ZERO); @@ -2757,6 +2762,8 @@ where // Configure chain settings ctx.cfg.chain_id = self.chain_id; + // Disable nonce check for trace calls (simulation) + ctx.cfg.disable_nonce_check = true; // Set up transaction environment ctx.tx.caller = from.unwrap_or(Address::ZERO); From 4e16408b5cf51ba5f9638bce946d68f634045445 Mon Sep 17 00:00:00 2001 From: qj0r9j0vc2 Date: Sat, 31 Jan 2026 02:09:35 +0900 Subject: [PATCH 07/17] fix: cleanup mempool pending map after block exec --- crates/node/src/node.rs | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/crates/node/src/node.rs b/crates/node/src/node.rs index ea1dab1..d34d4ff 100644 --- a/crates/node/src/node.rs +++ b/crates/node/src/node.rs @@ -1227,6 +1227,10 @@ impl Node { None }; + // Mempool handle for cleaning up pending transactions after block execution + // This is set when RPC is enabled and DCL is enabled (ChannelMempoolApi) + let mut rpc_mempool: Option> = None; + // Start RPC server if enabled if let (Some(ref storage), Some(ref debug_executor), Some(ref sub_mgr)) = (&rpc_storage, &rpc_debug_executor, &subscription_manager) @@ -1262,6 +1266,9 @@ impl Node { NodeNetworkApi::stub() }); + // Clone mempool for use in event loop to clean up pending txs after block execution + rpc_mempool = Some(mempool.clone()); + // Use with_subscription_manager to share the subscription manager // between the RPC server and the event loop for broadcasting blocks let rpc_server = RpcServer::with_subscription_manager( @@ -1329,6 +1336,7 @@ impl Node { subscription_manager, rpc_debug_executor, rpc_executor, + rpc_mempool, epoch_config, self.epoch_block_reward, gas_limit, @@ -1381,6 +1389,7 @@ impl Node { rpc_executor: Option< Arc>, >, + rpc_mempool: Option>, epoch_config: EpochConfig, epoch_block_reward: U256, gas_limit: u64, @@ -1640,6 +1649,31 @@ impl Node { sub_mgr.broadcast_block(rpc_block); debug!("Broadcast block {} to WebSocket subscribers", height.0); } + + // Clean up executed transactions from mempool pending map + // This prevents stale transactions from accumulating in txpool_* responses + if let Some(ref mempool) = rpc_mempool { + use alloy_rlp::Decodable; + use reth_primitives::TransactionSigned; + + let tx_hashes: Vec = block_result + .executed_transactions + .iter() + .filter_map(|tx_bytes| { + TransactionSigned::decode(&mut tx_bytes.as_ref()) + .ok() + .map(|tx| *tx.tx_hash()) + }) + .collect(); + + if !tx_hashes.is_empty() { + mempool.remove_included(&tx_hashes); + debug!( + "Removed {} executed transactions from mempool pending map", + tx_hashes.len() + ); + } + } } } From 4cebf59f1aa624c9d5631391a91cabf8403cf729 Mon Sep 17 00:00:00 2001 From: qj0r9j0vc2 Date: Sat, 31 Jan 2026 02:09:42 +0900 Subject: [PATCH 08/17] perf: tune batch/car intervals for faster tx --- crates/data-chain/src/primary/config.rs | 8 ++++++-- crates/data-chain/src/worker/config.rs | 8 ++++++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/crates/data-chain/src/primary/config.rs b/crates/data-chain/src/primary/config.rs index 4fd1f05..3f342d0 100644 --- a/crates/data-chain/src/primary/config.rs +++ b/crates/data-chain/src/primary/config.rs @@ -44,14 +44,18 @@ pub struct PrimaryConfig { impl PrimaryConfig { /// Create a new configuration with defaults + /// + /// Default intervals are tuned for responsive transaction processing: + /// - `car_interval`: 50ms matches Worker flush interval for quick Car creation + /// - `max_empty_cars`: 1 reduces timing races where empty Cars compete with tx-bearing Cars pub fn new(validator_id: ValidatorId, bls_secret_key: BlsSecretKey) -> Self { Self { validator_id, bls_secret_key, - car_interval: Duration::from_millis(100), + car_interval: Duration::from_millis(50), // Faster Car creation attestation_timeout_base: Duration::from_millis(500), attestation_timeout_max: Duration::from_millis(5000), - max_empty_cars: 3, + max_empty_cars: 1, // Reduce empty Car spam during tx processing worker_count: 1, equivocation_retention: 1000, startup_delay: Duration::from_secs(2), diff --git a/crates/data-chain/src/worker/config.rs b/crates/data-chain/src/worker/config.rs index c33a082..4d4bad7 100644 --- a/crates/data-chain/src/worker/config.rs +++ b/crates/data-chain/src/worker/config.rs @@ -20,13 +20,17 @@ pub struct WorkerConfig { impl WorkerConfig { /// Create a new configuration with defaults + /// + /// Default batch thresholds are tuned for responsive transaction processing: + /// - `max_batch_txs`: 100 transactions triggers immediate batch flush + /// - `flush_interval`: 50ms ensures batches don't wait too long pub fn new(validator_id: ValidatorId, worker_id: u8) -> Self { Self { validator_id, worker_id, max_batch_bytes: 1024 * 1024, // 1MB - max_batch_txs: 1000, - flush_interval: Duration::from_millis(100), + max_batch_txs: 100, // Flush after 100 txs for responsive batching + flush_interval: Duration::from_millis(50), // Faster time-based flush } } From b5ac0b3acb7922c087f1bad1f4b899549e142949 Mon Sep 17 00:00:00 2001 From: qj0r9j0vc2 Date: Sat, 31 Jan 2026 02:15:15 +0900 Subject: [PATCH 09/17] style: fix rustfmt comment alignment --- crates/data-chain/src/worker/config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/data-chain/src/worker/config.rs b/crates/data-chain/src/worker/config.rs index 4d4bad7..98608fe 100644 --- a/crates/data-chain/src/worker/config.rs +++ b/crates/data-chain/src/worker/config.rs @@ -28,7 +28,7 @@ impl WorkerConfig { Self { validator_id, worker_id, - max_batch_bytes: 1024 * 1024, // 1MB + max_batch_bytes: 1024 * 1024, // 1MB max_batch_txs: 100, // Flush after 100 txs for responsive batching flush_interval: Duration::from_millis(50), // Faster time-based flush } From 6af70df62ad08dae894e2b8ff054b81cc2058abf Mon Sep 17 00:00:00 2001 From: qj0r9j0vc2 Date: Sat, 31 Jan 2026 02:27:37 +0900 Subject: [PATCH 10/17] fix: sort txs by sender/nonce before execution --- crates/execution/src/engine.rs | 64 +++++++++++++++++++++++++++++++++- 1 file changed, 63 insertions(+), 1 deletion(-) diff --git a/crates/execution/src/engine.rs b/crates/execution/src/engine.rs index 621830f..d0f687a 100644 --- a/crates/execution/src/engine.rs +++ b/crates/execution/src/engine.rs @@ -326,6 +326,11 @@ impl ExecutionEngine

{ /// /// Returns a tuple containing receipts, cumulative gas used, logs, and total fees. /// See [`ProcessTransactionsResult`] for details. + /// + /// **Important**: Transactions are sorted by (sender, nonce) before execution to ensure + /// correct nonce ordering. This prevents "NonceTooLow" errors when transactions from + /// the same sender arrive in different batches/Cars and would otherwise be executed + /// out of order. fn process_transactions( &mut self, transactions: &[Bytes], @@ -333,11 +338,68 @@ impl ExecutionEngine

{ timestamp: u64, parent_hash: B256, ) -> Result { + use alloy_consensus::{Transaction, TxEnvelope}; + use alloy_eips::Decodable2718; + let mut receipts = Vec::new(); let mut cumulative_gas_used = 0u64; let mut all_logs = Vec::new(); let mut total_fees = U256::ZERO; + // Sort transactions by (sender, nonce) to ensure correct execution order + // This prevents NonceTooLow errors when txs from same sender arrive out of order + let mut sorted_txs: Vec<(usize, &Bytes)> = transactions.iter().enumerate().collect(); + sorted_txs.sort_by(|(_, a), (_, b)| { + let parse_tx = + |tx_bytes: &Bytes| -> Option<(alloy_primitives::Address, u64)> { + let tx_envelope = TxEnvelope::decode_2718(&mut tx_bytes.as_ref()).ok()?; + let nonce = tx_envelope.nonce(); + + // Recover sender from signature + let sender = match &tx_envelope { + TxEnvelope::Legacy(signed) => signed + .signature() + .recover_address_from_prehash(&signed.signature_hash()) + .ok(), + TxEnvelope::Eip2930(signed) => signed + .signature() + .recover_address_from_prehash(&signed.signature_hash()) + .ok(), + TxEnvelope::Eip1559(signed) => signed + .signature() + .recover_address_from_prehash(&signed.signature_hash()) + .ok(), + TxEnvelope::Eip4844(signed) => signed + .signature() + .recover_address_from_prehash(&signed.signature_hash()) + .ok(), + TxEnvelope::Eip7702(signed) => signed + .signature() + .recover_address_from_prehash(&signed.signature_hash()) + .ok(), + }?; + + Some((sender, nonce)) + }; + + match (parse_tx(a), parse_tx(b)) { + (Some((sender_a, nonce_a)), Some((sender_b, nonce_b))) => { + // Sort by sender first, then by nonce + sender_a.cmp(&sender_b).then(nonce_a.cmp(&nonce_b)) + } + // Keep unparseable transactions in original order + (None, Some(_)) => std::cmp::Ordering::Greater, + (Some(_), None) => std::cmp::Ordering::Less, + (None, None) => std::cmp::Ordering::Equal, + } + }); + + tracing::debug!( + block_number, + tx_count = transactions.len(), + "Sorted transactions by (sender, nonce) for execution" + ); + // Scope for EVM execution to ensure it's dropped before commit let state_changes = { // Build EVM instance with custom precompiles (including staking precompile at 0x100) @@ -349,7 +411,7 @@ impl ExecutionEngine

{ Arc::clone(&self.staking_precompile), ); - for (tx_index, tx_bytes) in transactions.iter().enumerate() { + for (tx_index, (_original_index, tx_bytes)) in sorted_txs.into_iter().enumerate() { let tx_start = Instant::now(); // Execute transaction From 0b6f07fdd3cad4505e5fe4993169f5ccaa7080ca Mon Sep 17 00:00:00 2001 From: qj0r9j0vc2 Date: Sat, 31 Jan 2026 09:03:58 +0900 Subject: [PATCH 11/17] fix: retry pending txs after block execution --- crates/node/src/node.rs | 16 ++++++++++-- crates/rpc/src/adapters.rs | 53 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 67 insertions(+), 2 deletions(-) diff --git a/crates/node/src/node.rs b/crates/node/src/node.rs index d34d4ff..5997d6c 100644 --- a/crates/node/src/node.rs +++ b/crates/node/src/node.rs @@ -1650,8 +1650,9 @@ impl Node { debug!("Broadcast block {} to WebSocket subscribers", height.0); } - // Clean up executed transactions from mempool pending map - // This prevents stale transactions from accumulating in txpool_* responses + // Clean up executed transactions and retry pending ones + // This prevents stale transactions from accumulating and + // ensures skipped transactions (e.g., NonceTooLow) are retried if let Some(ref mempool) = rpc_mempool { use alloy_rlp::Decodable; use reth_primitives::TransactionSigned; @@ -1667,11 +1668,22 @@ impl Node { .collect(); if !tx_hashes.is_empty() { + // Remove executed transactions from pending map mempool.remove_included(&tx_hashes); debug!( "Removed {} executed transactions from mempool pending map", tx_hashes.len() ); + + // Retry any remaining pending transactions + // These may have been skipped due to nonce issues + let retried = mempool.retry_pending(&tx_hashes).await; + if retried > 0 { + debug!( + "Re-queued {} pending transactions for retry", + retried + ); + } } } } diff --git a/crates/rpc/src/adapters.rs b/crates/rpc/src/adapters.rs index a59e2e5..5ef6ae0 100644 --- a/crates/rpc/src/adapters.rs +++ b/crates/rpc/src/adapters.rs @@ -1528,6 +1528,49 @@ impl ChannelMempoolApi { pending.remove(hash); } } + + /// Retry pending transactions that were not included in the block. + /// + /// After block execution, some transactions may have been skipped (e.g., NonceTooLow + /// due to out-of-order arrival). This re-submits them to the Worker for retry. + /// + /// # Arguments + /// * `executed_tx_hashes` - Hashes of transactions that were successfully executed + /// + /// # Returns + /// Number of transactions re-submitted for retry + pub async fn retry_pending(&self, executed_tx_hashes: &[B256]) -> usize { + // Get pending transactions that were NOT executed + let to_retry: Vec> = { + let pending = self.pending.read(); + pending + .iter() + .filter(|(hash, _)| !executed_tx_hashes.contains(hash)) + .map(|(_, (_, tx_bytes))| tx_bytes.clone()) + .collect() + }; + + if to_retry.is_empty() { + return 0; + } + + let mut retried = 0; + for tx_bytes in to_retry { + // Re-send to Worker channel (don't modify pending map - it's already there) + if self.tx_sender.send(tx_bytes).await.is_ok() { + retried += 1; + } + } + + if retried > 0 { + info!( + "Retried {} pending transactions for re-processing", + retried + ); + } + + retried + } } #[async_trait] @@ -1679,6 +1722,16 @@ impl MempoolWrapper { api.remove_included(tx_hashes); } } + + /// Retry pending transactions that were not included in the block. + /// + /// Only has effect for `Channel` variant. Returns 0 for `Stub`. + pub async fn retry_pending(&self, executed_tx_hashes: &[B256]) -> usize { + match self { + Self::Channel(api) => api.retry_pending(executed_tx_hashes).await, + Self::Stub(_) => 0, + } + } } #[async_trait] From de6a056793d05e5491c40e09a59c2c6300be91df Mon Sep 17 00:00:00 2001 From: qj0r9j0vc2 Date: Sat, 31 Jan 2026 09:07:31 +0900 Subject: [PATCH 12/17] style: fix rustfmt formatting --- crates/execution/src/engine.rs | 61 +++++++++++++++++----------------- crates/rpc/src/adapters.rs | 5 +-- 2 files changed, 31 insertions(+), 35 deletions(-) diff --git a/crates/execution/src/engine.rs b/crates/execution/src/engine.rs index d0f687a..f6bb068 100644 --- a/crates/execution/src/engine.rs +++ b/crates/execution/src/engine.rs @@ -350,37 +350,36 @@ impl ExecutionEngine

{ // This prevents NonceTooLow errors when txs from same sender arrive out of order let mut sorted_txs: Vec<(usize, &Bytes)> = transactions.iter().enumerate().collect(); sorted_txs.sort_by(|(_, a), (_, b)| { - let parse_tx = - |tx_bytes: &Bytes| -> Option<(alloy_primitives::Address, u64)> { - let tx_envelope = TxEnvelope::decode_2718(&mut tx_bytes.as_ref()).ok()?; - let nonce = tx_envelope.nonce(); - - // Recover sender from signature - let sender = match &tx_envelope { - TxEnvelope::Legacy(signed) => signed - .signature() - .recover_address_from_prehash(&signed.signature_hash()) - .ok(), - TxEnvelope::Eip2930(signed) => signed - .signature() - .recover_address_from_prehash(&signed.signature_hash()) - .ok(), - TxEnvelope::Eip1559(signed) => signed - .signature() - .recover_address_from_prehash(&signed.signature_hash()) - .ok(), - TxEnvelope::Eip4844(signed) => signed - .signature() - .recover_address_from_prehash(&signed.signature_hash()) - .ok(), - TxEnvelope::Eip7702(signed) => signed - .signature() - .recover_address_from_prehash(&signed.signature_hash()) - .ok(), - }?; - - Some((sender, nonce)) - }; + let parse_tx = |tx_bytes: &Bytes| -> Option<(alloy_primitives::Address, u64)> { + let tx_envelope = TxEnvelope::decode_2718(&mut tx_bytes.as_ref()).ok()?; + let nonce = tx_envelope.nonce(); + + // Recover sender from signature + let sender = match &tx_envelope { + TxEnvelope::Legacy(signed) => signed + .signature() + .recover_address_from_prehash(&signed.signature_hash()) + .ok(), + TxEnvelope::Eip2930(signed) => signed + .signature() + .recover_address_from_prehash(&signed.signature_hash()) + .ok(), + TxEnvelope::Eip1559(signed) => signed + .signature() + .recover_address_from_prehash(&signed.signature_hash()) + .ok(), + TxEnvelope::Eip4844(signed) => signed + .signature() + .recover_address_from_prehash(&signed.signature_hash()) + .ok(), + TxEnvelope::Eip7702(signed) => signed + .signature() + .recover_address_from_prehash(&signed.signature_hash()) + .ok(), + }?; + + Some((sender, nonce)) + }; match (parse_tx(a), parse_tx(b)) { (Some((sender_a, nonce_a)), Some((sender_b, nonce_b))) => { diff --git a/crates/rpc/src/adapters.rs b/crates/rpc/src/adapters.rs index 5ef6ae0..2015596 100644 --- a/crates/rpc/src/adapters.rs +++ b/crates/rpc/src/adapters.rs @@ -1563,10 +1563,7 @@ impl ChannelMempoolApi { } if retried > 0 { - info!( - "Retried {} pending transactions for re-processing", - retried - ); + info!("Retried {} pending transactions for re-processing", retried); } retried From 6a49012714019f1fd2ca7c94015721c85db9b6fd Mon Sep 17 00:00:00 2001 From: qj0r9j0vc2 Date: Sat, 31 Jan 2026 09:20:11 +0900 Subject: [PATCH 13/17] fix: prevent Cut monotonicity violations --- crates/data-chain/src/primary/runner.rs | 5 +++ crates/data-chain/src/primary/state.rs | 45 ++++++++++++++++++++++--- 2 files changed, 45 insertions(+), 5 deletions(-) diff --git a/crates/data-chain/src/primary/runner.rs b/crates/data-chain/src/primary/runner.rs index 48961f3..f14fd03 100644 --- a/crates/data-chain/src/primary/runner.rs +++ b/crates/data-chain/src/primary/runner.rs @@ -633,6 +633,11 @@ impl Primary { "Received consensus decision notification" ); + // CRITICAL: Update last_cut to the DECIDED cut (not what we proposed) + // This ensures monotonicity checks in form_cut use the actual consensus + // state, preventing stale proposed cuts from causing violations + self.last_cut = Some(cut.clone()); + // CRITICAL: Sync position tracking from the decided Cut BEFORE advancing state // This ensures validators that missed some CARs during collection still have // consistent position tracking for subsequent heights diff --git a/crates/data-chain/src/primary/state.rs b/crates/data-chain/src/primary/state.rs index 895aabb..1ef0c9d 100644 --- a/crates/data-chain/src/primary/state.rs +++ b/crates/data-chain/src/primary/state.rs @@ -397,15 +397,50 @@ impl PrimaryState { pub fn mark_attested(&mut self, car: Car, aggregated: AggregatedAttestation) { let validator = car.proposer; let new_has_batches = !car.batch_digests.is_empty(); + let last_included = self + .last_included_positions + .get(&validator) + .copied() + .unwrap_or(0); + + // CRITICAL: Never store a Car that has already been finalized + // This prevents monotonicity violations in Cut formation + if car.position <= last_included { + tracing::debug!( + validator = %validator, + car_position = car.position, + last_included, + "Rejecting attested Car - position already finalized" + ); + return; + } // Check if we should replace the existing attested Car if let Some((existing_car, _)) = self.attested_cars.get(&validator) { let existing_has_batches = !existing_car.batch_digests.is_empty(); - let last_included = self - .last_included_positions - .get(&validator) - .copied() - .unwrap_or(0); + + // CRITICAL: Never go backwards in position + // This prevents monotonicity violations in Cut formation + if car.position < existing_car.position { + tracing::debug!( + validator = %validator, + existing_position = existing_car.position, + new_position = car.position, + "Rejecting attested Car - would go backwards in position" + ); + return; + } + + // Don't replace a Car with batches with an empty Car at same position + if existing_has_batches && !new_has_batches && car.position == existing_car.position { + tracing::debug!( + validator = %validator, + position = car.position, + existing_batches = existing_car.batch_digests.len(), + "Preserving Car with batches over empty Car at same position" + ); + return; + } // Don't replace a Car with batches with an empty Car, UNLESS the // existing Car's position has already been included in a decided Cut From 178cdefff63bea1c79b0830e8929b9ab22a74317 Mon Sep 17 00:00:00 2001 From: qj0r9j0vc2 Date: Sat, 31 Jan 2026 10:30:00 +0900 Subject: [PATCH 14/17] fix: only remove executed txs from mempool pending map --- crates/execution/src/engine.rs | 36 ++++++++++++++++++++++------- crates/execution/src/lib.rs | 1 + crates/execution/src/types.rs | 8 +++++++ crates/node/src/execution_bridge.rs | 11 +++++---- 4 files changed, 44 insertions(+), 12 deletions(-) diff --git a/crates/execution/src/engine.rs b/crates/execution/src/engine.rs index f6bb068..fd69a41 100644 --- a/crates/execution/src/engine.rs +++ b/crates/execution/src/engine.rs @@ -44,7 +44,14 @@ const BLOCK_HASH_CACHE_SIZE: NonZeroUsize = match NonZeroUsize::new(256) { /// - Cumulative gas used by all transactions /// - Logs emitted by each transaction /// - Total transaction fees collected (gas_used × effective_gas_price) -type ProcessTransactionsResult = (Vec, u64, Vec>, U256); +/// - Executed transaction bytes (only transactions with receipts, not skipped ones) +type ProcessTransactionsResult = ( + Vec, + u64, + Vec>, + U256, + Vec, +); /// ExecutionLayer trait defines the interface for block execution. /// @@ -345,6 +352,7 @@ impl ExecutionEngine

{ let mut cumulative_gas_used = 0u64; let mut all_logs = Vec::new(); let mut total_fees = U256::ZERO; + let mut executed_tx_bytes = Vec::new(); // Sort transactions by (sender, nonce) to ensure correct execution order // This prevents NonceTooLow errors when txs from same sender arrive out of order @@ -503,6 +511,7 @@ impl ExecutionEngine

{ receipts.push(receipt); all_logs.push(tx_result.logs); + executed_tx_bytes.push(tx_bytes.clone()); } // Finalize EVM to extract journal changes @@ -531,7 +540,13 @@ impl ExecutionEngine

{ ); } - Ok((receipts, cumulative_gas_used, all_logs, total_fees)) + Ok(( + receipts, + cumulative_gas_used, + all_logs, + total_fees, + executed_tx_bytes, + )) } /// Compute or retrieve state root based on block number. @@ -566,12 +581,14 @@ impl ExecutionLayer for ExecutionEngine

{ self.validate_block(&input)?; // Process all transactions and collect fees - let (receipts, gas_used, all_logs, total_fees) = self.process_transactions( - &input.transactions, - input.block_number, - input.timestamp, - input.parent_hash, - )?; + // executed_tx_bytes contains only transactions that actually executed (have receipts) + let (receipts, gas_used, all_logs, total_fees, executed_tx_bytes) = self + .process_transactions( + &input.transactions, + input.block_number, + input.timestamp, + input.parent_hash, + )?; if !total_fees.is_zero() { tracing::info!( @@ -665,6 +682,7 @@ impl ExecutionLayer for ExecutionEngine

{ block_hash, receipts, logs_bloom, + executed_transactions: executed_tx_bytes, }) } @@ -958,6 +976,7 @@ mod tests { block_hash: B256::ZERO, receipts: vec![], logs_bloom: Bloom::ZERO, + executed_transactions: vec![], }; let sealed = engine @@ -1018,6 +1037,7 @@ mod tests { block_hash: B256::ZERO, receipts: vec![], logs_bloom: Bloom::ZERO, + executed_transactions: vec![], }; let sealed = engine diff --git a/crates/execution/src/lib.rs b/crates/execution/src/lib.rs index 298b282..2850862 100644 --- a/crates/execution/src/lib.rs +++ b/crates/execution/src/lib.rs @@ -593,6 +593,7 @@ mod tests { block_hash: B256::ZERO, receipts: vec![], logs_bloom: Bloom::ZERO, + executed_transactions: vec![], }; let sealed = execution_layer diff --git a/crates/execution/src/types.rs b/crates/execution/src/types.rs index 68de23e..66d70f8 100644 --- a/crates/execution/src/types.rs +++ b/crates/execution/src/types.rs @@ -351,6 +351,14 @@ pub struct ExecutionResult { /// Logs bloom filter. pub logs_bloom: Bloom, + + /// Raw bytes of transactions that actually executed (have receipts). + /// + /// This excludes transactions that were skipped due to nonce errors + /// (NonceTooLow, NonceTooHigh). Used by the node to properly clean up + /// the mempool pending map - only executed transactions should be removed. + #[serde(default)] + pub executed_transactions: Vec, } /// Transaction receipt. diff --git a/crates/node/src/execution_bridge.rs b/crates/node/src/execution_bridge.rs index 2dccc36..5c5608b 100644 --- a/crates/node/src/execution_bridge.rs +++ b/crates/node/src/execution_bridge.rs @@ -353,9 +353,8 @@ impl ExecutionBridge { let timestamp = execution_cut.timestamp; let parent_hash = execution_cut.parent_hash; - // Capture all transactions BEFORE they're consumed by execution. - // These will be stored by the node for eth_getTransactionByHash queries. - let executed_transactions: Vec = execution_cut + // Collect all transactions for the block input + let all_transactions: Vec = execution_cut .cars .iter() .flat_map(|car| car.transactions.iter().cloned()) @@ -365,7 +364,7 @@ impl ExecutionBridge { let block_input = BlockInput { block_number: execution_cut.block_number, timestamp: execution_cut.timestamp, - transactions: executed_transactions.clone(), + transactions: all_transactions, parent_hash: execution_cut.parent_hash, gas_limit: execution_cut.gas_limit, base_fee_per_gas: execution_cut.base_fee_per_gas, @@ -400,6 +399,10 @@ impl ExecutionBridge { "Block hash updated" ); + // Use executed_transactions from the execution result - this only contains + // transactions that actually executed (have receipts), not skipped ones + let executed_transactions = result.executed_transactions.clone(); + Ok(BlockExecutionResult { execution_result: result, block_hash: new_block_hash, From be6714ffce6a1a79dfe4830ad36e8f9c81bd7f73 Mon Sep 17 00:00:00 2001 From: qj0r9j0vc2 Date: Sat, 31 Jan 2026 11:15:06 +0900 Subject: [PATCH 15/17] fix: always retry pending txs after block --- crates/node/src/node.rs | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/crates/node/src/node.rs b/crates/node/src/node.rs index 5997d6c..ec650c2 100644 --- a/crates/node/src/node.rs +++ b/crates/node/src/node.rs @@ -1667,23 +1667,25 @@ impl Node { }) .collect(); + // Remove executed transactions from pending map if !tx_hashes.is_empty() { - // Remove executed transactions from pending map mempool.remove_included(&tx_hashes); debug!( "Removed {} executed transactions from mempool pending map", tx_hashes.len() ); + } - // Retry any remaining pending transactions - // These may have been skipped due to nonce issues - let retried = mempool.retry_pending(&tx_hashes).await; - if retried > 0 { - debug!( - "Re-queued {} pending transactions for retry", - retried - ); - } + // ALWAYS retry pending transactions after every block + // Previously this was inside the if block, causing pending + // transactions to only retry when a block had executed txs. + // This led to multi-minute delays when the chain had empty blocks. + let retried = mempool.retry_pending(&tx_hashes).await; + if retried > 0 { + debug!( + "Re-queued {} pending transactions for retry", + retried + ); } } } From 9267eda0e3a6886d4f12c61f3fecce119dbf94c3 Mon Sep 17 00:00:00 2001 From: qj0r9j0vc2 Date: Sat, 31 Jan 2026 16:58:54 +0900 Subject: [PATCH 16/17] test: add retry_pending unit tests --- crates/rpc/src/adapters.rs | 90 +++++++++++++++++++++++++++++++++++++- 1 file changed, 89 insertions(+), 1 deletion(-) diff --git a/crates/rpc/src/adapters.rs b/crates/rpc/src/adapters.rs index 2015596..94350c0 100644 --- a/crates/rpc/src/adapters.rs +++ b/crates/rpc/src/adapters.rs @@ -1563,7 +1563,7 @@ impl ChannelMempoolApi { } if retried > 0 { - info!("Retried {} pending transactions for re-processing", retried); + debug!("Retried {} pending transactions for re-processing", retried); } retried @@ -3320,6 +3320,94 @@ mod tests { assert!(!network.is_listening().await.unwrap()); } + // ===== ChannelMempoolApi tests ===== + + #[tokio::test] + async fn test_channel_mempool_retry_pending_with_empty_executed_hashes() { + use tokio::sync::mpsc; + + // Create channel and mempool + let (tx_sender, mut rx) = mpsc::channel(100); + let mempool = ChannelMempoolApi::new(tx_sender, 1); + + // Add a pending transaction manually + let tx_hash = B256::repeat_byte(0x01); + let sender = Address::repeat_byte(0x02); + let tx_bytes = vec![0x01, 0x02, 0x03]; + { + let mut pending = mempool.pending.write(); + pending.insert(tx_hash, (sender, tx_bytes.clone())); + } + + // Retry with empty executed list - should retry ALL pending transactions + // This is the key behavior: when no transactions were executed in a block, + // we still want to retry all pending transactions + let retried = mempool.retry_pending(&[]).await; + assert_eq!(retried, 1, "Should retry the one pending transaction"); + + // Verify transaction was re-sent to the channel + let received = rx.try_recv().expect("Should have received the retried transaction"); + assert_eq!(received, tx_bytes, "Retried transaction should match original"); + + // Pending map should still contain the transaction (not removed by retry) + let pending = mempool.pending.read(); + assert!( + pending.contains_key(&tx_hash), + "Pending transaction should still be in the map after retry" + ); + } + + #[tokio::test] + async fn test_channel_mempool_retry_pending_excludes_executed() { + use tokio::sync::mpsc; + + // Create channel and mempool + let (tx_sender, mut rx) = mpsc::channel(100); + let mempool = ChannelMempoolApi::new(tx_sender, 1); + + // Add two pending transactions + let tx_hash_1 = B256::repeat_byte(0x01); + let tx_hash_2 = B256::repeat_byte(0x02); + let sender = Address::repeat_byte(0x03); + let tx_bytes_1 = vec![0x01]; + let tx_bytes_2 = vec![0x02]; + { + let mut pending = mempool.pending.write(); + pending.insert(tx_hash_1, (sender, tx_bytes_1.clone())); + pending.insert(tx_hash_2, (sender, tx_bytes_2.clone())); + } + + // Retry with tx_hash_1 as executed - should only retry tx_hash_2 + let retried = mempool.retry_pending(&[tx_hash_1]).await; + assert_eq!(retried, 1, "Should retry only the non-executed transaction"); + + // Verify only tx_bytes_2 was re-sent + let received = rx.try_recv().expect("Should have received the retried transaction"); + assert_eq!(received, tx_bytes_2, "Should only retry non-executed transaction"); + + // No more transactions in channel + assert!( + rx.try_recv().is_err(), + "Should not have retried the executed transaction" + ); + } + + #[tokio::test] + async fn test_channel_mempool_retry_pending_empty_pending_map() { + use tokio::sync::mpsc; + + // Create channel and mempool with no pending transactions + let (tx_sender, mut rx) = mpsc::channel(100); + let mempool = ChannelMempoolApi::new(tx_sender, 1); + + // Retry with empty pending map - should return 0 + let retried = mempool.retry_pending(&[]).await; + assert_eq!(retried, 0, "Should return 0 when no pending transactions"); + + // Verify nothing was sent to channel + assert!(rx.try_recv().is_err(), "Should not have sent anything"); + } + // ===== Log conversion helper tests ===== #[test] From c61897c9c6907b0d702e415f5408a0686a4d4c75 Mon Sep 17 00:00:00 2001 From: qj0r9j0vc2 Date: Sat, 31 Jan 2026 17:03:33 +0900 Subject: [PATCH 17/17] style: fix rustfmt formatting --- crates/rpc/src/adapters.rs | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/crates/rpc/src/adapters.rs b/crates/rpc/src/adapters.rs index 94350c0..3e63173 100644 --- a/crates/rpc/src/adapters.rs +++ b/crates/rpc/src/adapters.rs @@ -3346,8 +3346,13 @@ mod tests { assert_eq!(retried, 1, "Should retry the one pending transaction"); // Verify transaction was re-sent to the channel - let received = rx.try_recv().expect("Should have received the retried transaction"); - assert_eq!(received, tx_bytes, "Retried transaction should match original"); + let received = rx + .try_recv() + .expect("Should have received the retried transaction"); + assert_eq!( + received, tx_bytes, + "Retried transaction should match original" + ); // Pending map should still contain the transaction (not removed by retry) let pending = mempool.pending.read(); @@ -3382,8 +3387,13 @@ mod tests { assert_eq!(retried, 1, "Should retry only the non-executed transaction"); // Verify only tx_bytes_2 was re-sent - let received = rx.try_recv().expect("Should have received the retried transaction"); - assert_eq!(received, tx_bytes_2, "Should only retry non-executed transaction"); + let received = rx + .try_recv() + .expect("Should have received the retried transaction"); + assert_eq!( + received, tx_bytes_2, + "Should only retry non-executed transaction" + ); // No more transactions in channel assert!(