diff --git a/node/src/actors/chain_manager/handlers.rs b/node/src/actors/chain_manager/handlers.rs index 92596014d..90e31d2eb 100644 --- a/node/src/actors/chain_manager/handlers.rs +++ b/node/src/actors/chain_manager/handlers.rs @@ -645,10 +645,10 @@ fn log_sync_progress( impl Handler for ChainManager { type Result = SessionUnitResult; - fn handle(&mut self, msg: AddCandidates, _ctx: &mut Context) { + fn handle(&mut self, msg: AddCandidates, ctx: &mut Context) { // AddCandidates is needed in all states - for block in msg.blocks { - self.process_candidate(block); + for (block, ts) in msg.blocks { + self.process_candidate(ctx, block, ts); } } } diff --git a/node/src/actors/chain_manager/mining.rs b/node/src/actors/chain_manager/mining.rs index 4356a4387..b60defd7b 100644 --- a/node/src/actors/chain_manager/mining.rs +++ b/node/src/actors/chain_manager/mining.rs @@ -40,7 +40,7 @@ use witnet_data_structures::{ }; use witnet_futures_utils::TryFutureExt2; use witnet_rad::{error::RadError, types::serial_iter_decode}; -use witnet_util::timestamp::get_timestamp; +use witnet_util::timestamp::{get_timestamp, get_timestamp_nanos}; use witnet_validations::validations::{ block_reward, calculate_liars_and_errors_count_from_tally, calculate_mining_probability, calculate_randpoe_threshold, calculate_reppoe_threshold, dr_transaction_fee, merkle_tree_root, @@ -250,7 +250,7 @@ impl ChainManager { beacon, epoch_constants, ) - .map_ok(|_diff, act, _ctx| { + .map_ok(|_diff, act, ctx| { // Send AddCandidates message to self // This will run all the validations again @@ -263,7 +263,7 @@ impl ChainManager { Yellow.bold().paint(block_hash.to_string()) ); - act.process_candidate(block); + act.process_candidate(ctx, block, get_timestamp_nanos()); }) .map_err(|e, _, _| log::error!("Error trying to mine a block: {}", e)) }) diff --git a/node/src/actors/chain_manager/mod.rs b/node/src/actors/chain_manager/mod.rs index 3f71dc844..5bde26fee 100644 --- a/node/src/actors/chain_manager/mod.rs +++ b/node/src/actors/chain_manager/mod.rs @@ -68,7 +68,7 @@ use witnet_data_structures::{ }; use witnet_rad::types::RadonTypes; -use witnet_util::timestamp::seconds_to_human_string; +use witnet_util::timestamp::{duration_between_timestamps, seconds_to_human_string}; use witnet_validations::validations::{ compare_block_candidates, validate_block, validate_block_transactions, validate_new_transaction, validate_rad_request, verify_signatures, VrfSlots, @@ -515,7 +515,7 @@ impl ChainManager { } #[allow(clippy::map_entry)] - fn process_candidate(&mut self, block: Block) { + fn process_candidate(&mut self, ctx: &mut Context, block: Block, ts: (i64, u32)) { if let (Some(current_epoch), Some(chain_info), Some(rep_engine), Some(vrf_ctx)) = ( self.current_epoch, self.chain_state.chain_info.as_ref(), @@ -551,6 +551,20 @@ impl ChainManager { return; } + // Calculate delay for broadcasting blocks + let delay = if let Some(delay) = calculate_delay_for_broadcasting_block( + chain_info.consensus_constants.checkpoint_zero_timestamp, + chain_info.consensus_constants.checkpoints_period, + current_epoch, + ts, + ) { + delay + } else { + log::debug!("Block received too late to broadcasting"); + + return; + }; + let mut vrf_input = chain_info.highest_vrf_output; vrf_input.checkpoint = current_epoch; let active_wips = ActiveWips { @@ -582,7 +596,9 @@ impl ChainManager { // In order to do not block possible validate candidates in AlmostSynced // state, we would broadcast the errors too if self.sm_state == StateMachine::AlmostSynced { - self.broadcast_item(InventoryItem::Block(block)); + ctx.run_later(delay, |act, _ctx| { + act.broadcast_item(InventoryItem::Block(block)) + }); } return; @@ -646,7 +662,9 @@ impl ChainManager { vrf_proof, }); - self.broadcast_item(InventoryItem::Block(block)); + ctx.run_later(delay, |act, _ctx| { + act.broadcast_item(InventoryItem::Block(block)) + }); } Err(e) => { log::warn!( @@ -658,7 +676,9 @@ impl ChainManager { // In order to do not block possible validate candidates in AlmostSynced // state, we would broadcast the errors too if self.sm_state == StateMachine::AlmostSynced { - self.broadcast_item(InventoryItem::Block(block)); + ctx.run_later(delay, |act, _ctx| { + act.broadcast_item(InventoryItem::Block(block)) + }); } } } @@ -2207,6 +2227,51 @@ impl ChainManager { } } +// Auxiliary function that converts one delay in another +fn delay_function(initial_delay: Duration) -> Duration { + // TODO: Apply a right delay function + // Remove 7.5 secs to the delay + initial_delay.saturating_sub(Duration::new(7, 500000000)) +} + +// Calculate the delay to introduce in block broadcasting +// Returns None in case of overflow the current epoch duration +#[allow(clippy::cast_sign_loss)] +fn calculate_delay_for_broadcasting_block( + checkpoint_zero_timestamp: i64, + checkpoints_period: u16, + current_epoch: Epoch, + ts: (i64, u32), +) -> Option { + let epoch_constants = EpochConstants { + checkpoint_zero_timestamp, + checkpoints_period, + }; + + // Calculate delay between mining block timestamp and another timestamp + let timestamp_mining = epoch_constants + .block_mining_timestamp(current_epoch) + .unwrap(); + let delay_from_mining_ts = duration_between_timestamps((timestamp_mining, 0), ts) + .unwrap_or_else(|| Duration::from_secs(0)); + + // Apply magic delay function + let delay_to_broadcasting = delay_function(delay_from_mining_ts); + + // Return delay only if is before the end of the epoch + let end_epoch_ts = epoch_constants + .epoch_timestamp(current_epoch + 1) + .unwrap_or(i64::MAX) as u64; + let ts_with_delay = Duration::new(ts.0 as u64, ts.1).checked_add(delay_to_broadcasting); + + match ts_with_delay { + Some(ts_with_delay) if ts_with_delay.as_secs() < end_epoch_ts => { + Some(delay_to_broadcasting) + } + _ => None, + } +} + /// Helper struct used to persist an old copy of the `ChainState` to the storage #[derive(Debug, Default)] struct ChainStateSnapshot { @@ -3528,19 +3593,19 @@ mod tests { assert_ne!(block_1, block_mal_1); // Process the modified candidate first - chain_manager.process_candidate(block_mal_1); + chain_manager.process_candidate(&mut Context::new(), block_mal_1, (0, 0)); // The best candidate should be None because this block is invalid let best_cand = chain_manager.best_candidate.as_ref().map(|bc| &bc.block); assert_eq!(best_cand, None); // Process candidate with the same hash, but this one is valid - chain_manager.process_candidate(block_1.clone()); + chain_manager.process_candidate(&mut Context::new(), block_1.clone(), (0, 0)); // The best candidate should be block_1 let best_cand = chain_manager.best_candidate.as_ref().map(|bc| &bc.block); assert_eq!(best_cand, Some(&block_1)); // Process another valid candidate, but worse than the other one - chain_manager.process_candidate(block_2); + chain_manager.process_candidate(&mut Context::new(), block_2, (0, 0)); // The best candidate should still be block_1 let best_cand = chain_manager.best_candidate.as_ref().map(|bc| &bc.block); assert_eq!(best_cand, Some(&block_1)); diff --git a/node/src/actors/json_rpc/json_rpc_methods.rs b/node/src/actors/json_rpc/json_rpc_methods.rs index e124c5f81..a6d0012aa 100644 --- a/node/src/actors/json_rpc/json_rpc_methods.rs +++ b/node/src/actors/json_rpc/json_rpc_methods.rs @@ -49,6 +49,7 @@ use crate::actors::messages::GetSupplyInfo; use futures::FutureExt; use futures_util::compat::Compat; use std::future::Future; +use witnet_util::timestamp::get_timestamp_nanos; type JsonRpcResult = Result; @@ -433,9 +434,10 @@ pub async fn inventory(params: Result) -> Js log::debug!("Got block from JSON-RPC. Sending AnnounceItems message."); let chain_manager_addr = ChainManager::from_registry(); + let now = get_timestamp_nanos(); let res = chain_manager_addr .send(AddCandidates { - blocks: vec![block], + blocks: vec![(block, now)], }) .await; diff --git a/node/src/actors/messages.rs b/node/src/actors/messages.rs index e4d9e3f71..5e04f06b1 100644 --- a/node/src/actors/messages.rs +++ b/node/src/actors/messages.rs @@ -84,7 +84,7 @@ impl Message for AddBlocks { /// Add a new candidate pub struct AddCandidates { /// Candidates - pub blocks: Vec, + pub blocks: Vec<(Block, (i64, u32))>, } impl Message for AddCandidates { diff --git a/node/src/actors/session/handlers.rs b/node/src/actors/session/handlers.rs index 0c4adee27..8e34ee3f6 100644 --- a/node/src/actors/session/handlers.rs +++ b/node/src/actors/session/handlers.rs @@ -39,7 +39,7 @@ use crate::actors::{ sessions_manager::SessionsManager, }; -use witnet_util::timestamp::get_timestamp; +use witnet_util::timestamp::{get_timestamp, get_timestamp_nanos}; #[derive(Debug, Eq, Fail, PartialEq)] enum HandshakeError { @@ -710,10 +710,12 @@ fn inventory_process_block(session: &mut Session, _ctx: &mut Context, b // requested_block_hashes is cleared by using drain(..) above } } else { + let ts = get_timestamp_nanos(); + // If this is not a requested block, assume it is a candidate // Send a message to the ChainManager to try to add a new candidate chain_manager_addr.do_send(AddCandidates { - blocks: vec![block], + blocks: vec![(block, ts)], }); } }