Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lightning-background-processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ use lightning::routing::utxo::UtxoLookup;
use lightning::sign::{
ChangeDestinationSource, ChangeDestinationSourceSync, EntropySource, OutputSpender,
};
#[cfg(not(c_bindings))]
use lightning::util::async_poll::MaybeSend;
use lightning::util::logger::Logger;
use lightning::util::persist::{
KVStore, KVStoreSync, KVStoreSyncWrapper, CHANNEL_MANAGER_PERSISTENCE_KEY,
Expand All @@ -63,6 +61,8 @@ use lightning::util::persist::{
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY,
SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
};
#[cfg(not(c_bindings))]
use lightning::util::native_async::MaybeSend;
use lightning::util::sweep::{OutputSweeper, OutputSweeperSync};
#[cfg(feature = "std")]
use lightning::util::wakers::Sleeper;
Expand Down
1 change: 1 addition & 0 deletions lightning-block-sync/src/async_poll.rs
10 changes: 7 additions & 3 deletions lightning-block-sync/src/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,12 @@ pub trait UtxoSource: BlockSource + 'static {
pub struct TokioSpawner;
#[cfg(feature = "tokio")]
impl FutureSpawner for TokioSpawner {
fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
tokio::spawn(future);
type E = tokio::task::JoinError;
type SpawnedFutureResult<O> = tokio::task::JoinHandle<O>;
fn spawn<O: Send + 'static, F: Future<Output = O> + Send + 'static>(
&self, future: F,
) -> Self::SpawnedFutureResult<O> {
tokio::spawn(future)
}
}

Expand Down Expand Up @@ -280,7 +284,7 @@ where
let gossiper = Arc::clone(&self.gossiper);
let block_cache = Arc::clone(&self.block_cache);
let pmw = Arc::clone(&self.peer_manager_wake);
self.spawn.spawn(async move {
let _ = self.spawn.spawn(async move {
let res = Self::retrieve_utxo(source, block_cache, short_channel_id).await;
fut.resolve(gossiper.network_graph(), &*gossiper, res);
(pmw)();
Expand Down
224 changes: 84 additions & 140 deletions lightning-block-sync/src/init.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
//! Utilities to assist in the initial sync required to initialize or reload Rust-Lightning objects
//! from disk.

use crate::poll::{ChainPoller, Validate, ValidatedBlockHeader};
use crate::{BlockSource, BlockSourceResult, Cache, ChainNotifier};
use crate::async_poll::{MultiResultFuturePoller, ResultFuture};
use crate::poll::{ChainPoller, Poll, Validate, ValidatedBlockHeader};
use crate::{BlockData, BlockSource, BlockSourceResult, ChainNotifier, HeaderCache};

use bitcoin::block::Header;
use bitcoin::hash_types::BlockHash;
use bitcoin::network::Network;

use lightning::chain;
Expand All @@ -32,19 +32,18 @@ where
/// Performs a one-time sync of chain listeners using a single *trusted* block source, bringing each
/// listener's view of the chain from its paired block hash to `block_source`'s best chain tip.
///
/// Upon success, the returned header can be used to initialize [`SpvClient`]. In the case of
/// failure, each listener may be left at a different block hash than the one it was originally
/// paired with.
/// Upon success, the returned header and header cache can be used to initialize [`SpvClient`]. In
/// the case of failure, each listener may be left at a different block hash than the one it was
/// originally paired with.
///
/// Useful during startup to bring the [`ChannelManager`] and each [`ChannelMonitor`] in sync before
/// switching to [`SpvClient`]. For example:
///
/// ```
/// use bitcoin::hash_types::BlockHash;
/// use bitcoin::network::Network;
///
/// use lightning::chain;
/// use lightning::chain::Watch;
/// use lightning::chain::{BestBlock, Watch};
/// use lightning::chain::chainmonitor;
/// use lightning::chain::chainmonitor::ChainMonitor;
/// use lightning::chain::channelmonitor::ChannelMonitor;
Expand Down Expand Up @@ -89,14 +88,14 @@ where
/// logger: &L,
/// persister: &P,
/// ) {
/// // Read a serialized channel monitor paired with the block hash when it was persisted.
/// // Read a serialized channel monitor paired with the best block when it was persisted.
/// let serialized_monitor = "...";
/// let (monitor_block_hash, mut monitor) = <(BlockHash, ChannelMonitor<SP::EcdsaSigner>)>::read(
/// let (monitor_best_block, mut monitor) = <(BestBlock, ChannelMonitor<SP::EcdsaSigner>)>::read(
/// &mut Cursor::new(&serialized_monitor), (entropy_source, signer_provider)).unwrap();
///
/// // Read the channel manager paired with the block hash when it was persisted.
/// // Read the channel manager paired with the best block when it was persisted.
/// let serialized_manager = "...";
/// let (manager_block_hash, mut manager) = {
/// let (manager_best_block, mut manager) = {
/// let read_args = ChannelManagerReadArgs::new(
/// entropy_source,
/// node_signer,
Expand All @@ -110,19 +109,18 @@ where
/// config,
/// vec![&mut monitor],
/// );
/// <(BlockHash, ChannelManager<&ChainMonitor<SP::EcdsaSigner, &C, &T, &F, &L, &P, &ES>, &T, &ES, &NS, &SP, &F, &R, &MR, &L>)>::read(
/// <(BestBlock, ChannelManager<&ChainMonitor<SP::EcdsaSigner, &C, &T, &F, &L, &P, &ES>, &T, &ES, &NS, &SP, &F, &R, &MR, &L>)>::read(
/// &mut Cursor::new(&serialized_manager), read_args).unwrap()
/// };
///
/// // Synchronize any channel monitors and the channel manager to be on the best block.
/// let mut cache = UnboundedCache::new();
/// let mut monitor_listener = (monitor, &*tx_broadcaster, &*fee_estimator, &*logger);
/// let listeners = vec![
/// (monitor_block_hash, &monitor_listener as &dyn chain::Listen),
/// (manager_block_hash, &manager as &dyn chain::Listen),
/// (monitor_best_block, &monitor_listener as &dyn chain::Listen),
/// (manager_best_block, &manager as &dyn chain::Listen),
/// ];
/// let chain_tip = init::synchronize_listeners(
/// block_source, Network::Bitcoin, &mut cache, listeners).await.unwrap();
/// let (chain_cache, chain_tip) = init::synchronize_listeners(
/// block_source, Network::Bitcoin, listeners).await.unwrap();
///
/// // Allow the chain monitor to watch any channels.
/// let monitor = monitor_listener.0;
Expand All @@ -131,7 +129,7 @@ where
/// // Create an SPV client to notify the chain monitor and channel manager of block events.
/// let chain_poller = poll::ChainPoller::new(block_source, Network::Bitcoin);
/// let mut chain_listener = (chain_monitor, &manager);
/// let spv_client = SpvClient::new(chain_tip, chain_poller, &mut cache, &chain_listener);
/// let spv_client = SpvClient::new(chain_tip, chain_poller, chain_cache, &chain_listener);
/// }
/// ```
///
Expand All @@ -140,85 +138,87 @@ where
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
pub async fn synchronize_listeners<
B: Deref + Sized + Send + Sync,
C: Cache,
L: chain::Listen + ?Sized,
>(
block_source: B, network: Network, header_cache: &mut C,
mut chain_listeners: Vec<(BlockHash, &L)>,
) -> BlockSourceResult<ValidatedBlockHeader>
block_source: B, network: Network,
mut chain_listeners: Vec<(BestBlock, &L)>,
) -> BlockSourceResult<(HeaderCache, ValidatedBlockHeader)>
where
B::Target: BlockSource,
{
let best_header = validate_best_block_header(&*block_source).await?;

// Fetch the header for the block hash paired with each listener.
let mut chain_listeners_with_old_headers = Vec::new();
for (old_block_hash, chain_listener) in chain_listeners.drain(..) {
let old_header = match header_cache.look_up(&old_block_hash) {
Some(header) => *header,
None => {
block_source.get_header(&old_block_hash, None).await?.validate(old_block_hash)?
},
};
chain_listeners_with_old_headers.push((old_header, chain_listener))
}

// Find differences and disconnect blocks for each listener individually.
let mut chain_poller = ChainPoller::new(block_source, network);
let mut chain_listeners_at_height = Vec::new();
let mut most_common_ancestor = None;
let mut most_connected_blocks = Vec::new();
for (old_header, chain_listener) in chain_listeners_with_old_headers.drain(..) {
let mut header_cache = HeaderCache::new();
for (old_best_block, chain_listener) in chain_listeners.drain(..) {
// Disconnect any stale blocks, but keep them in the cache for the next iteration.
let header_cache = &mut ReadOnlyCache(header_cache);
let (common_ancestor, connected_blocks) = {
let chain_listener = &DynamicChainListener(chain_listener);
let mut chain_notifier = ChainNotifier { header_cache, chain_listener };
let mut chain_notifier = ChainNotifier { header_cache: &mut header_cache, chain_listener };
let difference =
chain_notifier.find_difference(best_header, &old_header, &mut chain_poller).await?;
chain_notifier.disconnect_blocks(difference.disconnected_blocks);
chain_notifier.find_difference_from_best_block(best_header, old_best_block, &mut chain_poller).await?;
if difference.common_ancestor.block_hash != old_best_block.block_hash {
chain_notifier.disconnect_blocks(difference.common_ancestor);
}
(difference.common_ancestor, difference.connected_blocks)
};

// Keep track of the most common ancestor and all blocks connected across all listeners.
chain_listeners_at_height.push((common_ancestor.height, chain_listener));
if connected_blocks.len() > most_connected_blocks.len() {
most_common_ancestor = Some(common_ancestor);
most_connected_blocks = connected_blocks;
}
}

// Connect new blocks for all listeners at once to avoid re-fetching blocks.
if let Some(common_ancestor) = most_common_ancestor {
let chain_listener = &ChainListenerSet(chain_listeners_at_height);
let mut chain_notifier = ChainNotifier { header_cache, chain_listener };
chain_notifier
.connect_blocks(common_ancestor, most_connected_blocks, &mut chain_poller)
.await
.map_err(|(e, _)| e)?;
}

Ok(best_header)
}

/// A wrapper to make a cache read-only.
///
/// Used to prevent losing headers that may be needed to disconnect blocks common to more than one
/// listener.
struct ReadOnlyCache<'a, C: Cache>(&'a mut C);
while !most_connected_blocks.is_empty() {
#[cfg(not(test))]
const MAX_BLOCKS_AT_ONCE: usize = 6 * 6; // Six hours of blocks, 144MiB encoded
#[cfg(test)]
const MAX_BLOCKS_AT_ONCE: usize = 2;

let mut fetch_block_futures =
Vec::with_capacity(core::cmp::min(MAX_BLOCKS_AT_ONCE, most_connected_blocks.len()));
for header in most_connected_blocks.iter().rev().take(MAX_BLOCKS_AT_ONCE) {
let fetch_future = chain_poller.fetch_block(header);
fetch_block_futures.push(ResultFuture::Pending(Box::pin(async move {
(header, fetch_future.await)
})));
}
let results = MultiResultFuturePoller::new(fetch_block_futures).await.into_iter();

impl<'a, C: Cache> Cache for ReadOnlyCache<'a, C> {
fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> {
self.0.look_up(block_hash)
}
let mut fetched_blocks = [const { None }; MAX_BLOCKS_AT_ONCE];
for ((header, block_res), result) in results.into_iter().zip(fetched_blocks.iter_mut()) {
*result = Some((header.height, block_res?));
}
debug_assert!(fetched_blocks.iter().take(most_connected_blocks.len()).all(|r| r.is_some()));
debug_assert!(fetched_blocks.is_sorted_by_key(|r| r.as_ref().map(|(height, _)| *height).unwrap_or(u32::MAX)));

for (listener_height, listener) in chain_listeners_at_height.iter() {
// Connect blocks for this listener.
for result in fetched_blocks.iter() {
if let Some((height, block_data)) = result {
if *height > *listener_height {
match &**block_data {
BlockData::FullBlock(block) => {
listener.block_connected(&block, *height);
},
BlockData::HeaderOnly(header_data) => {
listener.filtered_block_connected(&header_data, &[], *height);
},
}
}
}
}
}

fn block_connected(&mut self, _block_hash: BlockHash, _block_header: ValidatedBlockHeader) {
unreachable!()
most_connected_blocks
.truncate(most_connected_blocks.len().saturating_sub(MAX_BLOCKS_AT_ONCE));
}

fn block_disconnected(&mut self, _block_hash: &BlockHash) -> Option<ValidatedBlockHeader> {
None
}
Ok((header_cache, best_header))
}

/// Wrapper for supporting dynamically sized chain listeners.
Expand All @@ -236,33 +236,6 @@ impl<'a, L: chain::Listen + ?Sized> chain::Listen for DynamicChainListener<'a, L
}
}

/// A set of dynamically sized chain listeners, each paired with a starting block height.
struct ChainListenerSet<'a, L: chain::Listen + ?Sized>(Vec<(u32, &'a L)>);

impl<'a, L: chain::Listen + ?Sized> chain::Listen for ChainListenerSet<'a, L> {
fn block_connected(&self, block: &bitcoin::Block, height: u32) {
for (starting_height, chain_listener) in self.0.iter() {
if height > *starting_height {
chain_listener.block_connected(block, height);
}
}
}

fn filtered_block_connected(
&self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32,
) {
for (starting_height, chain_listener) in self.0.iter() {
if height > *starting_height {
chain_listener.filtered_block_connected(header, txdata, height);
}
}
}

fn blocks_disconnected(&self, _fork_point: BestBlock) {
unreachable!()
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -282,13 +255,12 @@ mod tests {
let listener_3 = MockChainListener::new().expect_block_connected(*chain.at_height(4));

let listeners = vec![
(chain.at_height(1).block_hash, &listener_1 as &dyn chain::Listen),
(chain.at_height(2).block_hash, &listener_2 as &dyn chain::Listen),
(chain.at_height(3).block_hash, &listener_3 as &dyn chain::Listen),
(chain.best_block_at_height(1), &listener_1 as &dyn chain::Listen),
(chain.best_block_at_height(2), &listener_2 as &dyn chain::Listen),
(chain.best_block_at_height(3), &listener_3 as &dyn chain::Listen),
];
let mut cache = chain.header_cache(0..=4);
match synchronize_listeners(&chain, Network::Bitcoin, &mut cache, listeners).await {
Ok(header) => assert_eq!(header, chain.tip()),
match synchronize_listeners(&chain, Network::Bitcoin, listeners).await {
Ok((_, header)) => assert_eq!(header, chain.tip()),
Err(e) => panic!("Unexpected error: {:?}", e),
}
}
Expand All @@ -314,15 +286,12 @@ mod tests {
.expect_block_connected(*main_chain.at_height(4));

let listeners = vec![
(fork_chain_1.tip().block_hash, &listener_1 as &dyn chain::Listen),
(fork_chain_2.tip().block_hash, &listener_2 as &dyn chain::Listen),
(fork_chain_3.tip().block_hash, &listener_3 as &dyn chain::Listen),
(fork_chain_1.best_block(), &listener_1 as &dyn chain::Listen),
(fork_chain_2.best_block(), &listener_2 as &dyn chain::Listen),
(fork_chain_3.best_block(), &listener_3 as &dyn chain::Listen),
];
let mut cache = fork_chain_1.header_cache(2..=4);
cache.extend(fork_chain_2.header_cache(3..=4));
cache.extend(fork_chain_3.header_cache(4..=4));
match synchronize_listeners(&main_chain, Network::Bitcoin, &mut cache, listeners).await {
Ok(header) => assert_eq!(header, main_chain.tip()),
match synchronize_listeners(&main_chain, Network::Bitcoin, listeners).await {
Ok((_, header)) => assert_eq!(header, main_chain.tip()),
Err(e) => panic!("Unexpected error: {:?}", e),
}
}
Expand Down Expand Up @@ -351,37 +320,12 @@ mod tests {
.expect_block_connected(*main_chain.at_height(4));

let listeners = vec![
(fork_chain_1.tip().block_hash, &listener_1 as &dyn chain::Listen),
(fork_chain_2.tip().block_hash, &listener_2 as &dyn chain::Listen),
(fork_chain_3.tip().block_hash, &listener_3 as &dyn chain::Listen),
(fork_chain_1.best_block(), &listener_1 as &dyn chain::Listen),
(fork_chain_2.best_block(), &listener_2 as &dyn chain::Listen),
(fork_chain_3.best_block(), &listener_3 as &dyn chain::Listen),
];
let mut cache = fork_chain_1.header_cache(2..=4);
cache.extend(fork_chain_2.header_cache(3..=4));
cache.extend(fork_chain_3.header_cache(4..=4));
match synchronize_listeners(&main_chain, Network::Bitcoin, &mut cache, listeners).await {
Ok(header) => assert_eq!(header, main_chain.tip()),
Err(e) => panic!("Unexpected error: {:?}", e),
}
}

#[tokio::test]
async fn cache_connected_and_keep_disconnected_blocks() {
let main_chain = Blockchain::default().with_height(2);
let fork_chain = main_chain.fork_at_height(1);
let new_tip = main_chain.tip();
let old_tip = fork_chain.tip();

let listener = MockChainListener::new()
.expect_blocks_disconnected(*fork_chain.at_height(1))
.expect_block_connected(*new_tip);

let listeners = vec![(old_tip.block_hash, &listener as &dyn chain::Listen)];
let mut cache = fork_chain.header_cache(2..=2);
match synchronize_listeners(&main_chain, Network::Bitcoin, &mut cache, listeners).await {
Ok(_) => {
assert!(cache.contains_key(&new_tip.block_hash));
assert!(cache.contains_key(&old_tip.block_hash));
},
match synchronize_listeners(&main_chain, Network::Bitcoin, listeners).await {
Ok((_, header)) => assert_eq!(header, main_chain.tip()),
Err(e) => panic!("Unexpected error: {:?}", e),
}
}
Expand Down
Loading