diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index 4a182c33beb..a20c804506c 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -41,8 +41,7 @@ use lightning::chain; use lightning::chain::chaininterface::{ BroadcasterInterface, ConfirmationTarget, FeeEstimator, TransactionType, }; -use lightning::chain::channelmonitor::{ChannelMonitor, MonitorEvent}; -use lightning::chain::transaction::OutPoint; +use lightning::chain::channelmonitor::ChannelMonitor; use lightning::chain::{ chainmonitor, channelmonitor, BlockLocator, ChannelMonitorUpdateStatus, Confirm, Watch, }; @@ -87,7 +86,6 @@ use lightning::util::wallet_utils::{WalletSourceSync, WalletSync}; use lightning_invoice::RawBolt11Invoice; use crate::utils::test_logger::{self, Output}; -use crate::utils::test_persister::TestPersister; use bitcoin::secp256k1::ecdh::SharedSecret; use bitcoin::secp256k1::ecdsa::{RecoverableSignature, Signature}; @@ -293,144 +291,302 @@ impl Writer for VecWriter { } } -/// The LDK API requires that any time we tell it we're done persisting a `ChannelMonitor[Update]` -/// we never pass it in as the "latest" `ChannelMonitor` on startup. However, we can pass -/// out-of-date monitors as long as we never told LDK we finished persisting them, which we do by -/// storing both old `ChannelMonitor`s and ones that are "being persisted" here. +fn serialize_monitor(monitor: &ChannelMonitor) -> Vec { + let mut ser = VecWriter(Vec::new()); + monitor.write(&mut ser).unwrap(); + ser.0 +} + +/// LDK requires the `ChannelMonitor` loaded on startup to be at least as current as the +/// `ChannelManager` state, except for monitor updates that `ChannelManager` still records as +/// in-flight and can replay. This harness tracks the monitor blobs that remain valid restart +/// candidates under that rule. /// -/// Note that such "being persisted" `ChannelMonitor`s are stored in `ChannelManager` and will -/// simply be replayed on startup. +/// Separately, we track every `InProgress` persistence operation that still needs a +/// `channel_monitor_updated` call. A newer persisted monitor can make an older monitor invalid for +/// restart while the older update still needs to be completed to unblock the live `ChainMonitor`. +/// +/// Off-chain monitor updates that are still "being persisted" are stored in `ChannelManager` and +/// will be replayed on startup. Full-monitor snapshots from chain sync or archive paths that return +/// `InProgress` are only restart candidates; losing one on restart does not require a +/// `channel_monitor_updated` callback. struct LatestMonitorState { /// The latest monitor id which we told LDK we've persisted. /// - /// Note that there may still be earlier pending monitor updates in [`Self::pending_monitors`] - /// which we haven't yet completed. We're allowed to reload with those as well, at least until - /// they're completed. + /// Note that earlier updates may still need a `channel_monitor_updated` callback via + /// [`Self::pending_monitor_completions`]. persisted_monitor_id: u64, /// The latest serialized `ChannelMonitor` that we told LDK we persisted. persisted_monitor: Vec, - /// A set of (monitor id, serialized `ChannelMonitor`)s which we're currently "persisting", - /// from LDK's perspective. + /// An ordered list of (monitor id, serialized `ChannelMonitor`)s which remain safe to use as + /// stale monitors on reload. pending_monitors: Vec<(u64, Vec)>, + /// An ordered list of (monitor id, serialized `ChannelMonitor`)s which still need a + /// `channel_monitor_updated` callback. + pending_monitor_completions: Vec<(u64, Vec)>, } +impl LatestMonitorState { + fn insert_pending_entry( + pending: &mut Vec<(u64, Vec)>, monitor_id: u64, serialized_monitor: Vec, + ) { + // Monitor update ids must arrive in order. Assert at insertion time so duplicates or + // out-of-order updates fail close to the write that caused them instead of being sorted + // into place. + assert!( + pending.last().map_or(true, |(last_id, _)| *last_id < monitor_id), + "pending monitor updates should arrive in order" + ); + pending.push((monitor_id, serialized_monitor)); + } -struct TestChainMonitor { - pub logger: Arc, - pub keys: Arc, - pub persister: Arc, - pub chain_monitor: Arc< - chainmonitor::ChainMonitor< - TestChannelSigner, - Arc, - Arc, - Arc, - Arc, - Arc, - Arc, - >, - >, - pub latest_monitors: Mutex>, -} -impl TestChainMonitor { - pub fn new( - broadcaster: Arc, logger: Arc, feeest: Arc, - persister: Arc, keys: Arc, - ) -> Self { - Self { - chain_monitor: Arc::new(chainmonitor::ChainMonitor::new( - None, - broadcaster, - logger.clone(), - feeest, - Arc::clone(&persister), - Arc::clone(&keys), - keys.get_peer_storage_key(), - false, - )), - logger, - keys, - persister, - latest_monitors: Mutex::new(new_hash_map()), + fn insert_pending_monitor_candidate(&mut self, monitor_id: u64, serialized_monitor: Vec) { + // Full-monitor persists from chain sync or archive paths use the monitor's current + // latest_update_id rather than a fresh ChannelMonitorUpdate id. Keep duplicate ids so + // reload can choose between multiple same-id full snapshots that were in flight together. + if let Some((last_id, _)) = self.pending_monitors.last() { + assert!(*last_id <= monitor_id, "pending monitor updates should arrive in order"); } + self.pending_monitors.push((monitor_id, serialized_monitor)); } -} -impl chain::Watch for TestChainMonitor { - fn watch_channel( - &self, channel_id: ChannelId, monitor: channelmonitor::ChannelMonitor, - ) -> Result { - let mut ser = VecWriter(Vec::new()); - monitor.write(&mut ser).unwrap(); - let monitor_id = monitor.get_latest_update_id(); - let res = self.chain_monitor.watch_channel(channel_id, monitor); - let state = match res { - Ok(chain::ChannelMonitorUpdateStatus::Completed) => LatestMonitorState { - persisted_monitor_id: monitor_id, - persisted_monitor: ser.0, - pending_monitors: Vec::new(), - }, - Ok(chain::ChannelMonitorUpdateStatus::InProgress) => LatestMonitorState { - persisted_monitor_id: monitor_id, - persisted_monitor: Vec::new(), - pending_monitors: vec![(monitor_id, ser.0)], - }, - Ok(chain::ChannelMonitorUpdateStatus::UnrecoverableError) => panic!(), - Err(()) => panic!(), - }; - if self.latest_monitors.lock().unwrap().insert(channel_id, state).is_some() { - panic!("Already had monitor pre-watch_channel"); + + fn mark_persisted(&mut self, monitor_id: u64, serialized_monitor: Vec) { + // Once a monitor is durable, use it as the restart baseline and stop tracking candidates + // at or behind that update id. Completion obligations are tracked separately and are + // deliberately not pruned here. + self.pending_monitors.retain(|(id, _)| *id > monitor_id); + if monitor_id >= self.persisted_monitor_id { + self.persisted_monitor_id = monitor_id; + self.persisted_monitor = serialized_monitor; } - res } - fn update_channel( - &self, channel_id: ChannelId, update: &channelmonitor::ChannelMonitorUpdate, - ) -> chain::ChannelMonitorUpdateStatus { - let mut map_lock = self.latest_monitors.lock().unwrap(); - let map_entry = map_lock.get_mut(&channel_id).expect("Didn't have monitor on update call"); - let latest_monitor_data = map_entry - .pending_monitors - .last() - .as_ref() - .map(|(_, data)| data) - .unwrap_or(&map_entry.persisted_monitor); - let deserialized_monitor = - <(BlockLocator, channelmonitor::ChannelMonitor)>::read( - &mut &latest_monitor_data[..], - (&*self.keys, &*self.keys), - ) - .unwrap() - .1; - deserialized_monitor - .update_monitor( - update, - &&TestBroadcaster { txn_broadcasted: RefCell::new(Vec::new()) }, - &&FuzzEstimator { ret_val: atomic::AtomicU32::new(253) }, - &self.logger, - ) - .unwrap(); - let mut ser = VecWriter(Vec::new()); - deserialized_monitor.write(&mut ser).unwrap(); - let res = self.chain_monitor.update_channel(channel_id, update); - match res { - chain::ChannelMonitorUpdateStatus::Completed => { - map_entry.persisted_monitor_id = update.update_id; - map_entry.persisted_monitor = ser.0; + fn insert_pending( + &mut self, monitor_id: u64, serialized_monitor: Vec, needs_completion: bool, + ) { + if needs_completion { + // persist_new_channel and update_persisted_channel(Some(_)) require a later + // channel_monitor_updated callback if persistence returns InProgress. + Self::insert_pending_entry( + &mut self.pending_monitors, + monitor_id, + serialized_monitor.clone(), + ); + Self::insert_pending_entry( + &mut self.pending_monitor_completions, + monitor_id, + serialized_monitor, + ); + } else { + // This harness treats update_persisted_channel(None, ...) as the chain-sync/archive + // case: the full monitor may be used on restart, but ChainMonitor does not wait for a + // channel_monitor_updated callback. + self.insert_pending_monitor_candidate(monitor_id, serialized_monitor); + } + } + + fn mark_completed_update_persisted(&mut self, monitor_id: u64, serialized_monitor: Vec) { + // The selector/drain path should already have removed this entry before + // finish_monitor_update calls channel_monitor_updated. This check catches accidental + // double-completion or pruning of the wrong list. + assert!( + self.pending_monitor_completions.iter().all(|(id, _)| *id != monitor_id), + "completed monitor update should already be removed from the completion queue" + ); + self.mark_persisted(monitor_id, serialized_monitor); + } + + fn drain_pending_completions(&mut self) -> Vec<(u64, Vec)> { + std::mem::take(&mut self.pending_monitor_completions) + } + + fn take_pending_completion( + &mut self, selector: MonitorUpdateSelector, + ) -> Option<(u64, Vec)> { + // The fuzzer chooses which outstanding callback to deliver. These choices apply to + // completion obligations, not to the set of monitors that may be used on restart. + match selector { + MonitorUpdateSelector::First => { + if self.pending_monitor_completions.is_empty() { + None + } else { + Some(self.pending_monitor_completions.remove(0)) + } }, - chain::ChannelMonitorUpdateStatus::InProgress => { - map_entry.pending_monitors.push((update.update_id, ser.0)); + MonitorUpdateSelector::Second => { + if self.pending_monitor_completions.len() > 1 { + Some(self.pending_monitor_completions.remove(1)) + } else { + None + } }, - chain::ChannelMonitorUpdateStatus::UnrecoverableError => panic!(), + MonitorUpdateSelector::Last => self.pending_monitor_completions.pop(), } - res } - fn release_pending_monitor_events( - &self, - ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)> { - return self.chain_monitor.release_pending_monitor_events(); + fn select_monitor_for_reload(&mut self, selector: MonitorReloadSelector) { + // A restart can load the last monitor we told LDK was persisted, or a monitor snapshot + // whose write was started before the simulated crash. + let old_mon = (self.persisted_monitor_id, std::mem::take(&mut self.persisted_monitor)); + let (monitor_id, serialized_monitor) = match selector { + MonitorReloadSelector::Persisted => old_mon, + MonitorReloadSelector::FirstPending => { + if self.pending_monitors.is_empty() { + old_mon + } else { + self.pending_monitors.remove(0) + } + }, + MonitorReloadSelector::LastPending => self.pending_monitors.pop().unwrap_or(old_mon), + }; + self.persisted_monitor_id = monitor_id; + self.persisted_monitor = serialized_monitor; + // After restart, stop tracking pre-restart in-flight writes. ChannelManager will replay + // off-chain monitor updates that still matter; full-monitor snapshots may simply be absent. + self.pending_monitors.clear(); + self.pending_monitor_completions.clear(); } } +struct HarnessPersister { + pub update_ret: Mutex, + pub latest_monitors: Mutex>, +} +impl HarnessPersister { + fn track_monitor_update( + &self, channel_id: ChannelId, monitor_id: u64, serialized_monitor: Vec, + status: chain::ChannelMonitorUpdateStatus, needs_completion: bool, + ) { + let mut latest_monitors = self.latest_monitors.lock().unwrap(); + if let Some(state) = latest_monitors.get_mut(&channel_id) { + match status { + chain::ChannelMonitorUpdateStatus::Completed => { + // A completed write advances the restart baseline. Once LDK can rely on that + // monitor state being durable, the harness stops offering candidates at or + // behind that update id. + state.mark_persisted(monitor_id, serialized_monitor); + }, + chain::ChannelMonitorUpdateStatus::InProgress => { + // InProgress always creates a restart candidate, but only some calls also need + // an explicit channel_monitor_updated completion. + state.insert_pending(monitor_id, serialized_monitor, needs_completion); + }, + chain::ChannelMonitorUpdateStatus::UnrecoverableError => {}, + } + } else { + let state = match status { + chain::ChannelMonitorUpdateStatus::Completed => LatestMonitorState { + persisted_monitor_id: monitor_id, + persisted_monitor: serialized_monitor, + pending_monitors: Vec::new(), + pending_monitor_completions: Vec::new(), + }, + chain::ChannelMonitorUpdateStatus::InProgress => { + // The first persist for a channel is persist_new_channel, which always needs a + // completion callback when it returns InProgress. A full-monitor update without + // existing state would mean the harness missed the channel's initial monitor. + assert!(needs_completion, "missing monitor state for full monitor update"); + LatestMonitorState { + persisted_monitor_id: monitor_id, + persisted_monitor: Vec::new(), + pending_monitors: vec![(monitor_id, serialized_monitor.clone())], + pending_monitor_completions: vec![(monitor_id, serialized_monitor)], + } + }, + chain::ChannelMonitorUpdateStatus::UnrecoverableError => return, + }; + assert!( + latest_monitors.insert(channel_id, state).is_none(), + "Already had monitor state pre-persist" + ); + } + } + + fn mark_update_completed( + &self, channel_id: ChannelId, monitor_id: u64, serialized_monitor: Vec, + ) { + let mut latest_monitors = self.latest_monitors.lock().unwrap(); + let state = latest_monitors + .get_mut(&channel_id) + .expect("missing monitor state for completed update"); + // Once we tell LDK update N is completed, use the completed monitor as the restart + // baseline and drop restart candidates at or behind N. + state.mark_completed_update_persisted(monitor_id, serialized_monitor); + } + + fn drain_pending_updates(&self, channel_id: &ChannelId) -> Vec<(u64, Vec)> { + self.latest_monitors + .lock() + .unwrap() + .get_mut(channel_id) + .map_or_else(Vec::new, |state| state.drain_pending_completions()) + } + + fn drain_all_pending_updates(&self) -> Vec<(ChannelId, u64, Vec)> { + let mut completed_updates = Vec::new(); + for (channel_id, state) in self.latest_monitors.lock().unwrap().iter_mut() { + for (monitor_id, data) in state.drain_pending_completions() { + completed_updates.push((*channel_id, monitor_id, data)); + } + } + completed_updates + } + + fn take_pending_update( + &self, channel_id: &ChannelId, selector: MonitorUpdateSelector, + ) -> Option<(u64, Vec)> { + self.latest_monitors + .lock() + .unwrap() + .get_mut(channel_id) + .and_then(|state| state.take_pending_completion(selector)) + } +} +impl chainmonitor::Persist for HarnessPersister { + fn persist_new_channel( + &self, _monitor_name: lightning::util::persist::MonitorName, + data: &channelmonitor::ChannelMonitor, + ) -> chain::ChannelMonitorUpdateStatus { + let status = self.update_ret.lock().unwrap().clone(); + let monitor_id = data.get_latest_update_id(); + let serialized_monitor = serialize_monitor(data); + self.track_monitor_update(data.channel_id(), monitor_id, serialized_monitor, status, true); + status + } + + fn update_persisted_channel( + &self, _monitor_name: lightning::util::persist::MonitorName, + update: Option<&channelmonitor::ChannelMonitorUpdate>, + data: &channelmonitor::ChannelMonitor, + ) -> chain::ChannelMonitorUpdateStatus { + let status = self.update_ret.lock().unwrap().clone(); + let monitor_id = update.map_or_else(|| data.get_latest_update_id(), |upd| upd.update_id); + let serialized_monitor = serialize_monitor(data); + self.track_monitor_update( + data.channel_id(), + monitor_id, + serialized_monitor, + status, + // `None` normally comes from chain-sync or archive writes, which need no completion + // callback. `update_channel_internal` can also use `None` after `update_monitor` + // fails, but this harness does not model that error-recovery path. + update.is_some(), + ); + status + } + + fn archive_persisted_channel(&self, _monitor_name: lightning::util::persist::MonitorName) {} +} + +type TestChainMonitor = chainmonitor::ChainMonitor< + TestChannelSigner, + Arc, + Arc, + Arc, + Arc, + Arc, + Arc, +>; + struct KeyProvider { node_secret: SecretKey, rand_bytes_id: atomic::AtomicU32, @@ -654,12 +810,14 @@ struct HarnessNode<'a> { node_id: u8, node: ChanMan<'a>, monitor: Arc, + persister: Arc, keys_manager: Arc, logger: Arc, broadcaster: Arc, fee_estimator: Arc, wallet: TestWalletSource, persistence_style: ChannelMonitorUpdateStatus, + deferred: bool, serialized_manager: Vec, height: u32, last_htlc_clear_fee: u32, @@ -674,35 +832,42 @@ impl<'a> std::ops::Deref for HarnessNode<'a> { } impl<'a> HarnessNode<'a> { - fn build_loggers( + fn build_logger( node_id: u8, out: &Out, - ) -> (Arc, Arc) { - let raw_logger = Arc::new(test_logger::TestLogger::new(node_id.to_string(), out.clone())); - let logger_for_monitor: Arc = raw_logger.clone(); - let logger: Arc = raw_logger; - (logger_for_monitor, logger) + ) -> Arc { + Arc::new(test_logger::TestLogger::new(node_id.to_string(), out.clone())) + } + + fn build_persister(persistence_style: ChannelMonitorUpdateStatus) -> Arc { + Arc::new(HarnessPersister { + update_ret: Mutex::new(persistence_style), + latest_monitors: Mutex::new(new_hash_map()), + }) } fn build_chain_monitor( broadcaster: &Arc, fee_estimator: &Arc, - keys_manager: &Arc, logger_for_monitor: Arc, - persistence_style: ChannelMonitorUpdateStatus, + keys_manager: &Arc, logger: Arc, + persister: &Arc, deferred: bool, ) -> Arc { - Arc::new(TestChainMonitor::new( + Arc::new(chainmonitor::ChainMonitor::new( + None, Arc::clone(broadcaster), - logger_for_monitor, + logger, Arc::clone(fee_estimator), - Arc::new(TestPersister { update_ret: Mutex::new(persistence_style) }), + Arc::clone(persister), Arc::clone(keys_manager), + keys_manager.get_peer_storage_key(), + deferred, )) } fn new( node_id: u8, wallet: TestWalletSource, fee_estimator: Arc, broadcaster: Arc, persistence_style: ChannelMonitorUpdateStatus, - out: &Out, router: &'a FuzzRouter, chan_type: ChanType, + deferred: bool, out: &Out, router: &'a FuzzRouter, chan_type: ChanType, ) -> Self { - let (logger_for_monitor, logger) = Self::build_loggers(node_id, out); + let logger = Self::build_logger(node_id, out); let node_secret = SecretKey::from_slice(&[ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, node_id, @@ -713,12 +878,14 @@ impl<'a> HarnessNode<'a> { rand_bytes_id: atomic::AtomicU32::new(0), enforcement_states: Mutex::new(new_hash_map()), }); + let persister = Self::build_persister(persistence_style); let monitor = Self::build_chain_monitor( &broadcaster, &fee_estimator, &keys_manager, - logger_for_monitor, - persistence_style, + Arc::clone(&logger), + &persister, + deferred, ); let network = Network::Bitcoin; let best_block_timestamp = genesis_block(network).header.time; @@ -741,12 +908,14 @@ impl<'a> HarnessNode<'a> { node_id, node, monitor, + persister, keys_manager, logger, broadcaster, fee_estimator, wallet, persistence_style, + deferred, serialized_manager: Vec::new(), height: 0, last_htlc_clear_fee: 253, @@ -754,67 +923,34 @@ impl<'a> HarnessNode<'a> { } fn set_persistence_style(&mut self, style: ChannelMonitorUpdateStatus) { + // Store the style for the next reload. The active persister is intentionally not changed + // in place. self.persistence_style = style; } - fn complete_all_monitor_updates(&self, chan_id: &ChannelId) { - if let Some(state) = self.monitor.latest_monitors.lock().unwrap().get_mut(chan_id) { - assert!( - state.pending_monitors.windows(2).all(|pair| pair[0].0 < pair[1].0), - "updates should be sorted by id" - ); - for (id, data) in state.pending_monitors.drain(..) { - self.monitor.chain_monitor.channel_monitor_updated(*chan_id, id).unwrap(); - if id > state.persisted_monitor_id { - state.persisted_monitor_id = id; - state.persisted_monitor = data; - } - } + fn finish_monitor_update(&self, chan_id: ChannelId, monitor_id: u64, data: Vec) { + self.monitor.channel_monitor_updated(chan_id, monitor_id).unwrap(); + self.persister.mark_update_completed(chan_id, monitor_id, data); + } + + fn complete_all_monitor_updates(&self, chan_id: &ChannelId) -> bool { + let completed_updates = self.persister.drain_pending_updates(chan_id); + let completed_any = !completed_updates.is_empty(); + for (monitor_id, data) in completed_updates { + self.finish_monitor_update(*chan_id, monitor_id, data); } + completed_any } fn complete_all_pending_monitor_updates(&self) { - for (channel_id, state) in self.monitor.latest_monitors.lock().unwrap().iter_mut() { - for (id, data) in state.pending_monitors.drain(..) { - self.monitor.chain_monitor.channel_monitor_updated(*channel_id, id).unwrap(); - if id >= state.persisted_monitor_id { - state.persisted_monitor_id = id; - state.persisted_monitor = data; - } - } + for (channel_id, monitor_id, data) in self.persister.drain_all_pending_updates() { + self.finish_monitor_update(channel_id, monitor_id, data); } } fn complete_monitor_update(&self, chan_id: &ChannelId, selector: MonitorUpdateSelector) { - if let Some(state) = self.monitor.latest_monitors.lock().unwrap().get_mut(chan_id) { - assert!( - state.pending_monitors.windows(2).all(|pair| pair[0].0 < pair[1].0), - "updates should be sorted by id" - ); - let update = match selector { - MonitorUpdateSelector::First => { - if state.pending_monitors.is_empty() { - None - } else { - Some(state.pending_monitors.remove(0)) - } - }, - MonitorUpdateSelector::Second => { - if state.pending_monitors.len() > 1 { - Some(state.pending_monitors.remove(1)) - } else { - None - } - }, - MonitorUpdateSelector::Last => state.pending_monitors.pop(), - }; - if let Some((id, data)) = update { - self.monitor.chain_monitor.channel_monitor_updated(*chan_id, id).unwrap(); - if id > state.persisted_monitor_id { - state.persisted_monitor_id = id; - state.persisted_monitor = data; - } - } + if let Some((monitor_id, data)) = self.persister.take_pending_update(chan_id, selector) { + self.finish_monitor_update(*chan_id, monitor_id, data); } } @@ -836,9 +972,30 @@ impl<'a> HarnessNode<'a> { } } - fn refresh_serialized_manager(&mut self) { + fn checkpoint_manager_persistence(&mut self) -> bool { if self.node.get_and_clear_needs_persistence() { + let pending_monitor_writes = self.monitor.pending_operation_count(); self.serialized_manager = self.node.encode(); + if self.deferred { + self.monitor.flush(pending_monitor_writes, &self.logger); + } else { + assert_eq!(pending_monitor_writes, 0); + } + true + } else { + assert_eq!(self.monitor.pending_operation_count(), 0); + false + } + } + + fn force_checkpoint_manager_persistence(&mut self) { + let pending_monitor_writes = self.monitor.pending_operation_count(); + self.serialized_manager = self.node.encode(); + self.node.get_and_clear_needs_persistence(); + if self.deferred { + self.monitor.flush(pending_monitor_writes, &self.logger); + } else { + assert_eq!(pending_monitor_writes, 0); } } @@ -942,50 +1099,38 @@ impl<'a> HarnessNode<'a> { fn reload( &mut self, use_old_mons: u8, out: &Out, router: &'a FuzzRouter, chan_type: ChanType, ) { - let (logger_for_monitor, logger) = Self::build_loggers(self.node_id, out); + let logger = Self::build_logger(self.node_id, out); + let persister = Self::build_persister(self.persistence_style); let chain_monitor = Self::build_chain_monitor( &self.broadcaster, &self.fee_estimator, &self.keys_manager, - logger_for_monitor, - ChannelMonitorUpdateStatus::Completed, + Arc::clone(&logger), + &persister, + self.deferred, ); let mut monitors = new_hash_map(); let mut use_old_mons = use_old_mons; { - let mut old_monitors = self.monitor.latest_monitors.lock().unwrap(); + let mut old_monitors = self.persister.latest_monitors.lock().unwrap(); for (channel_id, mut prev_state) in old_monitors.drain() { - let (mon_id, serialized_mon) = if use_old_mons % 3 == 0 { - // Reload with the oldest `ChannelMonitor` (the one that we already told - // `ChannelManager` we finished persisting). - (prev_state.persisted_monitor_id, prev_state.persisted_monitor) - } else if use_old_mons % 3 == 1 { - // Reload with the second-oldest `ChannelMonitor`. - let old_mon = (prev_state.persisted_monitor_id, prev_state.persisted_monitor); - prev_state.pending_monitors.drain(..).next().unwrap_or(old_mon) - } else { - // Reload with the newest `ChannelMonitor`. - let old_mon = (prev_state.persisted_monitor_id, prev_state.persisted_monitor); - prev_state.pending_monitors.pop().unwrap_or(old_mon) + let selector = match use_old_mons % 3 { + 0 => MonitorReloadSelector::Persisted, + 1 => MonitorReloadSelector::FirstPending, + _ => MonitorReloadSelector::LastPending, }; - // Use a different value of `use_old_mons` if we have another monitor - // (only for node B) by shifting `use_old_mons` one in base-3. + prev_state.select_monitor_for_reload(selector); + // Use a different trit for each monitor so one restart byte can vary the stale + // monitor depth across multiple monitors for the node. use_old_mons /= 3; let mon = <(BlockLocator, ChannelMonitor)>::read( - &mut &serialized_mon[..], + &mut &prev_state.persisted_monitor[..], (&*self.keys_manager, &*self.keys_manager), ) .expect("Failed to read monitor"); monitors.insert(channel_id, mon.1); - // Update the latest `ChannelMonitor` state to match what we just told LDK. - prev_state.persisted_monitor = serialized_mon; - prev_state.persisted_monitor_id = mon_id; - // Wipe any `ChannelMonitor`s which we never told LDK we finished persisting, - // considering them discarded. LDK should replay these for us as they're stored in - // the `ChannelManager`. - prev_state.pending_monitors.clear(); - chain_monitor.latest_monitors.lock().unwrap().insert(channel_id, prev_state); + persister.latest_monitors.lock().unwrap().insert(channel_id, prev_state); } } let mut monitor_refs = new_hash_map(); @@ -1009,19 +1154,32 @@ impl<'a> HarnessNode<'a> { let manager = <(BlockLocator, ChanMan)>::read(&mut &self.serialized_manager[..], read_args) .expect("Failed to read manager"); + let expected_status = if self.deferred { + ChannelMonitorUpdateStatus::InProgress + } else { + self.persistence_style + }; for (channel_id, mon) in monitors.drain() { - assert_eq!( - chain_monitor.chain_monitor.watch_channel(channel_id, mon), - Ok(ChannelMonitorUpdateStatus::Completed) - ); + assert_eq!(chain_monitor.watch_channel(channel_id, mon), Ok(expected_status)); } - *chain_monitor.persister.update_ret.lock().unwrap() = self.persistence_style; self.node = manager.1; self.monitor = chain_monitor; + self.persister = persister; self.logger = logger; + // In deferred mode, the startup watch_channel registrations above queue monitor operations + // even if the reloaded ChannelManager does not need persistence. Always checkpoint here so + // those registrations can be flushed against the manager snapshot they belong to. + self.force_checkpoint_manager_persistence(); } } +#[derive(Copy, Clone)] +enum MonitorReloadSelector { + Persisted, + FirstPending, + LastPending, +} + #[derive(Copy, Clone)] enum MonitorUpdateSelector { First, @@ -1049,6 +1207,51 @@ struct EventQueues { cb: Vec, } +fn find_destination_node(nodes: &[HarnessNode<'_>; 3], node_id: &PublicKey) -> usize { + nodes + .iter() + .position(|node| node.get_our_node_id() == *node_id) + .expect("message destination should be a known harness node") +} + +fn directed_msg_destination(event: &MessageSendEvent) -> Option { + match event { + MessageSendEvent::UpdateHTLCs { ref node_id, .. } + | MessageSendEvent::SendRevokeAndACK { ref node_id, .. } + | MessageSendEvent::SendChannelReestablish { ref node_id, .. } + | MessageSendEvent::SendStfu { ref node_id, .. } + | MessageSendEvent::SendSpliceInit { ref node_id, .. } + | MessageSendEvent::SendSpliceAck { ref node_id, .. } + | MessageSendEvent::SendSpliceLocked { ref node_id, .. } + | MessageSendEvent::SendTxAddInput { ref node_id, .. } + | MessageSendEvent::SendTxAddOutput { ref node_id, .. } + | MessageSendEvent::SendTxRemoveInput { ref node_id, .. } + | MessageSendEvent::SendTxRemoveOutput { ref node_id, .. } + | MessageSendEvent::SendTxComplete { ref node_id, .. } + | MessageSendEvent::SendTxAbort { ref node_id, .. } + | MessageSendEvent::SendTxInitRbf { ref node_id, .. } + | MessageSendEvent::SendTxAckRbf { ref node_id, .. } + | MessageSendEvent::SendTxSignatures { ref node_id, .. } + | MessageSendEvent::SendChannelReady { ref node_id, .. } + | MessageSendEvent::SendAnnouncementSignatures { ref node_id, .. } + | MessageSendEvent::SendChannelUpdate { ref node_id, .. } + | MessageSendEvent::HandleError { ref node_id, .. } => Some(*node_id), + MessageSendEvent::BroadcastChannelUpdate { .. } + | MessageSendEvent::BroadcastChannelAnnouncement { .. } + | MessageSendEvent::BroadcastNodeAnnouncement { .. } => None, + _ => panic!("Unhandled message event {:?}", event), + } +} + +fn is_funding_lock_msg_event(event: &MessageSendEvent) -> bool { + matches!( + event, + MessageSendEvent::SendChannelReady { .. } + | MessageSendEvent::SendAnnouncementSignatures { .. } + | MessageSendEvent::BroadcastChannelAnnouncement { .. } + ) +} + impl EventQueues { fn new() -> Self { Self { ab: Vec::new(), ba: Vec::new(), bc: Vec::new(), cb: Vec::new() } @@ -1077,78 +1280,43 @@ impl EventQueues { } } - fn push_for_node(&mut self, node_idx: usize, event: MessageSendEvent) { - match node_idx { - 0 => self.ab.push(event), - 2 => self.cb.push(event), - _ => panic!("cannot directly queue messages for node {}", node_idx), - } - } - - fn extend_for_node>( - &mut self, node_idx: usize, events: I, - ) { - match node_idx { - 0 => self.ab.extend(events), - 2 => self.cb.extend(events), - _ => panic!("cannot directly queue messages for node {}", node_idx), - } - } - - fn route_from_middle<'a, I: IntoIterator>( - &mut self, excess_events: I, expect_drop_node: Option, nodes: &[HarnessNode<'a>; 3], + fn route_from_node<'a, I: IntoIterator>( + &mut self, source_idx: usize, events: I, expect_drop_node: Option, + nodes: &[HarnessNode<'a>; 3], ) { - // Push any events from Node B onto queues.ba and queues.bc. - let a_id = nodes[0].get_our_node_id(); let expect_drop_id = expect_drop_node.map(|id| nodes[id].get_our_node_id()); - for event in excess_events { - let push_a = match event { - MessageSendEvent::UpdateHTLCs { ref node_id, .. } - | MessageSendEvent::SendRevokeAndACK { ref node_id, .. } - | MessageSendEvent::SendChannelReestablish { ref node_id, .. } - | MessageSendEvent::SendStfu { ref node_id, .. } - | MessageSendEvent::SendSpliceInit { ref node_id, .. } - | MessageSendEvent::SendSpliceAck { ref node_id, .. } - | MessageSendEvent::SendSpliceLocked { ref node_id, .. } - | MessageSendEvent::SendTxAddInput { ref node_id, .. } - | MessageSendEvent::SendTxAddOutput { ref node_id, .. } - | MessageSendEvent::SendTxRemoveInput { ref node_id, .. } - | MessageSendEvent::SendTxRemoveOutput { ref node_id, .. } - | MessageSendEvent::SendTxComplete { ref node_id, .. } - | MessageSendEvent::SendTxAbort { ref node_id, .. } - | MessageSendEvent::SendTxInitRbf { ref node_id, .. } - | MessageSendEvent::SendTxAckRbf { ref node_id, .. } - | MessageSendEvent::SendTxSignatures { ref node_id, .. } - | MessageSendEvent::SendChannelUpdate { ref node_id, .. } => { - if Some(*node_id) == expect_drop_id { - panic!( - "peer_disconnected should drop msgs bound for the disconnected peer" - ); - } - *node_id == a_id - }, - MessageSendEvent::HandleError { ref action, ref node_id } => { - assert_action_timeout_awaiting_response(action); - if Some(*node_id) == expect_drop_id { - panic!( - "peer_disconnected should drop msgs bound for the disconnected peer" - ); - } - *node_id == a_id - }, - MessageSendEvent::SendChannelReady { .. } - | MessageSendEvent::SendAnnouncementSignatures { .. } - | MessageSendEvent::BroadcastChannelUpdate { .. } => continue, - _ => panic!("Unhandled message event {:?}", event), + for event in events { + if let MessageSendEvent::HandleError { ref action, .. } = event { + assert_action_timeout_awaiting_response(action); + } + let Some(node_id) = directed_msg_destination(&event) else { + continue; }; - if push_a { - self.ba.push(event); - } else { - self.bc.push(event); + if Some(node_id) == expect_drop_id { + panic!("peer_disconnected should drop msgs bound for the disconnected peer"); + } + let dest_idx = find_destination_node(nodes, &node_id); + match (source_idx, dest_idx) { + (0, 1) => self.ab.push(event), + (1, 0) => self.ba.push(event), + (1, 2) => self.bc.push(event), + (2, 1) => self.cb.push(event), + _ => panic!("unsupported message route {} -> {}", source_idx, dest_idx), } } } + fn route_pending_from_node( + &mut self, source_idx: usize, expect_drop_node: Option, nodes: &[HarnessNode<'_>; 3], + ) { + self.route_from_node( + source_idx, + nodes[source_idx].get_and_clear_pending_msg_events(), + expect_drop_node, + nodes, + ); + } + fn clear_link(&mut self, link: &PeerLink) { match (link.node_a, link.node_b) { (0, 1) | (1, 0) => { @@ -1166,42 +1334,12 @@ impl EventQueues { fn drain_on_disconnect(&mut self, edge_node: usize, nodes: &[HarnessNode<'_>; 3]) { match edge_node { 0 => { - for event in nodes[0].get_and_clear_pending_msg_events() { - match event { - MessageSendEvent::UpdateHTLCs { .. } => {}, - MessageSendEvent::SendRevokeAndACK { .. } => {}, - MessageSendEvent::SendChannelReestablish { .. } => {}, - MessageSendEvent::SendStfu { .. } => {}, - MessageSendEvent::SendChannelReady { .. } => {}, - MessageSendEvent::SendAnnouncementSignatures { .. } => {}, - MessageSendEvent::BroadcastChannelUpdate { .. } => {}, - MessageSendEvent::SendChannelUpdate { .. } => {}, - MessageSendEvent::HandleError { ref action, .. } => { - assert_action_timeout_awaiting_response(action); - }, - _ => panic!("Unhandled message event"), - } - } - self.route_from_middle(nodes[1].get_and_clear_pending_msg_events(), Some(0), nodes); + self.route_pending_from_node(0, Some(1), nodes); + self.route_pending_from_node(1, Some(0), nodes); }, 2 => { - for event in nodes[2].get_and_clear_pending_msg_events() { - match event { - MessageSendEvent::UpdateHTLCs { .. } => {}, - MessageSendEvent::SendRevokeAndACK { .. } => {}, - MessageSendEvent::SendChannelReestablish { .. } => {}, - MessageSendEvent::SendStfu { .. } => {}, - MessageSendEvent::SendChannelReady { .. } => {}, - MessageSendEvent::SendAnnouncementSignatures { .. } => {}, - MessageSendEvent::BroadcastChannelUpdate { .. } => {}, - MessageSendEvent::SendChannelUpdate { .. } => {}, - MessageSendEvent::HandleError { ref action, .. } => { - assert_action_timeout_awaiting_response(action); - }, - _ => panic!("Unhandled message event"), - } - } - self.route_from_middle(nodes[1].get_and_clear_pending_msg_events(), Some(2), nodes); + self.route_pending_from_node(2, Some(1), nodes); + self.route_pending_from_node(1, Some(2), nodes); }, _ => panic!("unsupported disconnected edge"), } @@ -1233,11 +1371,13 @@ impl PeerLink { || (self.node_a == node_b && self.node_b == node_a) } - fn complete_all_monitor_updates(&self, nodes: &[HarnessNode<'_>; 3]) { + fn complete_all_monitor_updates(&self, nodes: &[HarnessNode<'_>; 3]) -> bool { + let mut completed_updates = false; for id in &self.channel_ids { - nodes[self.node_a].complete_all_monitor_updates(id); - nodes[self.node_b].complete_all_monitor_updates(id); + completed_updates |= nodes[self.node_a].complete_all_monitor_updates(id); + completed_updates |= nodes[self.node_b].complete_all_monitor_updates(id); } + completed_updates } fn complete_monitor_updates_for_node( @@ -1269,7 +1409,7 @@ impl PeerLink { queues.clear_link(self); } - fn reconnect(&mut self, nodes: &[HarnessNode<'_>; 3]) { + fn reconnect(&mut self, nodes: &[HarnessNode<'_>; 3], queues: &mut EventQueues) { if !self.disconnected { return; } @@ -1288,6 +1428,8 @@ impl PeerLink { }; nodes[self.node_b].peer_connected(node_a_id, &init_a, false).unwrap(); self.disconnected = false; + queues.route_pending_from_node(self.node_a, None, nodes); + queues.route_pending_from_node(self.node_b, None, nodes); } fn disconnect_for_reload( @@ -1303,15 +1445,7 @@ impl PeerLink { nodes[remaining_node].peer_disconnected(restarted_node_id); self.disconnected = true; - if remaining_node == 1 { - queues.route_from_middle( - nodes[1].get_and_clear_pending_msg_events(), - Some(restarted_node), - nodes, - ); - } else { - nodes[remaining_node].get_and_clear_pending_msg_events(); - } + queues.route_pending_from_node(remaining_node, Some(restarted_node), nodes); queues.clear_link(self); } } @@ -1769,6 +1903,7 @@ fn build_node_config(chan_type: ChanType) -> UserConfig { let mut config = UserConfig::default(); config.channel_config.forwarding_fee_proportional_millionths = 0; config.channel_handshake_config.announce_for_forwarding = true; + config.channel_handshake_limits.force_announced_channel_preference = false; config.reject_inbound_splices = false; match chan_type { ChanType::Legacy => { @@ -1808,9 +1943,12 @@ fn connect_peers(source: &ChanMan<'_>, dest: &ChanMan<'_>) { } fn make_channel( - source: &HarnessNode<'_>, dest: &HarnessNode<'_>, chan_id: i32, trusted_open: bool, - trusted_accept: bool, chain_state: &mut ChainState, -) { + nodes: &mut [HarnessNode<'_>; 3], source_idx: usize, dest_idx: usize, chan_id: i32, + trusted_open: bool, trusted_accept: bool, chain_state: &mut ChainState, +) -> ChannelId { + assert!(source_idx < dest_idx); + let (left, right) = nodes.split_at_mut(dest_idx); + let (source, dest) = (&mut left[source_idx], &mut right[0]); if trusted_open { source .create_channel_to_trusted_peer_0reserve( @@ -1921,7 +2059,8 @@ fn make_channel( } }; dest.handle_funding_created(source.get_our_node_id(), &funding_created); - // Complete any pending monitor updates for dest after watch_channel. + dest.checkpoint_manager_persistence(); + // Complete any monitor persistence callbacks made available for dest after watch_channel. dest.complete_all_pending_monitor_updates(); let (funding_signed, channel_id) = { @@ -1942,7 +2081,8 @@ fn make_channel( } source.handle_funding_signed(dest.get_our_node_id(), &funding_signed); - // Complete any pending monitor updates for source after watch_channel. + source.checkpoint_manager_persistence(); + // Complete any monitor persistence callbacks made available for source after watch_channel. source.complete_all_pending_monitor_updates(); let events = source.get_and_clear_pending_events(); @@ -1958,36 +2098,7 @@ fn make_channel( } else { panic!("Wrong event type"); } -} - -fn lock_fundings(nodes: &[HarnessNode<'_>; 3]) { - let mut node_events = Vec::new(); - for node in nodes.iter() { - node_events.push(node.get_and_clear_pending_msg_events()); - } - for (idx, node_event) in node_events.iter().enumerate() { - for event in node_event { - if let MessageSendEvent::SendChannelReady { ref node_id, ref msg } = event { - for node in nodes.iter() { - if node.get_our_node_id() == *node_id { - node.handle_channel_ready(nodes[idx].get_our_node_id(), msg); - } - } - } else { - panic!("Wrong event type"); - } - } - } - - for node in nodes.iter() { - let events = node.get_and_clear_pending_msg_events(); - for event in events { - if let MessageSendEvent::SendAnnouncementSignatures { .. } = event { - } else { - panic!("Wrong event type"); - } - } - } + channel_id } impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { @@ -2014,6 +2125,11 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { ChannelMonitorUpdateStatus::Completed }, ]; + let deferred = [ + config_byte & 0b0010_0000 != 0, + config_byte & 0b0100_0000 != 0, + config_byte & 0b1000_0000 != 0, + ]; let wallet_a = TestWalletSource::new(SecretKey::from_slice(&[1; 32]).unwrap()); let wallet_b = TestWalletSource::new(SecretKey::from_slice(&[2; 32]).unwrap()); @@ -2051,6 +2167,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { Arc::clone(&fee_est_a), Arc::clone(&broadcast_a), persistence_styles[0], + deferred[0], &out, router, chan_type, @@ -2061,6 +2178,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { Arc::clone(&fee_est_b), Arc::clone(&broadcast_b), persistence_styles[1], + deferred[1], &out, router, chan_type, @@ -2071,6 +2189,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { Arc::clone(&fee_est_c), Arc::clone(&broadcast_c), persistence_styles[2], + deferred[2], &out, router, chan_type, @@ -2088,14 +2207,18 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { // channel gets its own txid and funding outpoint. // A-B: channel 2 A and B have 0-reserve (trusted open + trusted accept), // channel 3 A has 0-reserve (trusted accept). - make_channel(&nodes[0], &nodes[1], 1, false, false, &mut chain_state); - make_channel(&nodes[0], &nodes[1], 2, true, true, &mut chain_state); - make_channel(&nodes[0], &nodes[1], 3, false, true, &mut chain_state); + let chan_ab_ids = [ + make_channel(&mut nodes, 0, 1, 1, false, false, &mut chain_state), + make_channel(&mut nodes, 0, 1, 2, true, true, &mut chain_state), + make_channel(&mut nodes, 0, 1, 3, false, true, &mut chain_state), + ]; // B-C: channel 4 B has 0-reserve (via trusted accept), // channel 5 C has 0-reserve (via trusted open). - make_channel(&nodes[1], &nodes[2], 4, false, true, &mut chain_state); - make_channel(&nodes[1], &nodes[2], 5, true, false, &mut chain_state); - make_channel(&nodes[1], &nodes[2], 6, false, false, &mut chain_state); + let chan_bc_ids = [ + make_channel(&mut nodes, 1, 2, 4, false, true, &mut chain_state), + make_channel(&mut nodes, 1, 2, 5, true, false, &mut chain_state), + make_channel(&mut nodes, 1, 2, 6, false, false, &mut chain_state), + ]; // Wipe the transactions-broadcasted set to make sure we don't broadcast // any transactions during normal operation after setup. @@ -2108,24 +2231,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { nodes[1].sync_with_chain_state(&chain_state, None); nodes[2].sync_with_chain_state(&chain_state, None); - lock_fundings(&nodes); - - let chan_ab_ids = { - // Get channel IDs for all A-B channels (from node A's perspective). - let node_a_chans = nodes[0].list_usable_channels(); - [node_a_chans[0].channel_id, node_a_chans[1].channel_id, node_a_chans[2].channel_id] - }; - let chan_bc_ids = { - // Get channel IDs for all B-C channels (from node C's perspective). - let node_c_chans = nodes[2].list_usable_channels(); - [node_c_chans[0].channel_id, node_c_chans[1].channel_id, node_c_chans[2].channel_id] - }; - - for node in &mut nodes { - node.serialized_manager = node.encode(); - } - - Self { + let mut harness = Self { out, chan_type, chain_state, @@ -2134,7 +2240,33 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { bc_link: PeerLink::new(1, 2, chan_bc_ids), queues: EventQueues::new(), payments: PaymentTracker::new(), + }; + + harness.lock_fundings(); + + for node in &mut harness.nodes { + node.force_checkpoint_manager_persistence(); } + + harness + } + + fn lock_fundings(&mut self) { + for _ in 0..10 { + let mut had_events = false; + for node_idx in 0..self.nodes.len() { + had_events |= self.process_msg_events_matching( + node_idx, + false, + ProcessMessages::AllMessages, + Some(is_funding_lock_msg_event), + ); + } + if !had_events { + return; + } + } + panic!("lock_fundings did not settle"); } fn chan_a_id(&self) -> ChannelId { @@ -2267,16 +2399,10 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { } } - fn process_msg_events( + fn process_msg_events_matching( &mut self, node_idx: usize, corrupt_forward: bool, limit_events: ProcessMessages, + expected_event: Option bool>, ) -> bool { - fn find_destination_node(nodes: &[HarnessNode<'_>; 3], node_id: &PublicKey) -> usize { - nodes - .iter() - .position(|node| node.get_our_node_id() == *node_id) - .expect("message destination should be a known harness node") - } - fn log_msg_delivery( node_idx: usize, dest_idx: usize, msg_name: &str, out: &Out, ) { @@ -2475,17 +2601,26 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { assert_action_timeout_awaiting_response(action); None }, - MessageSendEvent::SendChannelReady { .. } - | MessageSendEvent::SendAnnouncementSignatures { .. } - | MessageSendEvent::SendChannelUpdate { .. } => { - // Can be generated as a reestablish response. + MessageSendEvent::SendChannelReady { ref node_id, ref msg } => { + let dest_idx = log_peer_message(node_idx, node_id, nodes, out, "channel_ready"); + nodes[dest_idx].handle_channel_ready(source_node_id, msg); None }, - MessageSendEvent::BroadcastChannelUpdate { .. } => { - // Can be generated as a result of calling `timer_tick_occurred` enough - // times while peers are disconnected. + MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => { + let dest_idx = + log_peer_message(node_idx, node_id, nodes, out, "announcement_signatures"); + nodes[dest_idx].handle_announcement_signatures(source_node_id, msg); None }, + MessageSendEvent::SendChannelUpdate { ref node_id, ref msg } => { + let dest_idx = + log_peer_message(node_idx, node_id, nodes, out, "channel_update"); + nodes[dest_idx].handle_channel_update(source_node_id, msg); + None + }, + MessageSendEvent::BroadcastChannelUpdate { .. } => None, + MessageSendEvent::BroadcastChannelAnnouncement { .. } => None, + MessageSendEvent::BroadcastNodeAnnouncement { .. } => None, _ => panic!("Unhandled message event {:?}", event), } } @@ -2503,6 +2638,13 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { let mut events_iter = events.drain(..).chain(new_events.drain(..)); let mut extra_ev = None; for event in &mut events_iter { + if let Some(expected_event) = expected_event { + assert!( + expected_event(&event), + "Unexpected message event while locking fundings: {:?}", + event + ); + } had_events = true; extra_ev = process_msg_event( node_idx, @@ -2517,23 +2659,16 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { break; } } - if node_idx == 1 { - let remaining = extra_ev.into_iter().chain(events_iter).collect::>(); - queues.route_from_middle(remaining, None, nodes); - } else if node_idx == 0 { - if let Some(ev) = extra_ev { - queues.push_for_node(0, ev); - } - queues.extend_for_node(0, events_iter); - } else { - if let Some(ev) = extra_ev { - queues.push_for_node(2, ev); - } - queues.extend_for_node(2, events_iter); - } + queues.route_from_node(node_idx, extra_ev.into_iter().chain(events_iter), None, nodes); had_events } + fn process_msg_events( + &mut self, node_idx: usize, corrupt_forward: bool, limit_events: ProcessMessages, + ) -> bool { + self.process_msg_events_matching(node_idx, corrupt_forward, limit_events, None) + } + fn process_events(&mut self, node_idx: usize, fail: bool) -> bool { let nodes = &self.nodes; let chain_state = &mut self.chain_state; @@ -2542,7 +2677,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { // claim/fail handling per event batch. let mut claim_set = new_hash_map(); let mut events = nodes[node_idx].get_and_clear_pending_events(); - let had_events = !events.is_empty(); + let mut had_events = !events.is_empty(); for event in events.drain(..) { match event { events::Event::PaymentClaimable { payment_hash, .. } => { @@ -2598,6 +2733,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { } while nodes[node_idx].needs_pending_htlc_processing() { nodes[node_idx].process_pending_htlc_forwards(); + had_events = true; } had_events } @@ -2620,9 +2756,10 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { "It may take may iterations to settle the state, but it should not take forever" ); } - // Next, make sure no monitor updates are pending. - self.ab_link.complete_all_monitor_updates(&self.nodes); - self.bc_link.complete_all_monitor_updates(&self.nodes); + let mut made_progress = self.checkpoint_manager_persistences(); + // Next, make sure no monitor completion callbacks are pending. + made_progress |= self.ab_link.complete_all_monitor_updates(&self.nodes); + made_progress |= self.bc_link.complete_all_monitor_updates(&self.nodes); // Then, make sure any current forwards make their way to their destination. if self.process_msg_events(0, false, ProcessMessages::AllMessages) { last_pass_no_updates = false; @@ -2649,6 +2786,10 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { last_pass_no_updates = false; continue; } + if made_progress { + last_pass_no_updates = false; + continue; + } if last_pass_no_updates { // In some cases, we may generate a message to send in // `process_msg_events`, but block sending until @@ -2672,11 +2813,11 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { } fn reconnect_ab(&mut self) { - self.ab_link.reconnect(&self.nodes); + self.ab_link.reconnect(&self.nodes, &mut self.queues); } fn reconnect_bc(&mut self) { - self.bc_link.reconnect(&self.nodes); + self.bc_link.reconnect(&self.nodes, &mut self.queues); } fn restart_node(&mut self, node_idx: usize, v: u8, router: &'a FuzzRouter) { @@ -2764,19 +2905,22 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { self.nodes[2].record_last_htlc_clear_fee(); } - fn refresh_serialized_managers(&mut self) { + fn checkpoint_manager_persistences(&mut self) -> bool { + let mut made_progress = false; for node in &mut self.nodes { - node.refresh_serialized_manager(); + made_progress |= node.checkpoint_manager_persistence(); } + made_progress } } #[inline] pub fn do_test(data: &[u8], out: Out) { let router = FuzzRouter {}; - // Read initial monitor styles and channel type from fuzz input byte 0: + // Read initial monitor styles, channel type, and deferred write mode from fuzz input byte 0: // bits 0-2: monitor styles (1 bit per node) // bits 3-4: channel type (0=Legacy, 1=KeyedAnchors, 2=ZeroFeeCommitments) + // bits 5-7: deferred monitor write mode (1 bit per node) let config_byte = if !data.is_empty() { data[0] } else { 0 }; let mut harness = Harness::new(config_byte, out, &router); let mut read_pos = 1; // First byte was consumed for initial config. @@ -3019,18 +3163,18 @@ pub fn do_test(data: &[u8], out: Out) { }, 0xb0 | 0xb1 | 0xb2 => { - // Restart node A, picking among the in-flight `ChannelMonitor`s to use based on - // the value of `v` we're matching. + // Restart node A, picking among persisted and in-flight `ChannelMonitor` + // candidates based on the value of `v` we're matching. harness.restart_node(0, v, &router); }, 0xb3..=0xbb => { - // Restart node B, picking among the in-flight `ChannelMonitor`s to use based on - // the value of `v` we're matching. + // Restart node B, picking among persisted and in-flight `ChannelMonitor` + // candidates based on the value of `v` we're matching. harness.restart_node(1, v, &router); }, 0xbc | 0xbd | 0xbe => { - // Restart node C, picking among the in-flight `ChannelMonitor`s to use based on - // the value of `v` we're matching. + // Restart node C, picking among persisted and in-flight `ChannelMonitor` + // candidates based on the value of `v` we're matching. harness.restart_node(2, v, &router); }, @@ -3188,7 +3332,7 @@ pub fn do_test(data: &[u8], out: Out) { _ => break 'fuzz_loop, } - harness.refresh_serialized_managers(); + harness.checkpoint_manager_persistences(); } harness.finish(); } diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index 42d04e0f8ce..7ee9df1c30f 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -38,8 +38,8 @@ use crate::chain::chaininterface::{ }; use crate::chain::onchaintx::{ClaimEvent, FeerateStrategy, OnchainTxHandler}; use crate::chain::package::{ - CounterpartyOfferedHTLCOutput, CounterpartyReceivedHTLCOutput, HolderFundingOutput, - HolderHTLCOutput, PackageSolvingData, PackageTemplate, RevokedHTLCOutput, RevokedOutput, + ClaimRequest, CounterpartyOfferedHTLCOutput, CounterpartyReceivedHTLCOutput, + HolderFundingOutput, HolderHTLCOutput, PackageSolvingData, RevokedHTLCOutput, RevokedOutput, }; use crate::chain::transaction::{OutPoint, TransactionData}; use crate::chain::{BlockLocator, WatchedOutput}; @@ -473,7 +473,8 @@ impl Readable for CounterpartyCommitmentParameters { /// An entry for an [`OnchainEvent`], stating the block height and hash when the event was /// observed, as well as the transaction causing it. /// -/// Used to determine when the on-chain event can be considered safe from a chain reorganization. +/// Used to determine when the on-chain event can be considered safe from a chain reorganization +/// or, for CSV-delayed outputs, spendable. #[derive(Clone, PartialEq, Eq)] struct OnchainEventEntry { txid: Txid, @@ -491,14 +492,13 @@ impl OnchainEventEntry { OnchainEvent::MaturingOutput { descriptor: SpendableOutputDescriptor::DelayedPaymentOutput(ref descriptor) } => { - // A CSV'd transaction is confirmable in block (input height) + CSV delay, which means - // it's broadcastable when we see the previous block. + // A CSV-delayed output is spendable in block (input height) + CSV delay, which + // means we can hand it upstream when we see the previous block. conf_threshold = cmp::max(conf_threshold, self.height + descriptor.to_self_delay as u32 - 1); }, - OnchainEvent::FundingSpendConfirmation { on_local_output_csv: Some(csv), .. } | - OnchainEvent::HTLCSpendConfirmation { on_to_local_output_csv: Some(csv), .. } => { - // A CSV'd transaction is confirmable in block (input height) + CSV delay, which means - // it's broadcastable when we see the previous block. + OnchainEvent::FundingSpendConfirmation { on_local_output_csv: Some(csv), .. } => { + // A CSV-delayed output is spendable in block (input height) + CSV delay, which + // means we can act on the event when we see the previous block. conf_threshold = cmp::max(conf_threshold, self.height + csv as u32 - 1); }, _ => {}, @@ -517,7 +517,7 @@ impl OnchainEventEntry { type CommitmentTxCounterpartyOutputInfo = Option<(u32, Amount)>; /// Upon discovering of some classes of onchain tx by ChannelMonitor, we may have to take actions on it -/// once they mature to enough confirmations (ANTI_REORG_DELAY) +/// once they reach anti-reorg finality or, for CSV-delayed outputs, CSV maturity. #[derive(Clone, PartialEq, Eq)] enum OnchainEvent { /// An outbound HTLC failing after a transaction is confirmed. Used @@ -534,8 +534,8 @@ enum OnchainEvent { /// transaction which appeared on chain. commitment_tx_output_idx: Option, }, - /// An output waiting on [`ANTI_REORG_DELAY`] confirmations before we hand the user the - /// [`SpendableOutputDescriptor`]. + /// An output waiting until it is anti-reorg final and, for CSV-delayed outputs, spendable + /// before we hand the user the [`SpendableOutputDescriptor`]. MaturingOutput { descriptor: SpendableOutputDescriptor }, /// A spend of the funding output, either a commitment transaction or a cooperative closing /// transaction. @@ -566,8 +566,9 @@ enum OnchainEvent { /// If the claim was made by either party with a preimage, this is filled in preimage: Option, /// If the claim was made by us on an inbound HTLC against a local commitment transaction, - /// we set this to the output CSV value which we will have to wait until to spend the - /// output (and generate a SpendableOutput event). + /// this records the CSV delay for the delayed output. The CSV-mature output remains + /// tracked via the corresponding [`OnchainEvent::MaturingOutput`]; the HTLC spend itself + /// reaches anti-reorg finality. on_to_local_output_csv: Option, }, /// An alternative funding transaction (due to a splice/RBF) has confirmed but can no longer be @@ -1003,7 +1004,7 @@ impl Balance { } } -/// An HTLC which has been irrevocably resolved on-chain, and has reached ANTI_REORG_DELAY. +/// An HTLC whose on-chain outcome has reached the threshold for irrevocable resolution. #[derive(Clone, PartialEq, Eq)] struct IrrevocablyResolvedHTLC { commitment_tx_output_idx: Option, @@ -1301,8 +1302,9 @@ pub(crate) struct ChannelMonitorImpl { pub(super) is_processing_pending_events: bool, // Used to track on-chain events (i.e., transactions part of channels confirmed on chain) on - // which to take actions once they reach enough confirmations. Each entry includes the - // transaction's id and the height when the transaction was confirmed on chain. + // which to take actions once they reach anti-reorg finality or, for CSV-delayed outputs, + // CSV maturity. Each entry includes the transaction's id and the height when the transaction + // was confirmed on chain. onchain_events_awaiting_threshold_conf: Vec, // If we get serialized out and re-read, we need to make sure that the chain monitoring @@ -1339,14 +1341,15 @@ pub(crate) struct ChannelMonitorImpl { /// Added in 0.0.124. holder_pays_commitment_tx_fee: Option, - /// Set to `Some` of the confirmed transaction spending the funding input of the channel after - /// reaching `ANTI_REORG_DELAY` confirmations. + /// Set to `Some` once the confirmed transaction spending the funding input of the channel has + /// reached its event threshold. funding_spend_confirmed: Option, confirmed_commitment_tx_counterparty_output: CommitmentTxCounterpartyOutputInfo, - /// The set of HTLCs which have been either claimed or failed on chain and have reached - /// the requisite confirmations on the claim/fail transaction (either ANTI_REORG_DELAY or the - /// spending CSV for revocable outputs). + /// The set of HTLCs whose on-chain claim or fail outcome is irrevocably resolved because the + /// commitment transaction HTLC output spend has reached anti-reorg finality. Any resulting + /// output that is still waiting on CSV maturity is tracked separately as an + /// [`OnchainEvent::MaturingOutput`]. htlcs_resolved_on_chain: Vec, /// When a payment is resolved through an on-chain transaction, we tell the `ChannelManager` @@ -2763,11 +2766,10 @@ impl ChannelMonitorImpl { source: BalanceSource::Htlc, }); } else if htlc_resolved && !htlc_output_spend_pending { - // Funding transaction spends should be fully confirmed by the time any - // HTLC transactions are resolved, unless we're talking about a holder - // commitment tx, whose resolution is delayed until the CSV timeout is - // reached, even though HTLCs may be resolved after only - // ANTI_REORG_DELAY confirmations. + // Funding transaction spends should have reached their event threshold by the time any + // HTLC transactions are irrevocably resolved, unless we're talking about a holder + // commitment tx, whose resolution is delayed until CSV maturity, even though HTLCs + // may be resolved after anti-reorg finality. debug_assert!(holder_commitment || self.funding_spend_confirmed.is_some()); } else if counterparty_revoked_commitment { let htlc_output_claim_pending = self.onchain_events_awaiting_threshold_conf.iter().any(|event| { @@ -2889,7 +2891,7 @@ impl ChannelMonitor { }); if let Some((txid, conf_thresh)) = funding_spend_pending { debug_assert!(us.funding_spend_confirmed.is_none(), - "We have a pending funding spend awaiting anti-reorg confirmation, we can't have confirmed it already!"); + "We have a pending funding spend awaiting its event threshold, it cannot have reached it already!"); confirmed_txid = Some(txid); pending_commitment_tx_conf_thresh = Some(conf_thresh); } @@ -3347,7 +3349,7 @@ macro_rules! fail_unbroadcast_htlcs { commitment_tx_output_idx: None, }, }; - log_trace!($logger, "Failing HTLC with payment_hash {} from {} counterparty commitment tx due to broadcast of {} commitment transaction {}, waiting for confirmation (at height {})", + log_trace!($logger, "Failing HTLC with payment_hash {} from {} counterparty commitment tx due to broadcast of {} commitment transaction {}, event reaches threshold at height {}", &htlc.payment_hash, $commitment_tx, $commitment_tx_type, $commitment_txid_confirmed, entry.confirmation_threshold()); $self.onchain_events_awaiting_threshold_conf.push(entry); @@ -3811,7 +3813,7 @@ impl ChannelMonitorImpl { // First check if a counterparty commitment transaction has been broadcasted: macro_rules! claim_htlcs { ($commitment_number: expr, $txid: expr, $htlcs: expr) => { - let htlc_claim_reqs = self.get_counterparty_output_claims_for_preimage(*payment_preimage, funding_spent, $commitment_number, $txid, $htlcs, confirmed_spend_height); + let htlc_claim_reqs = self.get_counterparty_output_claims_for_preimage(*payment_preimage, funding_spent, $commitment_number, $txid, $htlcs, confirmed_spend_height, logger); let conf_target = self.closure_conf_target(); self.onchain_tx_handler.update_claims_view_from_requests( htlc_claim_reqs, self.best_block.height, self.best_block.height, broadcaster, @@ -3860,6 +3862,9 @@ impl ChannelMonitorImpl { None }; if let Some(holder_commitment_tx) = holder_commitment_tx { + self.log_holder_preimage_claim_after_htlc_resolved_on_chain( + logger, holder_commitment_tx, *payment_preimage, + ); // Assume that the broadcasted commitment transaction confirmed in the current best // block. Even if not, its a reasonable metric for the bump criteria on the HTLC // transactions. @@ -3879,7 +3884,7 @@ impl ChannelMonitorImpl { fn generate_claimable_outpoints_and_watch_outputs( &mut self, generate_monitor_event_with_reason: Option, require_funding_seen: bool, - ) -> (Vec, Vec) { + ) -> (Vec, Vec) { let funding = get_confirmed_funding_scope!(self); let holder_commitment_tx = &funding.current_holder_commitment_tx; let funding_outp = HolderFundingOutput::build( @@ -3887,7 +3892,7 @@ impl ChannelMonitorImpl { funding.channel_parameters.clone(), ); let funding_outpoint = funding.funding_outpoint(); - let commitment_package = PackageTemplate::build_package( + let commitment_package = ClaimRequest::new( funding_outpoint.txid.clone(), funding_outpoint.index as u32, PackageSolvingData::HolderFundingOutput(funding_outp), self.best_block.height, @@ -3926,9 +3931,9 @@ impl ChannelMonitorImpl { let zero_fee_commitments = self.channel_type_features().supports_anchor_zero_fee_commitments(); if !zero_fee_htlcs && !zero_fee_commitments { - // Because we're broadcasting a commitment transaction, we should construct the package - // assuming it gets confirmed in the next block. Sadly, we have code which considers - // "not yet confirmed" things as discardable, so we cannot do that here. + // Because we're broadcasting a commitment transaction, we should construct claim + // requests assuming it gets confirmed in the next block. Sadly, we have code which + // considers "not yet confirmed" things as discardable, so we cannot do that here. let (mut new_outpoints, _) = self.get_broadcasted_holder_claims( funding, holder_commitment_tx, self.best_block.height, ); @@ -4513,7 +4518,7 @@ impl ChannelMonitorImpl { // event for the same source. self.failed_back_htlc_ids.insert(SentHTLCId::from_source(source)); if let Some(confirmed_txid) = self.funding_spend_confirmed { - // Funding spend already confirmed past ANTI_REORG_DELAY: resolve immediately. + // Funding spend already reached its event threshold: resolve immediately. log_trace!( logger, "Failing HTLC from late counterparty commitment update immediately \ @@ -4549,7 +4554,7 @@ impl ChannelMonitorImpl { log_trace!( logger, "Failing HTLC from late counterparty commitment update, \ - waiting for confirmation (at height {})", + event reaches threshold at height {}", entry.confirmation_threshold() ); self.onchain_events_awaiting_threshold_conf.push(entry); @@ -4806,11 +4811,11 @@ impl ChannelMonitorImpl { /// height > height + CLTV_SHARED_CLAIM_BUFFER. In any case, will install monitoring for /// HTLC-Success/HTLC-Timeout transactions. /// - /// Returns packages to claim the revoked output(s) and general information about the output that - /// is to the counterparty in the commitment transaction. + /// Returns claim requests for the revoked output(s) and general information about the output + /// that is to the counterparty in the commitment transaction. #[rustfmt::skip] fn check_spend_counterparty_transaction(&mut self, commitment_txid: Txid, commitment_tx: &Transaction, height: u32, block_hash: &BlockHash, logger: &L) - -> (Vec, CommitmentTxCounterpartyOutputInfo) + -> (Vec, CommitmentTxCounterpartyOutputInfo) { // Most secp and related errors trying to create keys means we have no hope of constructing // a spend transaction...so we return no transactions to broadcast @@ -4850,7 +4855,7 @@ impl ChannelMonitorImpl { per_commitment_point, per_commitment_key, outp.value, funding_spent.channel_parameters.clone(), height, ); - let justice_package = PackageTemplate::build_package( + let justice_package = ClaimRequest::new( commitment_txid, idx as u32, PackageSolvingData::RevokedOutput(revk_outp), height + self.counterparty_commitment_params.on_counterparty_tx_csv as u32, @@ -4879,7 +4884,7 @@ impl ChannelMonitorImpl { } else { height }; - let justice_package = PackageTemplate::build_package( + let justice_package = ClaimRequest::new( commitment_txid, transaction_output_index, PackageSolvingData::RevokedHTLCOutput(revk_htlc_outp), @@ -4963,12 +4968,12 @@ impl ChannelMonitorImpl { } } - fn get_counterparty_output_claims_for_preimage( + fn get_counterparty_output_claims_for_preimage( &self, preimage: PaymentPreimage, funding_spent: &FundingScope, commitment_number: u64, commitment_txid: Txid, per_commitment_option: Option<&Vec<(HTLCOutputInCommitment, Option>)>>, - confirmation_height: Option, - ) -> Vec { + confirmation_height: Option, logger: &L, + ) -> Vec { let per_commitment_claimable_data = match per_commitment_option { Some(outputs) => outputs, None => return Vec::new(), @@ -4984,6 +4989,16 @@ impl ChannelMonitorImpl { .filter_map(|(htlc, _)| { if let Some(transaction_output_index) = htlc.transaction_output_index { if htlc.offered && htlc.payment_hash == matching_payment_hash { + if let Some(resolved_htlc) = self.htlc_output_resolution_on_chain(htlc) { + self.log_preimage_claim_after_htlc_resolved_on_chain( + logger, + commitment_txid, + htlc, + preimage, + resolved_htlc, + ); + return None; + } let htlc_data = PackageSolvingData::CounterpartyOfferedHTLCOutput( CounterpartyOfferedHTLCOutput::build( per_commitment_point, @@ -4993,7 +5008,7 @@ impl ChannelMonitorImpl { confirmation_height, ), ); - Some(PackageTemplate::build_package( + Some(ClaimRequest::new( commitment_txid, transaction_output_index, htlc_data, @@ -5009,13 +5024,67 @@ impl ChannelMonitorImpl { .collect() } - /// Returns the HTLC claim package templates and the counterparty output info + fn htlc_output_resolution_on_chain( + &self, htlc: &HTLCOutputInCommitment, + ) -> Option<&IrrevocablyResolvedHTLC> { + htlc.transaction_output_index.and_then(|transaction_output_index| { + // Only suppress claims once the commitment HTLC output spend has + // reached anti-reorg finality. Any output created by that spend may + // still be CSV-delayed, but the original HTLC outpoint should not be + // re-claimed. + self.htlcs_resolved_on_chain.iter().find(|resolved_htlc| { + resolved_htlc.commitment_tx_output_idx == Some(transaction_output_index) + }) + }) + } + + fn log_preimage_claim_after_htlc_resolved_on_chain( + &self, logger: &L, commitment_txid: Txid, htlc: &HTLCOutputInCommitment, + preimage: PaymentPreimage, resolved_htlc: &IrrevocablyResolvedHTLC, + ) { + if resolved_htlc.payment_preimage == Some(preimage) { + return; + } + if let Some(transaction_output_index) = htlc.transaction_output_index { + let logger = WithContext::from(logger, None, None, Some(htlc.payment_hash)); + if let Some(resolving_txid) = resolved_htlc.resolving_txid.as_ref() { + log_error!(logger, "WE HAVE LIKELY LOST FUNDS: HTLC output {}:{} was irrevocably resolved on-chain by transaction {} without the payment preimage we now know; not replaying the claim", + commitment_txid, transaction_output_index, resolving_txid); + } else { + log_error!(logger, "WE HAVE LIKELY LOST FUNDS: HTLC output {}:{} was irrevocably resolved on-chain by an unknown transaction without the payment preimage we now know; not replaying the claim", + commitment_txid, transaction_output_index); + } + } + } + + fn log_holder_preimage_claim_after_htlc_resolved_on_chain( + &self, logger: &L, holder_tx: &HolderCommitmentTransaction, preimage: PaymentPreimage, + ) { + let matching_payment_hash = PaymentHash::from(preimage); + let tx = holder_tx.trust(); + for htlc in holder_tx.nondust_htlcs() { + if htlc.offered || htlc.payment_hash != matching_payment_hash { + continue; + } + if let Some(resolved_htlc) = self.htlc_output_resolution_on_chain(htlc) { + self.log_preimage_claim_after_htlc_resolved_on_chain( + logger, + tx.txid(), + htlc, + preimage, + resolved_htlc, + ); + } + } + } + + /// Returns the HTLC claim requests and the counterparty output info. fn get_counterparty_output_claim_info( &self, funding_spent: &FundingScope, commitment_number: u64, commitment_txid: Txid, tx: &Transaction, per_commitment_claimable_data: &[(HTLCOutputInCommitment, Option>)], confirmation_height: Option, - ) -> (Vec, CommitmentTxCounterpartyOutputInfo) { + ) -> (Vec, CommitmentTxCounterpartyOutputInfo) { let mut claimable_outpoints = Vec::new(); let mut to_counterparty_output_info: CommitmentTxCounterpartyOutputInfo = None; @@ -5056,6 +5125,9 @@ impl ChannelMonitorImpl { // per_commitment_data is corrupt or our commitment signing key leaked! return (claimable_outpoints, to_counterparty_output_info); } + if self.htlc_output_resolution_on_chain(htlc).is_some() { + continue; + } let preimage = if htlc.offered { if let Some((p, _)) = self.payment_preimages.get(&htlc.payment_hash) { Some(*p) @@ -5086,7 +5158,7 @@ impl ChannelMonitorImpl { ), ) }; - let counterparty_package = PackageTemplate::build_package( + let counterparty_package = ClaimRequest::new( commitment_txid, transaction_output_index, counterparty_htlc_outp, @@ -5104,7 +5176,7 @@ impl ChannelMonitorImpl { #[rustfmt::skip] fn check_spend_counterparty_htlc( &mut self, tx: &Transaction, commitment_number: u64, commitment_txid: &Txid, height: u32, logger: &L - ) -> (Vec, Option) { + ) -> (Vec, Option) { let secret = if let Some(secret) = self.get_secret(commitment_number) { secret } else { return (Vec::new(), None); }; let per_commitment_key = match SecretKey::from_slice(&secret) { Ok(key) => key, @@ -5135,7 +5207,7 @@ impl ChannelMonitorImpl { per_commitment_point, per_commitment_key, tx.output[idx].value, self.funding.channel_parameters.clone(), height, ); - let justice_package = PackageTemplate::build_package( + let justice_package = ClaimRequest::new( htlc_txid, idx as u32, PackageSolvingData::RevokedOutput(revk_outp), height + self.counterparty_commitment_params.on_counterparty_tx_csv as u32, ); @@ -5157,6 +5229,9 @@ impl ChannelMonitorImpl { let mut htlcs = Vec::with_capacity(holder_tx.nondust_htlcs().len()); debug_assert_eq!(holder_tx.nondust_htlcs().len(), holder_tx.counterparty_htlc_sigs.len()); for (htlc, counterparty_sig) in holder_tx.nondust_htlcs().iter().zip(holder_tx.counterparty_htlc_sigs.iter()) { + if self.htlc_output_resolution_on_chain(htlc).is_some() { + continue; + } assert!(htlc.transaction_output_index.is_some(), "Expected transaction output index for non-dust HTLC"); let preimage = if htlc.offered { @@ -5187,13 +5262,14 @@ impl ChannelMonitorImpl { htlcs } - // Returns (1) `PackageTemplate`s that can be given to the OnchainTxHandler, so that the handler can - // broadcast transactions claiming holder HTLC commitment outputs and (2) a holder revokable - // script so we can detect whether a holder transaction has been seen on-chain. + // Returns (1) `ClaimRequest`s that can be given to the OnchainTxHandler, so that the + // handler can broadcast transactions claiming holder HTLC commitment outputs and (2) a + // holder revokable script so we can detect whether a holder transaction has been seen + // on-chain. #[rustfmt::skip] fn get_broadcasted_holder_claims( &self, funding: &FundingScope, holder_tx: &HolderCommitmentTransaction, conf_height: u32, - ) -> (Vec, Option<(ScriptBuf, PublicKey, RevocationKey)>) { + ) -> (Vec, Option<(ScriptBuf, PublicKey, RevocationKey)>) { let tx = holder_tx.trust(); let keys = tx.keys(); let redeem_script = chan_utils::get_revokeable_redeemscript( @@ -5212,7 +5288,7 @@ impl ChannelMonitorImpl { }; let transaction_output_index = htlc_descriptor.htlc.transaction_output_index .expect("Expected transaction output index for non-dust HTLC"); - PackageTemplate::build_package( + ClaimRequest::new( tx.txid(), transaction_output_index, PackageSolvingData::HolderHTLCOutput(HolderHTLCOutput::build(htlc_descriptor, conf_height)), counterparty_spendable_height, @@ -5248,7 +5324,7 @@ impl ChannelMonitorImpl { fn check_spend_holder_transaction( &mut self, commitment_txid: Txid, commitment_tx: &Transaction, height: u32, block_hash: &BlockHash, logger: &L, - ) -> Option<(Vec, TransactionOutputs)> { + ) -> Option<(Vec, TransactionOutputs)> { let funding_spent = get_confirmed_funding_scope!(self); // HTLCs set may differ between last and previous holder commitment txn, in case of one them hitting chain, ensure we cancel all HTLCs backward @@ -5724,9 +5800,9 @@ impl ChannelMonitorImpl { break; } } - self.is_resolving_htlc_output(&tx, height, &block_hash, logger); - self.check_tx_and_push_spendable_outputs(&tx, height, &block_hash, logger); + + self.is_resolving_htlc_output(&tx, height, &block_hash, logger); } } @@ -5759,7 +5835,7 @@ impl ChannelMonitorImpl { conf_hash: BlockHash, txn_matched: Vec<&Transaction>, mut watch_outputs: Vec, - mut claimable_outpoints: Vec, + mut claimable_outpoints: Vec, broadcaster: &B, fee_estimator: &LowerBoundedFeeEstimator, logger: &WithContext, @@ -6204,6 +6280,7 @@ impl ChannelMonitorImpl { &mut self, tx: &Transaction, height: u32, block_hash: &BlockHash, logger: &WithContext, ) { let funding_spent = get_confirmed_funding_scope!(self); + let txid = tx.compute_txid(); 'outer_loop: for input in &tx.input { let mut payment_data = None; @@ -6290,18 +6367,25 @@ impl ChannelMonitorImpl { if payment_data.is_none() { log_claim!($tx_info, $holder_tx, htlc_output, false); let outbound_htlc = $holder_tx == htlc_output.offered; + let on_to_local_output_csv = if accepted_preimage_claim && !outbound_htlc { + Some(self.on_holder_tx_csv) } else { None }; + #[cfg(debug_assertions)] + if let Some(csv) = on_to_local_output_csv { + debug_assert!( + self.has_delayed_maturing_output_for_tx(txid, csv), + "CSV-delayed HTLC spend confirmation should have a matching MaturingOutput" + ); + } self.onchain_events_awaiting_threshold_conf.push(OnchainEventEntry { - txid: tx.compute_txid(), height, block_hash: Some(*block_hash), transaction: Some(tx.clone()), + txid, height, block_hash: Some(*block_hash), transaction: Some(tx.clone()), event: OnchainEvent::HTLCSpendConfirmation { commitment_tx_output_idx: input.previous_output.vout, preimage: if accepted_preimage_claim || offered_preimage_claim { Some(payment_preimage) } else { None }, - // If this is a payment to us (ie !outbound_htlc), wait for - // the CSV delay before dropping the HTLC from claimable - // balance if the claim was an HTLC-Success transaction (ie - // accepted_preimage_claim). - on_to_local_output_csv: if accepted_preimage_claim && !outbound_htlc { - Some(self.on_holder_tx_csv) } else { None }, + // If this is a payment to us (ie !outbound_htlc), keep a + // record of the CSV delay. The delayed output is tracked + // separately as a MaturingOutput until it is spendable. + on_to_local_output_csv, }, }); continue 'outer_loop; @@ -6402,7 +6486,7 @@ impl ChannelMonitorImpl { commitment_tx_output_idx: Some(input.previous_output.vout), }, }; - log_info!(logger, "Failing HTLC with payment_hash {} timeout by a spend tx, waiting for confirmation (at height {})", &payment_hash, entry.confirmation_threshold()); + log_info!(logger, "Failing HTLC with payment_hash {} timeout by a spend tx, event reaches threshold at height {}", &payment_hash, entry.confirmation_threshold()); self.onchain_events_awaiting_threshold_conf.push(entry); } } @@ -6454,6 +6538,19 @@ impl ChannelMonitorImpl { spendable_outputs } + #[cfg(debug_assertions)] + fn has_delayed_maturing_output_for_tx(&self, txid: Txid, csv: u16) -> bool { + self.onchain_events_awaiting_threshold_conf.iter().any(|entry| { + entry.txid == txid + && match &entry.event { + OnchainEvent::MaturingOutput { + descriptor: SpendableOutputDescriptor::DelayedPaymentOutput(descriptor), + } => descriptor.to_self_delay == csv, + _ => false, + } + }) + } + /// Checks if the confirmed transaction is paying funds back to some address we can assume to /// own. #[rustfmt::skip] diff --git a/lightning/src/chain/mod.rs b/lightning/src/chain/mod.rs index d72d58b3149..5ff96e46953 100644 --- a/lightning/src/chain/mod.rs +++ b/lightning/src/chain/mod.rs @@ -563,10 +563,18 @@ pub struct ClaimId(pub [u8; 32]); impl ClaimId { pub(crate) fn from_htlcs(htlcs: &[HTLCDescriptor]) -> ClaimId { + let mut htlc_outpoints = htlcs + .iter() + .map(|htlc| { + (htlc.commitment_txid.to_byte_array(), htlc.htlc.transaction_output_index.unwrap()) + }) + .collect::>(); + htlc_outpoints.sort_unstable(); + let mut engine = Sha256::engine(); - for htlc in htlcs { - engine.input(&htlc.commitment_txid.to_byte_array()); - engine.input(&htlc.htlc.transaction_output_index.unwrap().to_be_bytes()); + for (commitment_txid, transaction_output_index) in htlc_outpoints { + engine.input(&commitment_txid); + engine.input(&transaction_output_index.to_be_bytes()); } ClaimId(Sha256::from_engine(engine).to_byte_array()) } @@ -581,8 +589,45 @@ impl ClaimId { #[cfg(test)] mod tests { use super::*; + use crate::ln::chan_utils::{ + ChannelTransactionParameters, HTLCOutputInCommitment, HolderCommitmentTransaction, + }; + use crate::sign::ChannelDerivationParameters; + use crate::types::payment::{PaymentHash, PaymentPreimage}; use bitcoin::hashes::Hash; + fn dummy_htlc_descriptor( + commitment_txid: Txid, transaction_output_index: u32, + ) -> HTLCDescriptor { + let channel_parameters = ChannelTransactionParameters::test_dummy(100_000); + let htlc = HTLCOutputInCommitment { + offered: true, + amount_msat: 1000, + cltv_expiry: 100, + payment_hash: PaymentHash::from(PaymentPreimage([1; 32])), + transaction_output_index: Some(transaction_output_index), + }; + let funding_outpoint = channel_parameters.funding_outpoint.unwrap(); + let commitment_tx = + HolderCommitmentTransaction::dummy(100_000, funding_outpoint, vec![htlc.clone()]); + let trusted_tx = commitment_tx.trust(); + + HTLCDescriptor { + channel_derivation_parameters: ChannelDerivationParameters { + value_satoshis: channel_parameters.channel_value_satoshis, + keys_id: [1; 32], + transaction_parameters: channel_parameters, + }, + commitment_txid, + per_commitment_number: trusted_tx.commitment_number(), + per_commitment_point: trusted_tx.per_commitment_point(), + feerate_per_kw: trusted_tx.negotiated_feerate_per_kw(), + htlc, + preimage: None, + counterparty_sig: commitment_tx.counterparty_htlc_sigs[0], + } + } + #[test] fn test_best_block() { let hash1 = BlockHash::from_slice(&[1; 32]).unwrap(); @@ -618,4 +663,17 @@ mod tests { let chain_c = BlockLocator::new(hash_other, 200); assert_eq!(chain_a.find_common_ancestor(&chain_c), None); } + + #[test] + fn test_htlc_claim_id_is_descriptor_order_independent() { + // Use opposite txid and vout ordering so the assertion would fail if + // ClaimId still hashed descriptors in caller-provided order. + let first = dummy_htlc_descriptor(Txid::from_slice(&[1; 32]).unwrap(), 2); + let second = dummy_htlc_descriptor(Txid::from_slice(&[2; 32]).unwrap(), 1); + + assert_eq!( + ClaimId::from_htlcs(&[first.clone(), second.clone()]), + ClaimId::from_htlcs(&[second, first]) + ); + } } diff --git a/lightning/src/chain/onchaintx.rs b/lightning/src/chain/onchaintx.rs index 3eb6d64f3a2..e4d23d77e88 100644 --- a/lightning/src/chain/onchaintx.rs +++ b/lightning/src/chain/onchaintx.rs @@ -27,7 +27,7 @@ use crate::chain::chaininterface::{ BroadcasterInterface, FeeEstimator, LowerBoundedFeeEstimator, TransactionType, }; use crate::chain::channelmonitor::ANTI_REORG_DELAY; -use crate::chain::package::{PackageSolvingData, PackageTemplate}; +use crate::chain::package::{ClaimRequest, PackageSolvingData, PackageTemplate}; use crate::chain::transaction::MaybeSignedTransaction; use crate::chain::ClaimId; use crate::ln::chan_utils::{ @@ -91,6 +91,12 @@ enum OnchainEvent { ContentiousOutpoint { package: PackageTemplate }, } +enum OutpointClaimState { + WaitingThresholdConf, + ClaimingRequestRegistered, + WaitingTimelock(u32), +} + impl Writeable for OnchainEventEntry { fn write(&self, writer: &mut W) -> Result<(), io::Error> { write_tlv_fields!(writer, { @@ -576,6 +582,32 @@ impl OnchainTxHandler { self.pending_claim_requests.len() != 0 } + fn outpoint_claim_state( + &self, outpoint: &BitcoinOutPoint, cur_height: u32, + ) -> Option { + if self.onchain_events_awaiting_threshold_conf.iter().any(|entry| { + if let OnchainEvent::ContentiousOutpoint { ref package } = entry.event { + package.contains_outpoint(outpoint) + } else { + false + } + }) { + return Some(OutpointClaimState::WaitingThresholdConf); + } + + if self.claimable_outpoints.get(outpoint).is_some() { + return Some(OutpointClaimState::ClaimingRequestRegistered); + } + + self.locktimed_packages + .values() + .flat_map(|packages| packages.iter()) + .find(|locked_package| locked_package.contains_outpoint(outpoint)) + .map(|package| { + OutpointClaimState::WaitingTimelock(package.package_locktime(cur_height)) + }) + } + /// Lightning security model (i.e being able to redeem/timeout HTLC or penalize counterparty /// onchain) lays on the assumption of claim transactions getting confirmed before timelock /// expiration (CSV or CLTV following cases). In case of high-fee spikes, claim tx may get stuck @@ -791,7 +823,7 @@ impl OnchainTxHandler { /// `cur_height`, however it must never be higher than `cur_height`. #[rustfmt::skip] pub(super) fn update_claims_view_from_requests( - &mut self, mut requests: Vec, conf_height: u32, cur_height: u32, + &mut self, mut requests: Vec, conf_height: u32, cur_height: u32, broadcaster: &B, conf_target: ConfirmationTarget, destination_script: &Script, fee_estimator: &LowerBoundedFeeEstimator, logger: &L, ) { @@ -801,33 +833,36 @@ impl OnchainTxHandler { // First drop any duplicate claims. requests.retain(|req| { - debug_assert_eq!( - req.outpoints().len(), - 1, - "Claims passed to `update_claims_view_from_requests` should not be aggregated" - ); - let mut all_outpoints_claiming = true; - for outpoint in req.outpoints() { - if self.claimable_outpoints.get(outpoint).is_none() { - all_outpoints_claiming = false; + let outpoint = req.outpoint(); + if let Some(claim_state) = self.outpoint_claim_state(outpoint, cur_height) { + match claim_state { + OutpointClaimState::WaitingThresholdConf => { + // This is a package-layer guard. ChannelMonitor filters regenerated + // HTLC claims using HTLC resolution state, while this keeps outpoints + // split from an existing package from being re-added during the reorg + // window. + log_info!(logger, "Ignoring claim for outpoint {}:{}, it is already spent by a transaction awaiting anti-reorg finality", + outpoint.txid, outpoint.vout); + false + }, + OutpointClaimState::ClaimingRequestRegistered => { + log_info!(logger, "Ignoring second claim for outpoint {}:{}, already registered its claiming request", + outpoint.txid, outpoint.vout); + false + }, + OutpointClaimState::WaitingTimelock(locktime) => { + log_info!(logger, "Ignoring second claim for outpoint {}:{}, we already have one which we're waiting on a timelock at {} for.", + outpoint.txid, outpoint.vout, locktime); + false + }, } - } - if all_outpoints_claiming { - log_info!(logger, "Ignoring second claim for outpoint {}:{}, already registered its claiming request", - req.outpoints()[0].txid, req.outpoints()[0].vout); - false } else { - let timelocked_equivalent_package = self.locktimed_packages.iter().map(|v| v.1.iter()).flatten() - .find(|locked_package| locked_package.outpoints() == req.outpoints()); - if let Some(package) = timelocked_equivalent_package { - log_info!(logger, "Ignoring second claim for outpoint {}:{}, we already have one which we're waiting on a timelock at {} for.", - req.outpoints()[0].txid, req.outpoints()[0].vout, package.package_locktime(cur_height)); - false - } else { - true - } + true } }); + let mut requests = requests.into_iter() + .map(ClaimRequest::into_package_template) + .collect::>(); // Then try to maximally aggregate `requests`. for i in (1..requests.len()).rev() { @@ -1282,15 +1317,18 @@ impl OnchainTxHandler { #[cfg(test)] mod tests { - use bitcoin::hash_types::Txid; + use bitcoin::hash_types::{BlockHash, Txid}; use bitcoin::hashes::sha256::Hash as Sha256; use bitcoin::hashes::Hash; + use bitcoin::locktime::absolute::LockTime; + use bitcoin::transaction::{OutPoint as BitcoinOutPoint, Version}; use bitcoin::Network; - use bitcoin::{key::Secp256k1, secp256k1::PublicKey, secp256k1::SecretKey, ScriptBuf}; + use bitcoin::{key::Secp256k1, secp256k1::PublicKey, secp256k1::SecretKey}; + use bitcoin::{Amount, ScriptBuf, Transaction, TxIn, TxOut}; use types::features::ChannelTypeFeatures; use crate::chain::chaininterface::{ConfirmationTarget, LowerBoundedFeeEstimator}; - use crate::chain::package::{HolderHTLCOutput, PackageSolvingData, PackageTemplate}; + use crate::chain::package::{ClaimRequest, HolderHTLCOutput, PackageSolvingData}; use crate::chain::transaction::OutPoint; use crate::ln::chan_utils::{ ChannelPublicKeys, ChannelTransactionParameters, CounterpartyChannelTransactionParameters, @@ -1305,12 +1343,9 @@ mod tests { use super::OnchainTxHandler; - // Test that all claims with locktime equal to or less than the current height are broadcast - // immediately while claims with locktime greater than the current height are only broadcast - // once the locktime is reached. - #[test] - #[rustfmt::skip] - fn test_broadcast_height() { + fn new_test_tx_handler( + channel_type_features: ChannelTypeFeatures, nondust_htlcs: Vec, + ) -> OnchainTxHandler { let secp_ctx = Secp256k1::new(); let signer = InMemorySigner::new( SecretKey::from_slice(&[41; 32]).unwrap(), @@ -1347,9 +1382,6 @@ mod tests { )), }; let funding_outpoint = OutPoint { txid: Txid::all_zeros(), index: u16::MAX }; - - // Use non-anchor channels so that HTLC-Timeouts are broadcast immediately instead of sent - // to the user for external funding. let chan_params = ChannelTransactionParameters { holder_pubkeys: signer.pubkeys(&secp_ctx), holder_selected_contest_delay: 66, @@ -1360,66 +1392,45 @@ mod tests { }), funding_outpoint: Some(funding_outpoint), splice_parent_funding_txid: None, - channel_type_features: ChannelTypeFeatures::only_static_remote_key(), + channel_type_features, channel_value_satoshis: 0, }; - - // Create an OnchainTxHandler for a commitment containing HTLCs with CLTV expiries of 0, 1, - // and 2 blocks. - let mut nondust_htlcs = Vec::new(); - for i in 0..3 { - let preimage = PaymentPreimage([i; 32]); - let hash = PaymentHash(Sha256::hash(&preimage.0[..]).to_byte_array()); - nondust_htlcs.push( - HTLCOutputInCommitment { - offered: true, - amount_msat: 10000, - cltv_expiry: i as u32, - payment_hash: hash, - transaction_output_index: Some(i as u32), - } - ); - } - let holder_commit = HolderCommitmentTransaction::dummy(1000000, funding_outpoint, nondust_htlcs); - let destination_script = ScriptBuf::new(); + let holder_commit = + HolderCommitmentTransaction::dummy(1000000, funding_outpoint, nondust_htlcs); let counterparty_node_id = PublicKey::from_slice(&[2; 33]).unwrap(); - let mut tx_handler = OnchainTxHandler::new( + OnchainTxHandler::new( ChannelId::from_bytes([0; 32]), counterparty_node_id, 1000000, [0; 32], - destination_script.clone(), + ScriptBuf::new(), signer, chan_params, holder_commit, secp_ctx, - ); - - // Create a broadcaster with current block height 1. - let broadcaster = TestBroadcaster::new(Network::Testnet); - { - let mut blocks = broadcaster.blocks.lock().unwrap(); - let genesis_hash = blocks[0].0.block_hash(); - blocks.push((create_dummy_block(genesis_hash, 0, Vec::new()), 1)); - } - - let fee_estimator = TestFeeEstimator::new(253); - let fee_estimator = LowerBoundedFeeEstimator::new(&fee_estimator); - let logger = TestLogger::new(); + ) + } - // Request claiming of each HTLC on the holder's commitment, with current block height 1. + fn build_offered_holder_htlc_requests( + tx_handler: &OnchainTxHandler, + ) -> Vec { let holder_commit = tx_handler.current_holder_commitment_tx(); let holder_commit_txid = holder_commit.trust().txid(); let mut requests = Vec::new(); - for (htlc, counterparty_sig) in holder_commit.nondust_htlcs().iter().zip(holder_commit.counterparty_htlc_sigs.iter()) { - requests.push(PackageTemplate::build_package( + for (htlc, counterparty_sig) in + holder_commit.nondust_htlcs().iter().zip(holder_commit.counterparty_htlc_sigs.iter()) + { + requests.push(ClaimRequest::new( holder_commit_txid, htlc.transaction_output_index.unwrap(), - PackageSolvingData::HolderHTLCOutput(HolderHTLCOutput::build(HTLCDescriptor { + PackageSolvingData::HolderHTLCOutput(HolderHTLCOutput::build( + HTLCDescriptor { channel_derivation_parameters: ChannelDerivationParameters { value_satoshis: tx_handler.channel_value_satoshis, keys_id: tx_handler.channel_keys_id, - transaction_parameters: tx_handler.channel_transaction_parameters.clone(), + transaction_parameters: tx_handler + .channel_transaction_parameters + .clone(), }, commitment_txid: holder_commit_txid, per_commitment_number: holder_commit.commitment_number(), @@ -1429,11 +1440,63 @@ mod tests { preimage: None, counterparty_sig: *counterparty_sig, }, - 0 + 0, )), 0, )); } + requests + } + + fn locked_outpoints( + tx_handler: &OnchainTxHandler, locktime: u32, + ) -> Vec { + tx_handler + .locktimed_packages + .get(&locktime) + .into_iter() + .flat_map(|packages| packages.iter()) + .flat_map(|package| package.outpoints().into_iter().map(|outpoint| *outpoint)) + .collect() + } + + // Test that all claims with locktime equal to or less than the current height are broadcast + // immediately while claims with locktime greater than the current height are only broadcast + // once the locktime is reached. + #[test] + fn test_broadcast_height() { + // Create an OnchainTxHandler for a commitment containing HTLCs with CLTV expiries of 0, 1, + // and 2 blocks. + let mut nondust_htlcs = Vec::new(); + for i in 0..3 { + let preimage = PaymentPreimage([i; 32]); + let hash = PaymentHash(Sha256::hash(&preimage.0[..]).to_byte_array()); + nondust_htlcs.push(HTLCOutputInCommitment { + offered: true, + amount_msat: 10000, + cltv_expiry: i as u32, + payment_hash: hash, + transaction_output_index: Some(i as u32), + }); + } + let destination_script = ScriptBuf::new(); + let mut tx_handler = + new_test_tx_handler(ChannelTypeFeatures::only_static_remote_key(), nondust_htlcs); + + // Create a broadcaster with current block height 1. + let broadcaster = TestBroadcaster::new(Network::Testnet); + { + let mut blocks = broadcaster.blocks.lock().unwrap(); + let genesis_hash = blocks[0].0.block_hash(); + blocks.push((create_dummy_block(genesis_hash, 0, Vec::new()), 1)); + } + + let fee_estimator = TestFeeEstimator::new(253); + let fee_estimator = LowerBoundedFeeEstimator::new(&fee_estimator); + let logger = TestLogger::new(); + + // Request claiming of each HTLC on the holder's commitment, with current block height 1. + let requests = build_offered_holder_htlc_requests(&tx_handler); tx_handler.update_claims_view_from_requests( requests, 1, @@ -1474,4 +1537,193 @@ mod tests { assert_eq!(txs_broadcasted.len(), 1); assert_eq!(txs_broadcasted[0].lock_time.to_consensus_u32(), 2); } + + #[test] + fn test_duplicate_pending_claim_request_after_force_close_replay() { + let claim_height = 21; + let locktime = 42; + let mut nondust_htlcs = Vec::new(); + for i in 0..2 { + let preimage = PaymentPreimage([i + 1; 32]); + let hash = PaymentHash(Sha256::hash(&preimage.0[..]).to_byte_array()); + nondust_htlcs.push(HTLCOutputInCommitment { + offered: true, + amount_msat: 10000, + cltv_expiry: locktime, + payment_hash: hash, + transaction_output_index: Some(i as u32), + }); + } + + let mut tx_handler = new_test_tx_handler( + ChannelTypeFeatures::anchors_zero_htlc_fee_and_dependencies(), + nondust_htlcs, + ); + let requests = build_offered_holder_htlc_requests(&tx_handler); + let destination_script = ScriptBuf::new(); + let broadcaster = TestBroadcaster::new(Network::Testnet); + let fee_estimator = TestFeeEstimator::new(253); + let fee_estimator = LowerBoundedFeeEstimator::new(&fee_estimator); + let logger = TestLogger::new(); + + // Simulate the force-close path registering the two holder HTLC claims as + // a single delayed package. + tx_handler.update_claims_view_from_requests( + requests.clone(), + claim_height, + claim_height, + &&broadcaster, + ConfirmationTarget::UrgentOnChainSweep, + &destination_script, + &fee_estimator, + &logger, + ); + assert_eq!( + tx_handler.locktimed_packages.get(&locktime).map(|packages| packages.len()), + Some(1), + ); + + // Replaying the same per-HTLC claim requests must match by outpoint + // coverage, otherwise each single-outpoint request would be added again. + tx_handler.update_claims_view_from_requests( + requests, + claim_height, + claim_height, + &&broadcaster, + ConfirmationTarget::UrgentOnChainSweep, + &destination_script, + &fee_estimator, + &logger, + ); + assert_eq!( + tx_handler.locktimed_packages.get(&locktime).map(|packages| packages.len()), + Some(1), + ); + + // At locktime, the delayed package should still yield one bump event + // covering both HTLCs. + tx_handler.update_claims_view_from_requests( + Vec::new(), + locktime, + locktime, + &&broadcaster, + ConfirmationTarget::UrgentOnChainSweep, + &destination_script, + &fee_estimator, + &logger, + ); + + let pending_events = tx_handler.get_and_clear_pending_claim_events(); + assert_eq!(pending_events.len(), 1); + assert_eq!(tx_handler.pending_claim_requests.len(), 1); + assert_eq!(tx_handler.claimable_outpoints.len(), 2); + match &pending_events[0].1 { + super::ClaimEvent::BumpHTLC { htlcs, tx_lock_time, .. } => { + assert_eq!(htlcs.len(), 2); + assert_eq!(tx_lock_time.to_consensus_u32(), locktime); + }, + _ => panic!("expected a single HTLC bump event"), + } + } + + #[test] + fn test_replayed_claim_ignored_for_pending_spent_outpoint() { + let claim_height = 21; + let spend_height = 22; + let locktime = 42; + let mut nondust_htlcs = Vec::new(); + for i in 0..2 { + let preimage = PaymentPreimage([i + 1; 32]); + let hash = PaymentHash(Sha256::hash(&preimage.0[..]).to_byte_array()); + nondust_htlcs.push(HTLCOutputInCommitment { + offered: true, + amount_msat: 10000, + cltv_expiry: locktime, + payment_hash: hash, + transaction_output_index: Some(i as u32), + }); + } + + let mut tx_handler = new_test_tx_handler( + ChannelTypeFeatures::anchors_zero_htlc_fee_and_dependencies(), + nondust_htlcs, + ); + let requests = build_offered_holder_htlc_requests(&tx_handler); + let spent_outpoint = *requests[0].outpoint(); + let still_delayed_outpoint = *requests[1].outpoint(); + let destination_script = ScriptBuf::new(); + let broadcaster = TestBroadcaster::new(Network::Testnet); + let fee_estimator = TestFeeEstimator::new(253); + let fee_estimator = LowerBoundedFeeEstimator::new(&fee_estimator); + let logger = TestLogger::new(); + + // Register both holder HTLC claims as one delayed package before any + // individual outpoint spends are observed. + tx_handler.update_claims_view_from_requests( + requests.clone(), + claim_height, + claim_height, + &&broadcaster, + ConfirmationTarget::UrgentOnChainSweep, + &destination_script, + &fee_estimator, + &logger, + ); + assert_eq!(locked_outpoints(&tx_handler, locktime).len(), 2); + + // Spend one outpoint before the package reaches its timelock. The handler + // should split it into a ContentiousOutpoint until the spend reaches + // anti-reorg finality. + let spend_tx = Transaction { + version: Version::TWO, + lock_time: LockTime::ZERO, + input: vec![TxIn { previous_output: spent_outpoint, ..Default::default() }], + output: vec![TxOut { value: Amount::from_sat(1000), script_pubkey: ScriptBuf::new() }], + }; + tx_handler.update_claims_view_from_matched_txn( + &[&spend_tx], + spend_height, + BlockHash::all_zeros(), + spend_height, + &&broadcaster, + ConfirmationTarget::UrgentOnChainSweep, + &destination_script, + &fee_estimator, + &logger, + ); + let locked = locked_outpoints(&tx_handler, locktime); + assert_eq!(locked, vec![still_delayed_outpoint]); + + // Replaying both original claim requests during that window must not + // re-add the already-spent outpoint to the delayed package. + tx_handler.update_claims_view_from_requests( + requests, + spend_height, + spend_height, + &&broadcaster, + ConfirmationTarget::UrgentOnChainSweep, + &destination_script, + &fee_estimator, + &logger, + ); + let locked = locked_outpoints(&tx_handler, locktime); + assert_eq!(locked, vec![still_delayed_outpoint]); + assert!(tx_handler.pending_claim_requests.is_empty()); + assert!(tx_handler.claimable_outpoints.is_empty()); + + // If the spend reorgs out, the contentious outpoint is resurrected into + // the delayed package. + tx_handler.blocks_disconnected( + spend_height - 1, + &&broadcaster, + ConfirmationTarget::UrgentOnChainSweep, + &destination_script, + &fee_estimator, + &logger, + ); + let locked = locked_outpoints(&tx_handler, locktime); + assert_eq!(locked.len(), 2); + assert!(locked.contains(&spent_outpoint)); + assert!(locked.contains(&still_delayed_outpoint)); + } } diff --git a/lightning/src/chain/package.rs b/lightning/src/chain/package.rs index 0ef8855242b..06be5750367 100644 --- a/lightning/src/chain/package.rs +++ b/lightning/src/chain/package.rs @@ -1097,6 +1097,19 @@ enum PackageMalleability { Untractable, } +/// A single on-chain output claim generated by [`ChannelMonitor`]. +/// +/// These requests are converted to [`PackageTemplate`]s once [`OnchainTxHandler`] has deduplicated +/// them and is ready to aggregate compatible claims. +/// +/// [`ChannelMonitor`]: crate::chain::channelmonitor::ChannelMonitor +/// [`OnchainTxHandler`]: crate::chain::onchaintx::OnchainTxHandler +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) struct ClaimRequest { + input: (BitcoinOutPoint, PackageSolvingData), + counterparty_spendable_height: u32, +} + /// A structure to describe a package content that is generated by ChannelMonitor and /// used by OnchainTxHandler to generate and broadcast transactions settling onchain claims. /// @@ -1179,6 +1192,32 @@ impl PartialEq for PackageTemplate { } } +impl ClaimRequest { + pub(crate) fn new( + txid: Txid, vout: u32, input_solving_data: PackageSolvingData, + counterparty_spendable_height: u32, + ) -> Self { + Self { + input: (BitcoinOutPoint { txid, vout }, input_solving_data), + counterparty_spendable_height, + } + } + + pub(crate) fn outpoint(&self) -> &BitcoinOutPoint { + &self.input.0 + } + + pub(crate) fn into_package_template(self) -> PackageTemplate { + let (outpoint, input_solving_data) = self.input; + PackageTemplate::build_package( + outpoint.txid, + outpoint.vout, + input_solving_data, + self.counterparty_spendable_height, + ) + } +} + impl PackageTemplate { #[rustfmt::skip] pub(crate) fn can_merge_with(&self, other: &PackageTemplate, cur_height: u32) -> bool { @@ -1265,6 +1304,9 @@ impl PackageTemplate { pub(crate) fn outpoints(&self) -> Vec<&BitcoinOutPoint> { self.inputs.iter().map(|(o, _)| o).collect() } + pub(crate) fn contains_outpoint(&self, outpoint: &BitcoinOutPoint) -> bool { + self.inputs.iter().any(|(input, _)| input == outpoint) + } pub(crate) fn outpoints_and_creation_heights( &self, ) -> impl Iterator)> { diff --git a/lightning/src/ln/monitor_tests.rs b/lightning/src/ln/monitor_tests.rs index f52f093917b..111d1fbfd81 100644 --- a/lightning/src/ln/monitor_tests.rs +++ b/lightning/src/ln/monitor_tests.rs @@ -115,6 +115,73 @@ fn test_spendable_output<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, spendable_t } else { panic!(); } } +#[test] +fn preimage_claim_balance_unchanged_between_anti_reorg_and_csv() { + let chanmon_cfgs = create_chanmon_cfgs(2); + let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); + let legacy_cfg = test_legacy_channel_config(); + let node_chanmgrs = + create_node_chanmgrs(2, &node_cfgs, &[Some(legacy_cfg.clone()), Some(legacy_cfg)]); + let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs); + + let (_, _, chan_id, funding_tx) = + create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 1_000_000, 500_000_000); + // Route an inbound HTLC to node 0 so its preimage claim spends an HTLC output from node 0's + // holder commitment and creates a CSV-delayed output. + let (route, payment_hash, payment_preimage, payment_secret) = + get_route_and_payment_hash!(nodes[1], nodes[0], 12_000_000); + nodes[1].node.send_payment_with_route(route, payment_hash, + RecipientOnionFields::secret_only(payment_secret, 12_000_000), PaymentId(payment_hash.0)).unwrap(); + check_added_monitors(&nodes[1], 1); + let updates = get_htlc_update_msgs(&nodes[1], &nodes[0].node.get_our_node_id()); + nodes[0].node.handle_update_add_htlc(nodes[1].node.get_our_node_id(), &updates.update_add_htlcs[0]); + do_commitment_signed_dance(&nodes[0], &nodes[1], &updates.commitment_signed, false, false); + expect_and_process_pending_htlcs(&nodes[0], false); + expect_payment_claimable!(nodes[0], payment_hash, payment_secret, 12_000_000); + + // Confirm node 0's holder commitment before claiming the HTLC so the preimage claim has a + // delayed output that remains tracked as an HTLC balance until it becomes spendable. + let message = "Channel force-closed".to_owned(); + nodes[0].node.force_close_broadcasting_latest_txn(&chan_id, &nodes[1].node.get_our_node_id(), message.clone()).unwrap(); + check_added_monitors(&nodes[0], 1); + check_closed_broadcast(&nodes[0], 1, true); + let reason = ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(true), message }; + check_closed_event(&nodes[0], 1, reason, &[nodes[1].node.get_our_node_id()], 1000000); + let commitment_txn = nodes[0].tx_broadcaster.unique_txn_broadcast(); + assert_eq!(commitment_txn.len(), 1); + check_spends!(commitment_txn[0], funding_tx); + mine_transaction(&nodes[0], &commitment_txn[0]); + nodes[0].tx_broadcaster.clear(); + + // Claiming the HTLC with the preimage broadcasts the HTLC-Success transaction. Once it + // confirms, the resulting delayed output should be reported as an HTLC balance awaiting + // confirmations. + nodes[0].node.claim_funds(payment_preimage); + check_added_monitors(&nodes[0], 1); + expect_payment_claimed!(nodes[0], payment_hash, 12_000_000); + let htlc_claim_txn = nodes[0].tx_broadcaster.unique_txn_broadcast(); + assert_eq!(htlc_claim_txn.len(), 1); + check_spends!(htlc_claim_txn[0], commitment_txn[0]); + mine_transaction(&nodes[0], &htlc_claim_txn[0]); + + let htlc_claim_balances = sorted_vec(nodes[0].chain_monitor.chain_monitor + .get_monitor(chan_id).unwrap().get_claimable_balances()); + assert!(htlc_claim_balances.iter().any(|balance| matches!(balance, + Balance::ClaimableAwaitingConfirmations { + amount_satoshis: 12_000, + source: BalanceSource::Htlc, + .. + } + ))); + + // Advance only to anti-reorg finality for the HTLC-Success transaction. The CSV-delayed + // output is not spendable yet, so the claimable HTLC balance should remain unchanged. + connect_blocks(&nodes[0], ANTI_REORG_DELAY - 1); + assert!(nodes[0].chain_monitor.chain_monitor.get_and_clear_pending_events().is_empty()); + assert_eq!(htlc_claim_balances, sorted_vec(nodes[0].chain_monitor.chain_monitor + .get_monitor(chan_id).unwrap().get_claimable_balances())); +} + #[test] fn revoked_output_htlc_resolution_timing() { // Tests that HTLCs which were present in a broadcasted remote revoked commitment transaction @@ -2388,6 +2455,199 @@ fn test_restored_packages_retry() { do_test_restored_packages_retry(true); } +fn do_test_duplicate_delayed_holder_htlc_claims_after_claim_funds_replay(p2a_anchor: bool) { + let chanmon_cfgs = create_chanmon_cfgs(2); + let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); + let persister; + let new_chain_monitor; + let node_deserialized; + let mut anchors_config = test_default_channel_config(); + anchors_config.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx = true; + anchors_config.channel_handshake_config.negotiate_anchor_zero_fee_commitments = p2a_anchor; + let node_chanmgrs = + create_node_chanmgrs(2, &node_cfgs, &[Some(anchors_config.clone()), Some(anchors_config)]); + let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs); + + let coinbase_tx = provide_anchor_reserves(&nodes); + let (_, _, chan_id, funding_tx) = + create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 1_000_000, 50_000_000); + + // Seed two unresolved outbound HTLCs that will be aggregated into one + // delayed holder-commitment package after force close. + route_payment(&nodes[0], &[&nodes[1]], 10_000_000); + route_payment(&nodes[0], &[&nodes[1]], 11_000_000); + + // Add a third incoming HTLC which will later be claimed by preimage after + // the commitment transaction confirms, reproducing the replay path. + let (claim_route, claim_hash, claim_preimage, claim_secret) = + get_route_and_payment_hash!(nodes[1], nodes[0], 12_000_000); + nodes[1] + .node + .send_payment_with_route( + claim_route, + claim_hash, + RecipientOnionFields::secret_only(claim_secret, 12_000_000), + PaymentId(claim_hash.0), + ) + .unwrap(); + check_added_monitors(&nodes[1], 1); + let updates = get_htlc_update_msgs(&nodes[1], &nodes[0].node.get_our_node_id()); + nodes[0] + .node + .handle_update_add_htlc(nodes[1].node.get_our_node_id(), &updates.update_add_htlcs[0]); + do_commitment_signed_dance(&nodes[0], &nodes[1], &updates.commitment_signed, false, false); + expect_and_process_pending_htlcs(&nodes[0], false); + expect_payment_claimable!(nodes[0], claim_hash, claim_secret, 12_000_000); + + // Force-close node 0 so its holder commitment hits chain and its HTLC + // claims are fed into OnchainTxHandler as delayed requests. + let message = "Channel force-closed".to_owned(); + nodes[0] + .node + .force_close_broadcasting_latest_txn( + &chan_id, + &nodes[1].node.get_our_node_id(), + message.clone(), + ) + .unwrap(); + check_added_monitors(&nodes[0], 1); + check_closed_broadcast(&nodes[0], 1, true); + let reason = ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(true), message }; + check_closed_event(&nodes[0], 1, reason, &[nodes[1].node.get_our_node_id()], 1_000_000); + handle_bump_close_event(&nodes[0]); + + let (commitment_tx, anchor_tx) = { + let mut txn = nodes[0].tx_broadcaster.unique_txn_broadcast(); + assert_eq!(txn.len(), if p2a_anchor { 2 } else { 1 }); + let anchor_tx = p2a_anchor.then(|| txn.pop().unwrap()); + let commitment_tx = txn.pop().unwrap(); + check_spends!(commitment_tx, funding_tx); + if p2a_anchor { + check_spends!(anchor_tx.as_ref().unwrap(), commitment_tx, coinbase_tx); + } + (commitment_tx, anchor_tx) + }; + + let _ = mine_transaction(&nodes[0], &commitment_tx); + if p2a_anchor { + let _ = mine_transaction(&nodes[0], anchor_tx.as_ref().unwrap()); + } + + // Claim the incoming HTLC after the commitment is confirmed. This + // regenerates a single-outpoint claim request alongside the existing + // delayed package covering the two earlier HTLCs. + nodes[0].node.claim_funds(claim_preimage); + check_added_monitors(&nodes[0], 1); + expect_payment_claimed!(nodes[0], claim_hash, 12_000_000); + + // Once all holder HTLCs reach their timelock, we should see the original two-HTLC + // delayed package plus the replayed single-HTLC claim, not duplicates of + // the delayed package's outpoints. + connect_blocks(&nodes[0], TEST_FINAL_CLTV + 1); + + let events = nodes[0] + .chain_monitor + .chain_monitor + .get_and_clear_pending_events() + .into_iter() + .collect::>(); + let mut htlc_event_sizes = events + .iter() + .filter_map(|event| { + if let Event::BumpTransaction(BumpTransactionEvent::HTLCResolution { + htlc_descriptors, .. + }) = event + { + Some(htlc_descriptors.len()) + } else { + None + } + }) + .collect::>(); + htlc_event_sizes.sort_unstable(); + assert_eq!(htlc_event_sizes, vec![1, 2]); + + // Drive only the replayed single-HTLC event on-chain so we can replay the + // preimage once the spend is anti-reorg final, then again after reload. + for event in events { + if let Event::BumpTransaction(event) = event { + let is_single_htlc = if let BumpTransactionEvent::HTLCResolution { + ref htlc_descriptors, + .. + } = event + { + htlc_descriptors.len() == 1 + } else { + false + }; + if is_single_htlc { + nodes[0].bump_tx_handler.handle_event(&event); + break; + } + } + } + let mut htlc_txn = nodes[0].tx_broadcaster.unique_txn_broadcast(); + assert_eq!(htlc_txn.len(), 1); + let htlc_tx = htlc_txn.pop().unwrap(); + mine_transaction(&nodes[0], &htlc_tx); + connect_blocks(&nodes[0], ANTI_REORG_DELAY - 1); + assert!(nodes[0].chain_monitor.chain_monitor.get_and_clear_pending_events().is_empty()); + + // The spend has passed anti-reorg finality, but its CSV-delayed output is + // not yet spendable. Replaying the preimage in this window must not create + // a new conflicting claim for the already-spent commitment HTLC output. + get_monitor!(nodes[0], chan_id).provide_payment_preimage_unsafe_legacy( + &claim_hash, + &claim_preimage, + &node_cfgs[0].tx_broadcaster, + &LowerBoundedFeeEstimator::new(node_cfgs[0].fee_estimator), + &nodes[0].logger, + ); + assert!(nodes[0].chain_monitor.chain_monitor.get_and_clear_pending_events().is_empty()); + let balances = nodes[0] + .chain_monitor + .chain_monitor + .get_monitor(chan_id) + .unwrap() + .get_claimable_balances(); + assert!(balances.iter().any(|balance| matches!( + balance, + Balance::ClaimableAwaitingConfirmations { + amount_satoshis: 12_000, + source: BalanceSource::Htlc, + .. + } + ))); + + connect_blocks(&nodes[0], BREAKDOWN_TIMEOUT as u32 - ANTI_REORG_DELAY); + let _ = nodes[0].chain_monitor.chain_monitor.get_and_clear_pending_events(); + + // Reload before replaying the preimage so the regression covers persisted + // resolution state, not only in-memory filtering. + let serialized_channel_manager = nodes[0].node.encode(); + let serialized_monitor = get_monitor!(nodes[0], chan_id).encode(); + reload_node!( + nodes[0], &serialized_channel_manager, &[&serialized_monitor], persister, + new_chain_monitor, node_deserialized + ); + + // Replaying the preimage update must not regenerate a claim for the HTLC + // whose commitment output has anti-reorg persisted resolution state. + get_monitor!(nodes[0], chan_id).provide_payment_preimage_unsafe_legacy( + &claim_hash, &claim_preimage, &node_cfgs[0].tx_broadcaster, + &LowerBoundedFeeEstimator::new(node_cfgs[0].fee_estimator), &nodes[0].logger, + ); + assert!(nodes[0].chain_monitor.chain_monitor.get_and_clear_pending_events().is_empty()); + expect_payment_claimed!(nodes[0], claim_hash, 12_000_000); + check_added_monitors(&nodes[0], 1); +} + +#[test] +fn test_duplicate_delayed_holder_htlc_claims_after_claim_funds_replay() { + do_test_duplicate_delayed_holder_htlc_claims_after_claim_funds_replay(false); + do_test_duplicate_delayed_holder_htlc_claims_after_claim_funds_replay(true); +} + fn do_test_monitor_rebroadcast_pending_claims(keyed_anchors: bool, p2a_anchor: bool) { // Test that we will retry broadcasting pending claims for a force-closed channel on every // `ChainMonitor::rebroadcast_pending_claims` call.