Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion crates/data-chain/src/primary/attestation_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ use cipherbft_types::{Hash, ValidatorId};
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Maximum number of timeout resets allowed for batched Cars.
/// This prevents Cars from being stuck indefinitely when peers cannot sync batches.
/// Once a pending Car reaches this limit, `reset_timeout` refuses further resets
/// (returns false), signalling the caller to drop the Car and restore its batch
/// digests so the transactions can be re-batched instead of waiting forever.
const MAX_BATCHED_CAR_RESETS: u32 = 10;

/// Pending attestation collection for a Car
#[derive(Debug)]
struct PendingAttestation {
Expand All @@ -24,6 +28,8 @@ struct PendingAttestation {
started_at: Instant,
/// Current backoff duration
current_backoff: Duration,
/// Number of times the timeout has been reset (for batched Cars)
reset_count: u32,
}

impl PendingAttestation {
Expand All @@ -34,6 +40,7 @@ impl PendingAttestation {
attestations: HashMap::new(),
started_at: Instant::now(),
current_backoff: base_timeout,
reset_count: 0,
}
}
}
Expand Down Expand Up @@ -237,17 +244,31 @@ impl AttestationCollector {
/// Reset timeout for a Car without losing existing attestations
///
/// Used for batched Cars that need extra time for peers to sync batch data.
/// Returns true if the Car was found and reset, false otherwise.
/// Returns true if the Car was found and reset successfully.
/// Returns false if the Car was not found OR if max reset count exceeded.
///
/// This prevents Cars from being stuck indefinitely when peers cannot sync batches
/// (e.g., due to position divergence where peers reject the Car).
pub fn reset_timeout(&mut self, car_hash: &Hash) -> bool {
if let Some(pending) = self.pending.get_mut(car_hash) {
// Check if we've exceeded max resets
if pending.reset_count >= MAX_BATCHED_CAR_RESETS {
return false;
}
pending.started_at = std::time::Instant::now();
pending.current_backoff = self.base_timeout;
pending.reset_count += 1;
true
} else {
false
}
}

/// Number of timeout resets recorded for a pending Car.
///
/// Returns `None` when no pending attestation exists for `car_hash`.
pub fn reset_count(&self, car_hash: &Hash) -> Option<u32> {
    Some(self.pending.get(car_hash)?.reset_count)
}

/// Get current attestation count for a Car
pub fn attestation_count(&self, car_hash: &Hash) -> Option<usize> {
self.pending.get(car_hash).map(|p| p.attestations.len() + 1) // +1 for self
Expand Down
8 changes: 6 additions & 2 deletions crates/data-chain/src/primary/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,18 @@ pub struct PrimaryConfig {

impl PrimaryConfig {
/// Create a new configuration with defaults
///
/// Default intervals are tuned for responsive transaction processing:
/// - `car_interval`: 50ms matches Worker flush interval for quick Car creation
/// - `max_empty_cars`: 1 reduces timing races where empty Cars compete with tx-bearing Cars
pub fn new(validator_id: ValidatorId, bls_secret_key: BlsSecretKey) -> Self {
Self {
validator_id,
bls_secret_key,
car_interval: Duration::from_millis(100),
car_interval: Duration::from_millis(50), // Faster Car creation
attestation_timeout_base: Duration::from_millis(500),
attestation_timeout_max: Duration::from_millis(5000),
max_empty_cars: 3,
max_empty_cars: 1, // Reduce empty Car spam during tx processing
worker_count: 1,
equivocation_retention: 1000,
startup_delay: Duration::from_secs(2),
Expand Down
134 changes: 121 additions & 13 deletions crates/data-chain/src/primary/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,15 @@ impl Primary {
"Checked for ready Cars after batch sync"
);
for car in ready_cars {
// Skip if this is our own Car (defensive check)
if car.proposer == self.config.validator_id {
trace!(
position = car.position,
"Skipping attestation for our own Car in batch sync"
);
continue;
}

// IMPORTANT: The Car was already validated when first received
// (position check, signature, parent_ref all passed). We queued
// it only because batches were missing. Now that batches are
Expand Down Expand Up @@ -624,6 +633,11 @@ impl Primary {
"Received consensus decision notification"
);

// CRITICAL: Update last_cut to the DECIDED cut (not what we proposed)
// This ensures monotonicity checks in form_cut use the actual consensus
// state, preventing stale proposed cuts from causing violations
self.last_cut = Some(cut.clone());

// CRITICAL: Sync position tracking from the decided Cut BEFORE advancing state
// This ensures validators that missed some CARs during collection still have
// consistent position tracking for subsequent heights
Expand Down Expand Up @@ -858,6 +872,15 @@ impl Primary {

/// Handle a received Car
async fn handle_received_car(&mut self, from: ValidatorId, car: Car) {
// Skip if this is our own Car (we handle our own Cars through attestation collection)
if car.proposer == self.config.validator_id {
trace!(
position = car.position,
"Ignoring received Car for our own Car"
);
return;
}

// DIAGNOSTIC: Log at INFO level for batched Cars to trace attestation flow
let batch_count = car.batch_digests.len();
if batch_count > 0 {
Expand Down Expand Up @@ -982,6 +1005,48 @@ impl Primary {
"Position gap detected, initiating gap recovery"
);

// CRITICAL FIX: Update position tracking when we're behind
//
// If actual > expected, we've fallen behind this validator's position.
// The Car signature was already validated by core.handle_car(), so we
// know this is a legitimate Car. Update tracking to prevent death spiral:
//
// Without this fix:
// 1. We miss some Cars from validator X
// 2. New Cars from X trigger position gap errors
// 3. We never attest to X's new Cars
// 4. X's Cars never reach quorum
// 5. Transactions stuck forever
//
// With this fix:
// - We update tracking to actual position
// - Next Car from X (at actual+1) will be attested normally
// - Network recovers quickly
if actual > expected {
let car_hash = car.hash();
info!(
proposer = %validator,
expected,
actual,
car_hash = %car_hash,
"Updating position tracking to recover from gap - signature was valid"
);
self.state.update_last_seen(validator, actual, car_hash);

// Also generate attestation since signature is valid and we're syncing
// This helps the network reach quorum faster
let attestation = self.core.create_attestation(&car);
DCL_ATTESTATIONS_SENT.inc();
self.network
.send_attestation(car.proposer, &attestation)
.await;
info!(
proposer = %validator,
position = actual,
"Generated attestation after position gap recovery"
);
}

// Queue the out-of-order Car for later processing
if !self.state.is_awaiting_gap_sync(&validator, actual) {
self.state.queue_car_awaiting_gap(car.clone(), expected);
Expand Down Expand Up @@ -1228,6 +1293,30 @@ impl Primary {
"Received valid CarWithAttestation from peer"
);

// CRITICAL FIX: Update position tracking from attested Car broadcasts
//
// Without this, validators can fall into a position gap death spiral:
// 1. Validator A's Cars don't reach quorum (for whatever reason)
// 2. Other validators' last_seen_positions[A] becomes stale
// 3. When A broadcasts new Cars, others detect a "position gap"
// 4. No attestations are generated, A's Cars never reach quorum
// 5. The gap grows forever
//
// By updating position tracking when we receive a valid CarWithAttestation
// (which has quorum verification), we stay in sync even if we missed
// some intermediate Cars. This breaks the death spiral.
let current_pos = self.state.last_seen_positions.get(&car.proposer).copied();
if current_pos.is_none_or(|p| car.position > p) {
self.state
.update_last_seen(car.proposer, car.position, car_hash);
info!(
proposer = %car.proposer,
old_position = current_pos,
new_position = car.position,
"Updated position tracking from CarWithAttestation broadcast"
);
}

// Persist attestation to storage if available
if let Some(ref storage) = self.storage {
if let Err(e) = storage.put_attestation(attestation.clone()).await {
Expand Down Expand Up @@ -1377,20 +1466,39 @@ impl Primary {
);
// Could re-broadcast Car here
} else {
// IMPORTANT: Don't timeout Cars with batches!
// Peers need extra time to sync batch data before they can attest.
// Without this, batched Cars timeout before peers finish syncing,
// causing attestations to be rejected with UnknownCar error.
// Max backoff exceeded
if has_batches {
// Reset the timeout without losing existing attestations
info!(
hash = %hash,
position = car.position,
batch_count = car.batch_digests.len(),
attestation_count = self.attestation_collector.attestation_count(&hash).unwrap_or(0),
"Extending timeout for batched Car - peers may still be syncing"
);
self.attestation_collector.reset_timeout(&hash);
// Try to extend timeout for batched Cars that need more time.
// reset_timeout() returns false if max resets exceeded.
let reset_count = self.attestation_collector.reset_count(&hash).unwrap_or(0);
if self.attestation_collector.reset_timeout(&hash) {
info!(
hash = %hash,
position = car.position,
batch_count = car.batch_digests.len(),
attestation_count = self.attestation_collector.attestation_count(&hash).unwrap_or(0),
reset_count = reset_count + 1,
"Extending timeout for batched Car - peers may still be syncing"
);
} else {
// Max resets exceeded - drop the Car and restore batches
warn!(
hash = %hash,
position = car.position,
batch_count = car.batch_digests.len(),
attestation_count = self.attestation_collector.attestation_count(&hash).unwrap_or(0),
reset_count,
"Batched Car exceeded max timeout resets - dropping and restoring batches"
);
self.attestation_collector.remove(&hash);
self.state.remove_pending_car(&hash);

// Restore batch digests to pending so they can be re-batched
// This ensures transactions are not lost
for digest in &car.batch_digests {
self.state.add_batch_digest(digest.clone());
}
}
} else {
warn!(
hash = %hash,
Expand Down
45 changes: 40 additions & 5 deletions crates/data-chain/src/primary/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,15 +397,50 @@ impl PrimaryState {
pub fn mark_attested(&mut self, car: Car, aggregated: AggregatedAttestation) {
let validator = car.proposer;
let new_has_batches = !car.batch_digests.is_empty();
let last_included = self
.last_included_positions
.get(&validator)
.copied()
.unwrap_or(0);

// CRITICAL: Never store a Car that has already been finalized
// This prevents monotonicity violations in Cut formation
if car.position <= last_included {
tracing::debug!(
validator = %validator,
car_position = car.position,
last_included,
"Rejecting attested Car - position already finalized"
);
return;
}

// Check if we should replace the existing attested Car
if let Some((existing_car, _)) = self.attested_cars.get(&validator) {
let existing_has_batches = !existing_car.batch_digests.is_empty();
let last_included = self
.last_included_positions
.get(&validator)
.copied()
.unwrap_or(0);

// CRITICAL: Never go backwards in position
// This prevents monotonicity violations in Cut formation
if car.position < existing_car.position {
tracing::debug!(
validator = %validator,
existing_position = existing_car.position,
new_position = car.position,
"Rejecting attested Car - would go backwards in position"
);
return;
}

// Don't replace a Car with batches with an empty Car at same position
if existing_has_batches && !new_has_batches && car.position == existing_car.position {
tracing::debug!(
validator = %validator,
position = car.position,
existing_batches = existing_car.batch_digests.len(),
"Preserving Car with batches over empty Car at same position"
);
return;
}

// Don't replace a Car with batches with an empty Car, UNLESS the
// existing Car's position has already been included in a decided Cut
Expand Down
10 changes: 7 additions & 3 deletions crates/data-chain/src/worker/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,17 @@ pub struct WorkerConfig {

impl WorkerConfig {
/// Create a new configuration with defaults
///
/// Default batch thresholds are tuned for responsive transaction processing:
/// - `max_batch_txs`: 100 transactions triggers immediate batch flush
/// - `flush_interval`: 50ms ensures batches don't wait too long
pub fn new(validator_id: ValidatorId, worker_id: u8) -> Self {
    Self {
        validator_id,
        worker_id,
        max_batch_bytes: 1024 * 1024, // 1MB
        max_batch_txs: 100,           // Flush after 100 txs for responsive batching
        flush_interval: Duration::from_millis(50), // Faster time-based flush
    }
}

Expand Down
Loading