From 3c54e006ccd78505d6c89c044e4d1da451597b02 Mon Sep 17 00:00:00 2001 From: Luis Rubio Date: Wed, 17 Nov 2021 18:00:07 +0100 Subject: [PATCH 1/3] feat(node): include a delay in block broadcasting --- node/src/actors/chain_manager/handlers.rs | 6 +- node/src/actors/chain_manager/mining.rs | 6 +- node/src/actors/chain_manager/mod.rs | 67 +++++++++++++++++--- node/src/actors/json_rpc/json_rpc_methods.rs | 4 +- node/src/actors/messages.rs | 2 +- node/src/actors/session/handlers.rs | 6 +- 6 files changed, 73 insertions(+), 18 deletions(-) diff --git a/node/src/actors/chain_manager/handlers.rs b/node/src/actors/chain_manager/handlers.rs index 92596014d2..90e31d2eb2 100644 --- a/node/src/actors/chain_manager/handlers.rs +++ b/node/src/actors/chain_manager/handlers.rs @@ -645,10 +645,10 @@ fn log_sync_progress( impl Handler for ChainManager { type Result = SessionUnitResult; - fn handle(&mut self, msg: AddCandidates, _ctx: &mut Context) { + fn handle(&mut self, msg: AddCandidates, ctx: &mut Context) { // AddCandidates is needed in all states - for block in msg.blocks { - self.process_candidate(block); + for (block, ts) in msg.blocks { + self.process_candidate(ctx, block, ts); } } } diff --git a/node/src/actors/chain_manager/mining.rs b/node/src/actors/chain_manager/mining.rs index 4356a43879..b60defd7bb 100644 --- a/node/src/actors/chain_manager/mining.rs +++ b/node/src/actors/chain_manager/mining.rs @@ -40,7 +40,7 @@ use witnet_data_structures::{ }; use witnet_futures_utils::TryFutureExt2; use witnet_rad::{error::RadError, types::serial_iter_decode}; -use witnet_util::timestamp::get_timestamp; +use witnet_util::timestamp::{get_timestamp, get_timestamp_nanos}; use witnet_validations::validations::{ block_reward, calculate_liars_and_errors_count_from_tally, calculate_mining_probability, calculate_randpoe_threshold, calculate_reppoe_threshold, dr_transaction_fee, merkle_tree_root, @@ -250,7 +250,7 @@ impl ChainManager { beacon, epoch_constants, ) - .map_ok(|_diff, act, _ctx| { + .map_ok(|_diff, act, ctx| { // Send AddCandidates message to self // This will run all the validations again @@ -263,7 +263,7 @@ impl ChainManager { Yellow.bold().paint(block_hash.to_string()) ); - act.process_candidate(block); + act.process_candidate(ctx, block, get_timestamp_nanos()); }) .map_err(|e, _, _| log::error!("Error trying to mine a block: {}", e)) }) diff --git a/node/src/actors/chain_manager/mod.rs b/node/src/actors/chain_manager/mod.rs index 3f71dc8441..5d472d7844 100644 --- a/node/src/actors/chain_manager/mod.rs +++ b/node/src/actors/chain_manager/mod.rs @@ -68,7 +68,7 @@ use witnet_data_structures::{ }; use witnet_rad::types::RadonTypes; -use witnet_util::timestamp::seconds_to_human_string; +use witnet_util::timestamp::{duration_between_timestamps, seconds_to_human_string}; use witnet_validations::validations::{ compare_block_candidates, validate_block, validate_block_transactions, validate_new_transaction, validate_rad_request, verify_signatures, VrfSlots, @@ -515,7 +515,7 @@ impl ChainManager { } #[allow(clippy::map_entry)] - fn process_candidate(&mut self, block: Block) { + fn process_candidate(&mut self, ctx: &mut Context, block: Block, ts: (i64, u32)) { if let (Some(current_epoch), Some(chain_info), Some(rep_engine), Some(vrf_ctx)) = ( self.current_epoch, self.chain_state.chain_info.as_ref(), @@ -582,7 +582,16 @@ impl ChainManager { // In order to do not block possible validate candidates in AlmostSynced // state, we would broadcast the errors too if self.sm_state == StateMachine::AlmostSynced { - self.broadcast_item(InventoryItem::Block(block)); + let delay = calculate_delay_from_mining_timestamp( + chain_info.consensus_constants.checkpoint_zero_timestamp, + chain_info.consensus_constants.checkpoints_period, + current_epoch, + ts, + ); + + ctx.run_later(delay_function(delay), |act, _ctx| { + act.broadcast_item(InventoryItem::Block(block)) + }); } return; @@ -646,7 +655,16 @@ impl ChainManager { vrf_proof, }); - self.broadcast_item(InventoryItem::Block(block)); + let delay = calculate_delay_from_mining_timestamp( + chain_info.consensus_constants.checkpoint_zero_timestamp, + chain_info.consensus_constants.checkpoints_period, + current_epoch, + ts, + ); + + ctx.run_later(delay_function(delay), |act, _ctx| { + act.broadcast_item(InventoryItem::Block(block)) + }); } Err(e) => { log::warn!( @@ -658,7 +676,16 @@ impl ChainManager { // In order to do not block possible validate candidates in AlmostSynced // state, we would broadcast the errors too if self.sm_state == StateMachine::AlmostSynced { - self.broadcast_item(InventoryItem::Block(block)); + let delay = calculate_delay_from_mining_timestamp( + chain_info.consensus_constants.checkpoint_zero_timestamp, + chain_info.consensus_constants.checkpoints_period, + current_epoch, + ts, + ); + + ctx.run_later(delay_function(delay), |act, _ctx| { + act.broadcast_item(InventoryItem::Block(block)) + }); } } } @@ -2207,6 +2234,30 @@ impl ChainManager { } } +// Calculate delay between mining block timestamp and another timestamp +fn calculate_delay_from_mining_timestamp( + checkpoint_zero_timestamp: i64, + checkpoints_period: u16, + current_epoch: Epoch, + ts: (i64, u32), +) -> Duration { + let epoch_constants = EpochConstants { + checkpoint_zero_timestamp, + checkpoints_period, + }; + let timestamp_mining = epoch_constants + .block_mining_timestamp(current_epoch) + .unwrap(); + + duration_between_timestamps((timestamp_mining, 0), ts).unwrap_or_else(|| Duration::from_secs(0)) +} + +fn delay_function(initial_delay: Duration) -> Duration { + // TODO: Apply a right delay function + // Direct delay + initial_delay +} + /// Helper struct used to persist an old copy of the `ChainState` to the storage #[derive(Debug, Default)] struct ChainStateSnapshot { @@ -3528,19 +3579,19 @@ mod tests { assert_ne!(block_1, block_mal_1); // Process the modified candidate first - chain_manager.process_candidate(block_mal_1); + chain_manager.process_candidate(&mut Context::new(), block_mal_1, (0, 0)); // The best candidate should be None because this block is invalid let best_cand = chain_manager.best_candidate.as_ref().map(|bc| &bc.block); assert_eq!(best_cand, None); // Process candidate with the same hash, but this one is valid - chain_manager.process_candidate(block_1.clone()); + chain_manager.process_candidate(&mut Context::new(), block_1.clone(), (0, 0)); // The best candidate should be block_1 let best_cand = chain_manager.best_candidate.as_ref().map(|bc| &bc.block); assert_eq!(best_cand, Some(&block_1)); // Process another valid candidate, but worse than the other one - chain_manager.process_candidate(block_2); + chain_manager.process_candidate(&mut Context::new(), block_2, (0, 0)); // The best candidate should still be block_1 let best_cand = chain_manager.best_candidate.as_ref().map(|bc| &bc.block); assert_eq!(best_cand, Some(&block_1)); diff --git a/node/src/actors/json_rpc/json_rpc_methods.rs b/node/src/actors/json_rpc/json_rpc_methods.rs index e124c5f814..a6d0012aaa 100644 --- a/node/src/actors/json_rpc/json_rpc_methods.rs +++ b/node/src/actors/json_rpc/json_rpc_methods.rs @@ -49,6 +49,7 @@ use crate::actors::messages::GetSupplyInfo; use futures::FutureExt; use futures_util::compat::Compat; use std::future::Future; +use witnet_util::timestamp::get_timestamp_nanos; type JsonRpcResult = Result; @@ -433,9 +434,10 @@ pub async fn inventory(params: Result) -> Js log::debug!("Got block from JSON-RPC. Sending AnnounceItems message."); let chain_manager_addr = ChainManager::from_registry(); + let now = get_timestamp_nanos(); let res = chain_manager_addr .send(AddCandidates { - blocks: vec![block], + blocks: vec![(block, now)], }) .await; diff --git a/node/src/actors/messages.rs b/node/src/actors/messages.rs index e4d9e3f714..5e04f06b1d 100644 --- a/node/src/actors/messages.rs +++ b/node/src/actors/messages.rs @@ -84,7 +84,7 @@ impl Message for AddBlocks { /// Add a new candidate pub struct AddCandidates { /// Candidates - pub blocks: Vec, + pub blocks: Vec<(Block, (i64, u32))>, } impl Message for AddCandidates { diff --git a/node/src/actors/session/handlers.rs b/node/src/actors/session/handlers.rs index 0c4adee27b..8e34ee3f6b 100644 --- a/node/src/actors/session/handlers.rs +++ b/node/src/actors/session/handlers.rs @@ -39,7 +39,7 @@ use crate::actors::{ sessions_manager::SessionsManager, }; -use witnet_util::timestamp::get_timestamp; +use witnet_util::timestamp::{get_timestamp, get_timestamp_nanos}; #[derive(Debug, Eq, Fail, PartialEq)] enum HandshakeError { @@ -710,10 +710,12 @@ fn inventory_process_block(session: &mut Session, _ctx: &mut Context, b // requested_block_hashes is cleared by using drain(..) above } } else { + let ts = get_timestamp_nanos(); + // If this is not a requested block, assume it is a candidate // Send a message to the ChainManager to try to add a new candidate chain_manager_addr.do_send(AddCandidates { - blocks: vec![block], + blocks: vec![(block, ts)], }); } } From 5037dc248168c2c0d1d55534970ef99ab18115e6 Mon Sep 17 00:00:00 2001 From: Luis Rubio Date: Fri, 19 Nov 2021 18:07:52 +0100 Subject: [PATCH 2/3] feat(node): avoid broadcasting if delay is too big --- node/src/actors/chain_manager/mod.rs | 80 ++++++++++++++++------------ 1 file changed, 47 insertions(+), 33 deletions(-) diff --git a/node/src/actors/chain_manager/mod.rs b/node/src/actors/chain_manager/mod.rs index 5d472d7844..dd03c05188 100644 --- a/node/src/actors/chain_manager/mod.rs +++ b/node/src/actors/chain_manager/mod.rs @@ -551,6 +551,20 @@ impl ChainManager { return; } + // Calculate delay for broadcasting blocks + let delay = if let Some(delay) = calculate_delay_for_broadcasting_block( + chain_info.consensus_constants.checkpoint_zero_timestamp, + chain_info.consensus_constants.checkpoints_period, + current_epoch, + ts, + ) { + delay + } else { + log::debug!("Block received too late to broadcasting"); + + return; + }; + let mut vrf_input = chain_info.highest_vrf_output; vrf_input.checkpoint = current_epoch; let active_wips = ActiveWips { @@ -582,14 +596,7 @@ impl ChainManager { // In order to do not block possible validate candidates in AlmostSynced // state, we would broadcast the errors too if self.sm_state == StateMachine::AlmostSynced { - let delay = calculate_delay_from_mining_timestamp( - chain_info.consensus_constants.checkpoint_zero_timestamp, - chain_info.consensus_constants.checkpoints_period, - current_epoch, - ts, - ); - - ctx.run_later(delay_function(delay), |act, _ctx| { + ctx.run_later(delay, |act, _ctx| { act.broadcast_item(InventoryItem::Block(block)) }); } @@ -655,14 +662,7 @@ impl ChainManager { vrf_proof, }); - let delay = calculate_delay_from_mining_timestamp( - chain_info.consensus_constants.checkpoint_zero_timestamp, - chain_info.consensus_constants.checkpoints_period, - current_epoch, - ts, - ); - - ctx.run_later(delay_function(delay), |act, _ctx| { + ctx.run_later(delay, |act, _ctx| { act.broadcast_item(InventoryItem::Block(block)) }); } @@ -676,14 +676,7 @@ impl ChainManager { // In order to do not block possible validate candidates in AlmostSynced // state, we would broadcast the errors too if self.sm_state == StateMachine::AlmostSynced { - let delay = calculate_delay_from_mining_timestamp( - chain_info.consensus_constants.checkpoint_zero_timestamp, - chain_info.consensus_constants.checkpoints_period, - current_epoch, - ts, - ); - - ctx.run_later(delay_function(delay), |act, _ctx| { + ctx.run_later(delay, |act, _ctx| { act.broadcast_item(InventoryItem::Block(block)) }); } @@ -2234,28 +2227,49 @@ impl ChainManager { } } -// Calculate delay between mining block timestamp and another timestamp -fn calculate_delay_from_mining_timestamp( +// Auxiliary function that converts one delay in another +fn delay_function(initial_delay: Duration) -> Duration { + // TODO: Apply a right delay function + // Direct delay + initial_delay +} + +// Calculate the delay to introduce in block broadcasting +// Returns None in case of overflow the current epoch duration +#[allow(clippy::cast_sign_loss)] +fn calculate_delay_for_broadcasting_block( checkpoint_zero_timestamp: i64, checkpoints_period: u16, current_epoch: Epoch, ts: (i64, u32), -) -> Duration { +) -> Option { let epoch_constants = EpochConstants { checkpoint_zero_timestamp, checkpoints_period, }; + + // Calculate delay between mining block timestamp and another timestamp let timestamp_mining = epoch_constants .block_mining_timestamp(current_epoch) .unwrap(); + let delay_from_mining_ts = duration_between_timestamps((timestamp_mining, 0), ts) + .unwrap_or_else(|| Duration::from_secs(0)); - duration_between_timestamps((timestamp_mining, 0), ts).unwrap_or_else(|| Duration::from_secs(0)) -} + // Apply magic delay function + let delay_to_broadcasting = delay_function(delay_from_mining_ts); -fn delay_function(initial_delay: Duration) -> Duration { - // TODO: Apply a right delay function - // Direct delay - initial_delay + // Return delay only if is before the end of the epoch + let end_epoch_ts = epoch_constants + .epoch_timestamp(current_epoch + 1) + .unwrap_or(i64::MAX) as u64; + let ts_with_delay = Duration::new(ts.0 as u64, ts.1).checked_add(delay_to_broadcasting); + + match ts_with_delay { + Some(ts_with_delay) if ts_with_delay.as_secs() < end_epoch_ts => { + Some(delay_to_broadcasting) + } + _ => None, + } } /// Helper struct used to persist an old copy of the `ChainState` to the storage From 260b7586c17172f92b1fc0613ca288728ef921be Mon Sep 17 00:00:00 2001 From: Luis Rubio Date: Mon, 22 Nov 2021 15:36:48 +0100 Subject: [PATCH 3/3] feat(node): apply a delay function --- node/src/actors/chain_manager/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node/src/actors/chain_manager/mod.rs b/node/src/actors/chain_manager/mod.rs index dd03c05188..5bde26feeb 100644 --- a/node/src/actors/chain_manager/mod.rs +++ b/node/src/actors/chain_manager/mod.rs @@ -2230,8 +2230,8 @@ impl ChainManager { // Auxiliary function that converts one delay in another fn delay_function(initial_delay: Duration) -> Duration { // TODO: Apply a right delay function - // Direct delay - initial_delay + // Remove 7.5 secs to the delay + initial_delay.saturating_sub(Duration::new(7, 500000000)) } // Calculate the delay to introduce in block broadcasting