From 79c8707ecebec430fb867808f6793fc79dd1f3b4 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 11 May 2026 14:02:44 +0200 Subject: [PATCH 01/19] fuzz: model chanmon persistence in harness Replace the chanmon consistency harness' Watch wrapper with a Persist implementation backed by HarnessPersister. Monitor writes now flow through the real ChainMonitor persistence hooks. Track restart candidates separately from monitor completion callbacks. A monitor can stop being a valid reload candidate once a newer baseline is durable, while its callback may still be needed to unblock the live ChainMonitor. On reload, choose the durable baseline, first pending snapshot, or last pending snapshot. Startup monitor registration completes immediately before the configured persistence style is restored. --- fuzz/src/chanmon_consistency.rs | 567 ++++++++++++++++++++------------ 1 file changed, 348 insertions(+), 219 deletions(-) diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index 4a182c33beb..aadc58ff4ce 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -41,8 +41,7 @@ use lightning::chain; use lightning::chain::chaininterface::{ BroadcasterInterface, ConfirmationTarget, FeeEstimator, TransactionType, }; -use lightning::chain::channelmonitor::{ChannelMonitor, MonitorEvent}; -use lightning::chain::transaction::OutPoint; +use lightning::chain::channelmonitor::ChannelMonitor; use lightning::chain::{ chainmonitor, channelmonitor, BlockLocator, ChannelMonitorUpdateStatus, Confirm, Watch, }; @@ -87,7 +86,6 @@ use lightning::util::wallet_utils::{WalletSourceSync, WalletSync}; use lightning_invoice::RawBolt11Invoice; use crate::utils::test_logger::{self, Output}; -use crate::utils::test_persister::TestPersister; use bitcoin::secp256k1::ecdh::SharedSecret; use bitcoin::secp256k1::ecdsa::{RecoverableSignature, Signature}; @@ -293,144 +291,302 @@ impl Writer for VecWriter { } } -/// The LDK API requires that any time we tell it we're done persisting a `ChannelMonitor[Update]` -/// we never pass it in as the "latest" `ChannelMonitor` on startup. However, we can pass -/// out-of-date monitors as long as we never told LDK we finished persisting them, which we do by -/// storing both old `ChannelMonitor`s and ones that are "being persisted" here. +fn serialize_monitor(monitor: &ChannelMonitor) -> Vec { + let mut ser = VecWriter(Vec::new()); + monitor.write(&mut ser).unwrap(); + ser.0 +} + +/// LDK requires the `ChannelMonitor` loaded on startup to be at least as current as the +/// `ChannelManager` state, except for monitor updates that `ChannelManager` still records as +/// in-flight and can replay. This harness tracks the monitor blobs that remain valid restart +/// candidates under that rule. /// -/// Note that such "being persisted" `ChannelMonitor`s are stored in `ChannelManager` and will -/// simply be replayed on startup. +/// Separately, we track every `InProgress` persistence operation that still needs a +/// `channel_monitor_updated` call. A newer persisted monitor can make an older monitor invalid for +/// restart while the older update still needs to be completed to unblock the live `ChainMonitor`. +/// +/// Off-chain monitor updates that are still "being persisted" are stored in `ChannelManager` and +/// will be replayed on startup. Full-monitor snapshots from chain sync or archive paths that return +/// `InProgress` are only restart candidates; losing one on restart does not require a +/// `channel_monitor_updated` callback. struct LatestMonitorState { /// The latest monitor id which we told LDK we've persisted. /// - /// Note that there may still be earlier pending monitor updates in [`Self::pending_monitors`] - /// which we haven't yet completed. We're allowed to reload with those as well, at least until - /// they're completed. + /// Note that earlier updates may still need a `channel_monitor_updated` callback via + /// [`Self::pending_monitor_completions`]. persisted_monitor_id: u64, /// The latest serialized `ChannelMonitor` that we told LDK we persisted. persisted_monitor: Vec, - /// A set of (monitor id, serialized `ChannelMonitor`)s which we're currently "persisting", - /// from LDK's perspective. + /// An ordered list of (monitor id, serialized `ChannelMonitor`)s which remain safe to use as + /// stale monitors on reload. pending_monitors: Vec<(u64, Vec)>, + /// An ordered list of (monitor id, serialized `ChannelMonitor`)s which still need a + /// `channel_monitor_updated` callback. + pending_monitor_completions: Vec<(u64, Vec)>, } +impl LatestMonitorState { + fn insert_pending_entry( + pending: &mut Vec<(u64, Vec)>, monitor_id: u64, serialized_monitor: Vec, + ) { + // Monitor update ids must arrive in order. Assert at insertion time so duplicates or + // out-of-order updates fail close to the write that caused them instead of being sorted + // into place. + assert!( + pending.last().map_or(true, |(last_id, _)| *last_id < monitor_id), + "pending monitor updates should arrive in order" + ); + pending.push((monitor_id, serialized_monitor)); + } -struct TestChainMonitor { - pub logger: Arc, - pub keys: Arc, - pub persister: Arc, - pub chain_monitor: Arc< - chainmonitor::ChainMonitor< - TestChannelSigner, - Arc, - Arc, - Arc, - Arc, - Arc, - Arc, - >, - >, - pub latest_monitors: Mutex>, -} -impl TestChainMonitor { - pub fn new( - broadcaster: Arc, logger: Arc, feeest: Arc, - persister: Arc, keys: Arc, - ) -> Self { - Self { - chain_monitor: Arc::new(chainmonitor::ChainMonitor::new( - None, - broadcaster, - logger.clone(), - feeest, - Arc::clone(&persister), - Arc::clone(&keys), - keys.get_peer_storage_key(), - false, - )), - logger, - keys, - persister, - latest_monitors: Mutex::new(new_hash_map()), + fn insert_pending_monitor_candidate(&mut self, monitor_id: u64, serialized_monitor: Vec) { + // Full-monitor persists from chain sync or archive paths use the monitor's current + // latest_update_id rather than a fresh ChannelMonitorUpdate id. Keep duplicate ids so + // reload can choose between multiple same-id full snapshots that were in flight together. + if let Some((last_id, _)) = self.pending_monitors.last() { + assert!(*last_id <= monitor_id, "pending monitor updates should arrive in order"); } + self.pending_monitors.push((monitor_id, serialized_monitor)); } -} -impl chain::Watch for TestChainMonitor { - fn watch_channel( - &self, channel_id: ChannelId, monitor: channelmonitor::ChannelMonitor, - ) -> Result { - let mut ser = VecWriter(Vec::new()); - monitor.write(&mut ser).unwrap(); - let monitor_id = monitor.get_latest_update_id(); - let res = self.chain_monitor.watch_channel(channel_id, monitor); - let state = match res { - Ok(chain::ChannelMonitorUpdateStatus::Completed) => LatestMonitorState { - persisted_monitor_id: monitor_id, - persisted_monitor: ser.0, - pending_monitors: Vec::new(), - }, - Ok(chain::ChannelMonitorUpdateStatus::InProgress) => LatestMonitorState { - persisted_monitor_id: monitor_id, - persisted_monitor: Vec::new(), - pending_monitors: vec![(monitor_id, ser.0)], - }, - Ok(chain::ChannelMonitorUpdateStatus::UnrecoverableError) => panic!(), - Err(()) => panic!(), - }; - if self.latest_monitors.lock().unwrap().insert(channel_id, state).is_some() { - panic!("Already had monitor pre-watch_channel"); + + fn mark_persisted(&mut self, monitor_id: u64, serialized_monitor: Vec) { + // Once a monitor is durable, use it as the restart baseline and stop tracking candidates + // at or behind that update id. Completion obligations are tracked separately and are + // deliberately not pruned here. + self.pending_monitors.retain(|(id, _)| *id > monitor_id); + if monitor_id >= self.persisted_monitor_id { + self.persisted_monitor_id = monitor_id; + self.persisted_monitor = serialized_monitor; } - res } - fn update_channel( - &self, channel_id: ChannelId, update: &channelmonitor::ChannelMonitorUpdate, - ) -> chain::ChannelMonitorUpdateStatus { - let mut map_lock = self.latest_monitors.lock().unwrap(); - let map_entry = map_lock.get_mut(&channel_id).expect("Didn't have monitor on update call"); - let latest_monitor_data = map_entry - .pending_monitors - .last() - .as_ref() - .map(|(_, data)| data) - .unwrap_or(&map_entry.persisted_monitor); - let deserialized_monitor = - <(BlockLocator, channelmonitor::ChannelMonitor)>::read( - &mut &latest_monitor_data[..], - (&*self.keys, &*self.keys), - ) - .unwrap() - .1; - deserialized_monitor - .update_monitor( - update, - &&TestBroadcaster { txn_broadcasted: RefCell::new(Vec::new()) }, - &&FuzzEstimator { ret_val: atomic::AtomicU32::new(253) }, - &self.logger, - ) - .unwrap(); - let mut ser = VecWriter(Vec::new()); - deserialized_monitor.write(&mut ser).unwrap(); - let res = self.chain_monitor.update_channel(channel_id, update); - match res { - chain::ChannelMonitorUpdateStatus::Completed => { - map_entry.persisted_monitor_id = update.update_id; - map_entry.persisted_monitor = ser.0; + fn insert_pending( + &mut self, monitor_id: u64, serialized_monitor: Vec, needs_completion: bool, + ) { + if needs_completion { + // persist_new_channel and update_persisted_channel(Some(_)) require a later + // channel_monitor_updated callback if persistence returns InProgress. + Self::insert_pending_entry( + &mut self.pending_monitors, + monitor_id, + serialized_monitor.clone(), + ); + Self::insert_pending_entry( + &mut self.pending_monitor_completions, + monitor_id, + serialized_monitor, + ); + } else { + // This harness treats update_persisted_channel(None, ...) as the chain-sync/archive + // case: the full monitor may be used on restart, but ChainMonitor does not wait for a + // channel_monitor_updated callback. + self.insert_pending_monitor_candidate(monitor_id, serialized_monitor); + } + } + + fn mark_completed_update_persisted(&mut self, monitor_id: u64, serialized_monitor: Vec) { + // The selector/drain path should already have removed this entry before + // finish_monitor_update calls channel_monitor_updated. This check catches accidental + // double-completion or pruning of the wrong list. + assert!( + self.pending_monitor_completions.iter().all(|(id, _)| *id != monitor_id), + "completed monitor update should already be removed from the completion queue" + ); + self.mark_persisted(monitor_id, serialized_monitor); + } + + fn drain_pending_completions(&mut self) -> Vec<(u64, Vec)> { + std::mem::take(&mut self.pending_monitor_completions) + } + + fn take_pending_completion( + &mut self, selector: MonitorUpdateSelector, + ) -> Option<(u64, Vec)> { + // The fuzzer chooses which outstanding callback to deliver. These choices apply to + // completion obligations, not to the set of monitors that may be used on restart. + match selector { + MonitorUpdateSelector::First => { + if self.pending_monitor_completions.is_empty() { + None + } else { + Some(self.pending_monitor_completions.remove(0)) + } }, - chain::ChannelMonitorUpdateStatus::InProgress => { - map_entry.pending_monitors.push((update.update_id, ser.0)); + MonitorUpdateSelector::Second => { + if self.pending_monitor_completions.len() > 1 { + Some(self.pending_monitor_completions.remove(1)) + } else { + None + } }, - chain::ChannelMonitorUpdateStatus::UnrecoverableError => panic!(), + MonitorUpdateSelector::Last => self.pending_monitor_completions.pop(), } - res } - fn release_pending_monitor_events( - &self, - ) -> Vec<(OutPoint, ChannelId, Vec, PublicKey)> { - return self.chain_monitor.release_pending_monitor_events(); + fn select_monitor_for_reload(&mut self, selector: MonitorReloadSelector) { + // A restart can load the last monitor we told LDK was persisted, or a monitor snapshot + // whose write was started before the simulated crash. + let old_mon = (self.persisted_monitor_id, std::mem::take(&mut self.persisted_monitor)); + let (monitor_id, serialized_monitor) = match selector { + MonitorReloadSelector::Persisted => old_mon, + MonitorReloadSelector::FirstPending => { + if self.pending_monitors.is_empty() { + old_mon + } else { + self.pending_monitors.remove(0) + } + }, + MonitorReloadSelector::LastPending => self.pending_monitors.pop().unwrap_or(old_mon), + }; + self.persisted_monitor_id = monitor_id; + self.persisted_monitor = serialized_monitor; + // After restart, stop tracking pre-restart in-flight writes. ChannelManager will replay + // off-chain monitor updates that still matter; full-monitor snapshots may simply be absent. + self.pending_monitors.clear(); + self.pending_monitor_completions.clear(); + } +} + +struct HarnessPersister { + pub update_ret: Mutex, + pub latest_monitors: Mutex>, +} +impl HarnessPersister { + fn track_monitor_update( + &self, channel_id: ChannelId, monitor_id: u64, serialized_monitor: Vec, + status: chain::ChannelMonitorUpdateStatus, needs_completion: bool, + ) { + let mut latest_monitors = self.latest_monitors.lock().unwrap(); + if let Some(state) = latest_monitors.get_mut(&channel_id) { + match status { + chain::ChannelMonitorUpdateStatus::Completed => { + // A completed write advances the restart baseline. Once LDK can rely on that + // monitor state being durable, the harness stops offering candidates at or + // behind that update id. + state.mark_persisted(monitor_id, serialized_monitor); + }, + chain::ChannelMonitorUpdateStatus::InProgress => { + // InProgress always creates a restart candidate, but only some calls also need + // an explicit channel_monitor_updated completion. + state.insert_pending(monitor_id, serialized_monitor, needs_completion); + }, + chain::ChannelMonitorUpdateStatus::UnrecoverableError => {}, + } + } else { + let state = match status { + chain::ChannelMonitorUpdateStatus::Completed => LatestMonitorState { + persisted_monitor_id: monitor_id, + persisted_monitor: serialized_monitor, + pending_monitors: Vec::new(), + pending_monitor_completions: Vec::new(), + }, + chain::ChannelMonitorUpdateStatus::InProgress => { + // The first persist for a channel is persist_new_channel, which always needs a + // completion callback when it returns InProgress. A full-monitor update without + // existing state would mean the harness missed the channel's initial monitor. + assert!(needs_completion, "missing monitor state for full monitor update"); + LatestMonitorState { + persisted_monitor_id: monitor_id, + persisted_monitor: Vec::new(), + pending_monitors: vec![(monitor_id, serialized_monitor.clone())], + pending_monitor_completions: vec![(monitor_id, serialized_monitor)], + } + }, + chain::ChannelMonitorUpdateStatus::UnrecoverableError => return, + }; + assert!( + latest_monitors.insert(channel_id, state).is_none(), + "Already had monitor state pre-persist" + ); + } + } + + fn mark_update_completed( + &self, channel_id: ChannelId, monitor_id: u64, serialized_monitor: Vec, + ) { + let mut latest_monitors = self.latest_monitors.lock().unwrap(); + let state = latest_monitors + .get_mut(&channel_id) + .expect("missing monitor state for completed update"); + // Once we tell LDK update N is completed, use the completed monitor as the restart + // baseline and drop restart candidates at or behind N. + state.mark_completed_update_persisted(monitor_id, serialized_monitor); + } + + fn drain_pending_updates(&self, channel_id: &ChannelId) -> Vec<(u64, Vec)> { + self.latest_monitors + .lock() + .unwrap() + .get_mut(channel_id) + .map_or_else(Vec::new, |state| state.drain_pending_completions()) + } + + fn drain_all_pending_updates(&self) -> Vec<(ChannelId, u64, Vec)> { + let mut completed_updates = Vec::new(); + for (channel_id, state) in self.latest_monitors.lock().unwrap().iter_mut() { + for (monitor_id, data) in state.drain_pending_completions() { + completed_updates.push((*channel_id, monitor_id, data)); + } + } + completed_updates + } + + fn take_pending_update( + &self, channel_id: &ChannelId, selector: MonitorUpdateSelector, + ) -> Option<(u64, Vec)> { + self.latest_monitors + .lock() + .unwrap() + .get_mut(channel_id) + .and_then(|state| state.take_pending_completion(selector)) + } +} +impl chainmonitor::Persist for HarnessPersister { + fn persist_new_channel( + &self, _monitor_name: lightning::util::persist::MonitorName, + data: &channelmonitor::ChannelMonitor, + ) -> chain::ChannelMonitorUpdateStatus { + let status = self.update_ret.lock().unwrap().clone(); + let monitor_id = data.get_latest_update_id(); + let serialized_monitor = serialize_monitor(data); + self.track_monitor_update(data.channel_id(), monitor_id, serialized_monitor, status, true); + status + } + + fn update_persisted_channel( + &self, _monitor_name: lightning::util::persist::MonitorName, + update: Option<&channelmonitor::ChannelMonitorUpdate>, + data: &channelmonitor::ChannelMonitor, + ) -> chain::ChannelMonitorUpdateStatus { + let status = self.update_ret.lock().unwrap().clone(); + let monitor_id = update.map_or_else(|| data.get_latest_update_id(), |upd| upd.update_id); + let serialized_monitor = serialize_monitor(data); + self.track_monitor_update( + data.channel_id(), + monitor_id, + serialized_monitor, + status, + // `None` normally comes from chain-sync or archive writes, which need no completion + // callback. `update_channel_internal` can also use `None` after `update_monitor` + // fails, but this harness does not model that error-recovery path. + update.is_some(), + ); + status } + + fn archive_persisted_channel(&self, _monitor_name: lightning::util::persist::MonitorName) {} } +type TestChainMonitor = chainmonitor::ChainMonitor< + TestChannelSigner, + Arc, + Arc, + Arc, + Arc, + Arc, + Arc, +>; + struct KeyProvider { node_secret: SecretKey, rand_bytes_id: atomic::AtomicU32, @@ -654,6 +810,7 @@ struct HarnessNode<'a> { node_id: u8, node: ChanMan<'a>, monitor: Arc, + persister: Arc, keys_manager: Arc, logger: Arc, broadcaster: Arc, @@ -674,26 +831,33 @@ impl<'a> std::ops::Deref for HarnessNode<'a> { } impl<'a> HarnessNode<'a> { - fn build_loggers( + fn build_logger( node_id: u8, out: &Out, - ) -> (Arc, Arc) { - let raw_logger = Arc::new(test_logger::TestLogger::new(node_id.to_string(), out.clone())); - let logger_for_monitor: Arc = raw_logger.clone(); - let logger: Arc = raw_logger; - (logger_for_monitor, logger) + ) -> Arc { + Arc::new(test_logger::TestLogger::new(node_id.to_string(), out.clone())) + } + + fn build_persister(persistence_style: ChannelMonitorUpdateStatus) -> Arc { + Arc::new(HarnessPersister { + update_ret: Mutex::new(persistence_style), + latest_monitors: Mutex::new(new_hash_map()), + }) } fn build_chain_monitor( broadcaster: &Arc, fee_estimator: &Arc, - keys_manager: &Arc, logger_for_monitor: Arc, - persistence_style: ChannelMonitorUpdateStatus, + keys_manager: &Arc, logger: Arc, + persister: &Arc, ) -> Arc { - Arc::new(TestChainMonitor::new( + Arc::new(chainmonitor::ChainMonitor::new( + None, Arc::clone(broadcaster), - logger_for_monitor, + logger, Arc::clone(fee_estimator), - Arc::new(TestPersister { update_ret: Mutex::new(persistence_style) }), + Arc::clone(persister), Arc::clone(keys_manager), + keys_manager.get_peer_storage_key(), + false, )) } @@ -702,7 +866,7 @@ impl<'a> HarnessNode<'a> { broadcaster: Arc, persistence_style: ChannelMonitorUpdateStatus, out: &Out, router: &'a FuzzRouter, chan_type: ChanType, ) -> Self { - let (logger_for_monitor, logger) = Self::build_loggers(node_id, out); + let logger = Self::build_logger(node_id, out); let node_secret = SecretKey::from_slice(&[ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, node_id, @@ -713,12 +877,13 @@ impl<'a> HarnessNode<'a> { rand_bytes_id: atomic::AtomicU32::new(0), enforcement_states: Mutex::new(new_hash_map()), }); + let persister = Self::build_persister(persistence_style); let monitor = Self::build_chain_monitor( &broadcaster, &fee_estimator, &keys_manager, - logger_for_monitor, - persistence_style, + Arc::clone(&logger), + &persister, ); let network = Network::Bitcoin; let best_block_timestamp = genesis_block(network).header.time; @@ -741,6 +906,7 @@ impl<'a> HarnessNode<'a> { node_id, node, monitor, + persister, keys_manager, logger, broadcaster, @@ -754,67 +920,31 @@ impl<'a> HarnessNode<'a> { } fn set_persistence_style(&mut self, style: ChannelMonitorUpdateStatus) { + // Store the style for the next reload. The active persister is intentionally not changed + // in place. self.persistence_style = style; } + fn finish_monitor_update(&self, chan_id: ChannelId, monitor_id: u64, data: Vec) { + self.monitor.channel_monitor_updated(chan_id, monitor_id).unwrap(); + self.persister.mark_update_completed(chan_id, monitor_id, data); + } + fn complete_all_monitor_updates(&self, chan_id: &ChannelId) { - if let Some(state) = self.monitor.latest_monitors.lock().unwrap().get_mut(chan_id) { - assert!( - state.pending_monitors.windows(2).all(|pair| pair[0].0 < pair[1].0), - "updates should be sorted by id" - ); - for (id, data) in state.pending_monitors.drain(..) { - self.monitor.chain_monitor.channel_monitor_updated(*chan_id, id).unwrap(); - if id > state.persisted_monitor_id { - state.persisted_monitor_id = id; - state.persisted_monitor = data; - } - } + for (monitor_id, data) in self.persister.drain_pending_updates(chan_id) { + self.finish_monitor_update(*chan_id, monitor_id, data); } } fn complete_all_pending_monitor_updates(&self) { - for (channel_id, state) in self.monitor.latest_monitors.lock().unwrap().iter_mut() { - for (id, data) in state.pending_monitors.drain(..) { - self.monitor.chain_monitor.channel_monitor_updated(*channel_id, id).unwrap(); - if id >= state.persisted_monitor_id { - state.persisted_monitor_id = id; - state.persisted_monitor = data; - } - } + for (channel_id, monitor_id, data) in self.persister.drain_all_pending_updates() { + self.finish_monitor_update(channel_id, monitor_id, data); } } fn complete_monitor_update(&self, chan_id: &ChannelId, selector: MonitorUpdateSelector) { - if let Some(state) = self.monitor.latest_monitors.lock().unwrap().get_mut(chan_id) { - assert!( - state.pending_monitors.windows(2).all(|pair| pair[0].0 < pair[1].0), - "updates should be sorted by id" - ); - let update = match selector { - MonitorUpdateSelector::First => { - if state.pending_monitors.is_empty() { - None - } else { - Some(state.pending_monitors.remove(0)) - } - }, - MonitorUpdateSelector::Second => { - if state.pending_monitors.len() > 1 { - Some(state.pending_monitors.remove(1)) - } else { - None - } - }, - MonitorUpdateSelector::Last => state.pending_monitors.pop(), - }; - if let Some((id, data)) = update { - self.monitor.chain_monitor.channel_monitor_updated(*chan_id, id).unwrap(); - if id > state.persisted_monitor_id { - state.persisted_monitor_id = id; - state.persisted_monitor = data; - } - } + if let Some((monitor_id, data)) = self.persister.take_pending_update(chan_id, selector) { + self.finish_monitor_update(*chan_id, monitor_id, data); } } @@ -942,50 +1072,39 @@ impl<'a> HarnessNode<'a> { fn reload( &mut self, use_old_mons: u8, out: &Out, router: &'a FuzzRouter, chan_type: ChanType, ) { - let (logger_for_monitor, logger) = Self::build_loggers(self.node_id, out); + let logger = Self::build_logger(self.node_id, out); + // Re-registering monitors during reload reflects data that was already selected from + // simulated storage, so these startup watch_channel calls should complete immediately. + let persister = Self::build_persister(ChannelMonitorUpdateStatus::Completed); let chain_monitor = Self::build_chain_monitor( &self.broadcaster, &self.fee_estimator, &self.keys_manager, - logger_for_monitor, - ChannelMonitorUpdateStatus::Completed, + Arc::clone(&logger), + &persister, ); let mut monitors = new_hash_map(); let mut use_old_mons = use_old_mons; { - let mut old_monitors = self.monitor.latest_monitors.lock().unwrap(); + let mut old_monitors = self.persister.latest_monitors.lock().unwrap(); for (channel_id, mut prev_state) in old_monitors.drain() { - let (mon_id, serialized_mon) = if use_old_mons % 3 == 0 { - // Reload with the oldest `ChannelMonitor` (the one that we already told - // `ChannelManager` we finished persisting). - (prev_state.persisted_monitor_id, prev_state.persisted_monitor) - } else if use_old_mons % 3 == 1 { - // Reload with the second-oldest `ChannelMonitor`. - let old_mon = (prev_state.persisted_monitor_id, prev_state.persisted_monitor); - prev_state.pending_monitors.drain(..).next().unwrap_or(old_mon) - } else { - // Reload with the newest `ChannelMonitor`. - let old_mon = (prev_state.persisted_monitor_id, prev_state.persisted_monitor); - prev_state.pending_monitors.pop().unwrap_or(old_mon) + let selector = match use_old_mons % 3 { + 0 => MonitorReloadSelector::Persisted, + 1 => MonitorReloadSelector::FirstPending, + _ => MonitorReloadSelector::LastPending, }; - // Use a different value of `use_old_mons` if we have another monitor - // (only for node B) by shifting `use_old_mons` one in base-3. + prev_state.select_monitor_for_reload(selector); + // Use a different trit for each monitor so one restart byte can vary the stale + // monitor depth across multiple monitors for the node. use_old_mons /= 3; let mon = <(BlockLocator, ChannelMonitor)>::read( - &mut &serialized_mon[..], + &mut &prev_state.persisted_monitor[..], (&*self.keys_manager, &*self.keys_manager), ) .expect("Failed to read monitor"); monitors.insert(channel_id, mon.1); - // Update the latest `ChannelMonitor` state to match what we just told LDK. - prev_state.persisted_monitor = serialized_mon; - prev_state.persisted_monitor_id = mon_id; - // Wipe any `ChannelMonitor`s which we never told LDK we finished persisting, - // considering them discarded. LDK should replay these for us as they're stored in - // the `ChannelManager`. - prev_state.pending_monitors.clear(); - chain_monitor.latest_monitors.lock().unwrap().insert(channel_id, prev_state); + persister.latest_monitors.lock().unwrap().insert(channel_id, prev_state); } } let mut monitor_refs = new_hash_map(); @@ -1011,17 +1130,27 @@ impl<'a> HarnessNode<'a> { .expect("Failed to read manager"); for (channel_id, mon) in monitors.drain() { assert_eq!( - chain_monitor.chain_monitor.watch_channel(channel_id, mon), + chain_monitor.watch_channel(channel_id, mon), Ok(ChannelMonitorUpdateStatus::Completed) ); } - *chain_monitor.persister.update_ret.lock().unwrap() = self.persistence_style; + // Future monitor writes should follow the node's configured persistence style; only the + // startup watch_channel registration above is forced to Completed. + *persister.update_ret.lock().unwrap() = self.persistence_style; self.node = manager.1; self.monitor = chain_monitor; + self.persister = persister; self.logger = logger; } } +#[derive(Copy, Clone)] +enum MonitorReloadSelector { + Persisted, + FirstPending, + LastPending, +} + #[derive(Copy, Clone)] enum MonitorUpdateSelector { First, @@ -1921,7 +2050,7 @@ fn make_channel( } }; dest.handle_funding_created(source.get_our_node_id(), &funding_created); - // Complete any pending monitor updates for dest after watch_channel. + // Complete any pending monitor persistence callbacks for dest after watch_channel. dest.complete_all_pending_monitor_updates(); let (funding_signed, channel_id) = { @@ -1942,7 +2071,7 @@ fn make_channel( } source.handle_funding_signed(dest.get_our_node_id(), &funding_signed); - // Complete any pending monitor updates for source after watch_channel. + // Complete any pending monitor persistence callbacks for source after watch_channel. source.complete_all_pending_monitor_updates(); let events = source.get_and_clear_pending_events(); @@ -2620,7 +2749,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { "It may take may iterations to settle the state, but it should not take forever" ); } - // Next, make sure no monitor updates are pending. + // Next, make sure no monitor completion callbacks are pending. self.ab_link.complete_all_monitor_updates(&self.nodes); self.bc_link.complete_all_monitor_updates(&self.nodes); // Then, make sure any current forwards make their way to their destination. @@ -3019,18 +3148,18 @@ pub fn do_test(data: &[u8], out: Out) { }, 0xb0 | 0xb1 | 0xb2 => { - // Restart node A, picking among the in-flight `ChannelMonitor`s to use based on - // the value of `v` we're matching. + // Restart node A, picking among persisted and in-flight `ChannelMonitor` + // candidates based on the value of `v` we're matching. harness.restart_node(0, v, &router); }, 0xb3..=0xbb => { - // Restart node B, picking among the in-flight `ChannelMonitor`s to use based on - // the value of `v` we're matching. + // Restart node B, picking among persisted and in-flight `ChannelMonitor` + // candidates based on the value of `v` we're matching. harness.restart_node(1, v, &router); }, 0xbc | 0xbd | 0xbe => { - // Restart node C, picking among the in-flight `ChannelMonitor`s to use based on - // the value of `v` we're matching. + // Restart node C, picking among persisted and in-flight `ChannelMonitor` + // candidates based on the value of `v` we're matching. harness.restart_node(2, v, &router); }, From 64dfdcb4183ebe118471d7e244238a4c986e1cde Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Wed, 13 May 2026 13:52:16 +0200 Subject: [PATCH 02/19] fuzz: keep settling after progress-only passes Treat HTLC-forward processing and monitor completion as real progress in the chanmon harness. This keeps the settle loop running after passes that only unblock follow-up work instead of stopping before the next event or message batch. --- fuzz/src/chanmon_consistency.rs | 39 +++++++++++++++++++++++---------- 1 file changed, 27 insertions(+), 12 deletions(-) diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index aadc58ff4ce..7a003c3bd61 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -930,10 +930,13 @@ impl<'a> HarnessNode<'a> { self.persister.mark_update_completed(chan_id, monitor_id, data); } - fn complete_all_monitor_updates(&self, chan_id: &ChannelId) { - for (monitor_id, data) in self.persister.drain_pending_updates(chan_id) { + fn complete_all_monitor_updates(&self, chan_id: &ChannelId) -> bool { + let completed_updates = self.persister.drain_pending_updates(chan_id); + let completed_any = !completed_updates.is_empty(); + for (monitor_id, data) in completed_updates { self.finish_monitor_update(*chan_id, monitor_id, data); } + completed_any } fn complete_all_pending_monitor_updates(&self) { @@ -966,9 +969,12 @@ impl<'a> HarnessNode<'a> { } } - fn refresh_serialized_manager(&mut self) { + fn refresh_serialized_manager(&mut self) -> bool { if self.node.get_and_clear_needs_persistence() { self.serialized_manager = self.node.encode(); + true + } else { + false } } @@ -1362,11 +1368,13 @@ impl PeerLink { || (self.node_a == node_b && self.node_b == node_a) } - fn complete_all_monitor_updates(&self, nodes: &[HarnessNode<'_>; 3]) { + fn complete_all_monitor_updates(&self, nodes: &[HarnessNode<'_>; 3]) -> bool { + let mut completed_updates = false; for id in &self.channel_ids { - nodes[self.node_a].complete_all_monitor_updates(id); - nodes[self.node_b].complete_all_monitor_updates(id); + completed_updates |= nodes[self.node_a].complete_all_monitor_updates(id); + completed_updates |= nodes[self.node_b].complete_all_monitor_updates(id); } + completed_updates } fn complete_monitor_updates_for_node( @@ -2143,7 +2151,6 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { ChannelMonitorUpdateStatus::Completed }, ]; - let wallet_a = TestWalletSource::new(SecretKey::from_slice(&[1; 32]).unwrap()); let wallet_b = TestWalletSource::new(SecretKey::from_slice(&[2; 32]).unwrap()); let wallet_c = TestWalletSource::new(SecretKey::from_slice(&[3; 32]).unwrap()); @@ -2671,7 +2678,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { // claim/fail handling per event batch. let mut claim_set = new_hash_map(); let mut events = nodes[node_idx].get_and_clear_pending_events(); - let had_events = !events.is_empty(); + let mut had_events = !events.is_empty(); for event in events.drain(..) { match event { events::Event::PaymentClaimable { payment_hash, .. } => { @@ -2727,6 +2734,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { } while nodes[node_idx].needs_pending_htlc_processing() { nodes[node_idx].process_pending_htlc_forwards(); + had_events = true; } had_events } @@ -2749,9 +2757,10 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { "It may take may iterations to settle the state, but it should not take forever" ); } + let mut made_progress = self.refresh_serialized_managers(); // Next, make sure no monitor completion callbacks are pending. - self.ab_link.complete_all_monitor_updates(&self.nodes); - self.bc_link.complete_all_monitor_updates(&self.nodes); + made_progress |= self.ab_link.complete_all_monitor_updates(&self.nodes); + made_progress |= self.bc_link.complete_all_monitor_updates(&self.nodes); // Then, make sure any current forwards make their way to their destination. if self.process_msg_events(0, false, ProcessMessages::AllMessages) { last_pass_no_updates = false; @@ -2778,6 +2787,10 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { last_pass_no_updates = false; continue; } + if made_progress { + last_pass_no_updates = false; + continue; + } if last_pass_no_updates { // In some cases, we may generate a message to send in // `process_msg_events`, but block sending until @@ -2893,10 +2906,12 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { self.nodes[2].record_last_htlc_clear_fee(); } - fn refresh_serialized_managers(&mut self) { + fn refresh_serialized_managers(&mut self) -> bool { + let mut made_progress = false; for node in &mut self.nodes { - node.refresh_serialized_manager(); + made_progress |= node.refresh_serialized_manager(); } + made_progress } } From 2ce533379f04d01d8eafc50d2c2fdeeb319e9b30 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Wed, 13 May 2026 13:53:01 +0200 Subject: [PATCH 03/19] fuzz: reload monitors with the configured status Build the replacement persister with the configured monitor update status during reload. This keeps non-deferred restart behavior aligned with the active persistence-style matrix. --- fuzz/src/chanmon_consistency.rs | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index 7a003c3bd61..f67a8ab1c31 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -1079,9 +1079,7 @@ impl<'a> HarnessNode<'a> { &mut self, use_old_mons: u8, out: &Out, router: &'a FuzzRouter, chan_type: ChanType, ) { let logger = Self::build_logger(self.node_id, out); - // Re-registering monitors during reload reflects data that was already selected from - // simulated storage, so these startup watch_channel calls should complete immediately. - let persister = Self::build_persister(ChannelMonitorUpdateStatus::Completed); + let persister = Self::build_persister(self.persistence_style); let chain_monitor = Self::build_chain_monitor( &self.broadcaster, &self.fee_estimator, @@ -1135,14 +1133,8 @@ impl<'a> HarnessNode<'a> { let manager = <(BlockLocator, ChanMan)>::read(&mut &self.serialized_manager[..], read_args) .expect("Failed to read manager"); for (channel_id, mon) in monitors.drain() { - assert_eq!( - chain_monitor.watch_channel(channel_id, mon), - Ok(ChannelMonitorUpdateStatus::Completed) - ); + assert_eq!(chain_monitor.watch_channel(channel_id, mon), Ok(self.persistence_style)); } - // Future monitor writes should follow the node's configured persistence style; only the - // startup watch_channel registration above is forced to Completed. - *persister.update_ret.lock().unwrap() = self.persistence_style; self.node = manager.1; self.monitor = chain_monitor; self.persister = persister; From bdcdbb68ab359d9ecff3cbbab3ebcf1b7fbda3d0 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Wed, 13 May 2026 13:56:10 +0200 Subject: [PATCH 04/19] fuzz: add deferred chanmon checkpoints Track deferred monitor writes in the harness and checkpoint the ChannelManager state before flushing them to the persister. This extends setup, reload, and settle paths to model deferred ChainMonitor persistence ordering. --- fuzz/src/chanmon_consistency.rs | 88 +++++++++++++++++++++++++-------- 1 file changed, 67 insertions(+), 21 deletions(-) diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index f67a8ab1c31..dfaa7d97387 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -817,6 +817,7 @@ struct HarnessNode<'a> { fee_estimator: Arc, wallet: TestWalletSource, persistence_style: ChannelMonitorUpdateStatus, + deferred: bool, serialized_manager: Vec, height: u32, last_htlc_clear_fee: u32, @@ -847,7 +848,7 @@ impl<'a> HarnessNode<'a> { fn build_chain_monitor( broadcaster: &Arc, fee_estimator: &Arc, keys_manager: &Arc, logger: Arc, - persister: &Arc, + persister: &Arc, deferred: bool, ) -> Arc { Arc::new(chainmonitor::ChainMonitor::new( None, @@ -857,14 +858,14 @@ impl<'a> HarnessNode<'a> { Arc::clone(persister), Arc::clone(keys_manager), keys_manager.get_peer_storage_key(), - false, + deferred, )) } fn new( node_id: u8, wallet: TestWalletSource, fee_estimator: Arc, broadcaster: Arc, persistence_style: ChannelMonitorUpdateStatus, - out: &Out, router: &'a FuzzRouter, chan_type: ChanType, + deferred: bool, out: &Out, router: &'a FuzzRouter, chan_type: ChanType, ) -> Self { let logger = Self::build_logger(node_id, out); let node_secret = SecretKey::from_slice(&[ @@ -884,6 +885,7 @@ impl<'a> HarnessNode<'a> { &keys_manager, Arc::clone(&logger), &persister, + deferred, ); let network = Network::Bitcoin; let best_block_timestamp = genesis_block(network).header.time; @@ -913,6 +915,7 @@ impl<'a> HarnessNode<'a> { fee_estimator, wallet, persistence_style, + deferred, serialized_manager: Vec::new(), height: 0, last_htlc_clear_fee: 253, @@ -969,15 +972,33 @@ impl<'a> HarnessNode<'a> { } } - fn refresh_serialized_manager(&mut self) -> bool { + fn checkpoint_manager_persistence(&mut self) -> bool { if self.node.get_and_clear_needs_persistence() { + let pending_monitor_writes = self.monitor.pending_operation_count(); self.serialized_manager = self.node.encode(); + if self.deferred { + self.monitor.flush(pending_monitor_writes, &self.logger); + } else { + assert_eq!(pending_monitor_writes, 0); + } true } else { + assert_eq!(self.monitor.pending_operation_count(), 0); false } } + fn force_checkpoint_manager_persistence(&mut self) { + let pending_monitor_writes = self.monitor.pending_operation_count(); + self.serialized_manager = self.node.encode(); + self.node.get_and_clear_needs_persistence(); + if self.deferred { + self.monitor.flush(pending_monitor_writes, &self.logger); + } else { + assert_eq!(pending_monitor_writes, 0); + } + } + fn bump_fee_estimate(&mut self, chan_type: ChanType) { let mut max_feerate = self.last_htlc_clear_fee; if matches!(chan_type, ChanType::Legacy) { @@ -1086,6 +1107,7 @@ impl<'a> HarnessNode<'a> { &self.keys_manager, Arc::clone(&logger), &persister, + self.deferred, ); let mut monitors = new_hash_map(); @@ -1132,13 +1154,22 @@ impl<'a> HarnessNode<'a> { let manager = <(BlockLocator, ChanMan)>::read(&mut &self.serialized_manager[..], read_args) .expect("Failed to read manager"); + let expected_status = if self.deferred { + ChannelMonitorUpdateStatus::InProgress + } else { + self.persistence_style + }; for (channel_id, mon) in monitors.drain() { - assert_eq!(chain_monitor.watch_channel(channel_id, mon), Ok(self.persistence_style)); + assert_eq!(chain_monitor.watch_channel(channel_id, mon), Ok(expected_status)); } self.node = manager.1; self.monitor = chain_monitor; self.persister = persister; self.logger = logger; + // In deferred mode, the startup watch_channel registrations above queue monitor operations + // even if the reloaded ChannelManager does not need persistence. Always checkpoint here so + // those registrations can be flushed against the manager snapshot they belong to. + self.force_checkpoint_manager_persistence(); } } @@ -1937,9 +1968,12 @@ fn connect_peers(source: &ChanMan<'_>, dest: &ChanMan<'_>) { } fn make_channel( - source: &HarnessNode<'_>, dest: &HarnessNode<'_>, chan_id: i32, trusted_open: bool, - trusted_accept: bool, chain_state: &mut ChainState, + nodes: &mut [HarnessNode<'_>; 3], source_idx: usize, dest_idx: usize, chan_id: i32, + trusted_open: bool, trusted_accept: bool, chain_state: &mut ChainState, ) { + assert!(source_idx < dest_idx); + let (left, right) = nodes.split_at_mut(dest_idx); + let (source, dest) = (&mut left[source_idx], &mut right[0]); if trusted_open { source .create_channel_to_trusted_peer_0reserve( @@ -2050,7 +2084,8 @@ fn make_channel( } }; dest.handle_funding_created(source.get_our_node_id(), &funding_created); - // Complete any pending monitor persistence callbacks for dest after watch_channel. + dest.checkpoint_manager_persistence(); + // Complete any monitor persistence callbacks made available for dest after watch_channel. dest.complete_all_pending_monitor_updates(); let (funding_signed, channel_id) = { @@ -2071,7 +2106,8 @@ fn make_channel( } source.handle_funding_signed(dest.get_our_node_id(), &funding_signed); - // Complete any pending monitor persistence callbacks for source after watch_channel. + source.checkpoint_manager_persistence(); + // Complete any monitor persistence callbacks made available for source after watch_channel. source.complete_all_pending_monitor_updates(); let events = source.get_and_clear_pending_events(); @@ -2143,6 +2179,12 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { ChannelMonitorUpdateStatus::Completed }, ]; + let deferred = [ + config_byte & 0b0010_0000 != 0, + config_byte & 0b0100_0000 != 0, + config_byte & 0b1000_0000 != 0, + ]; + let wallet_a = TestWalletSource::new(SecretKey::from_slice(&[1; 32]).unwrap()); let wallet_b = TestWalletSource::new(SecretKey::from_slice(&[2; 32]).unwrap()); let wallet_c = TestWalletSource::new(SecretKey::from_slice(&[3; 32]).unwrap()); @@ -2179,6 +2221,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { Arc::clone(&fee_est_a), Arc::clone(&broadcast_a), persistence_styles[0], + deferred[0], &out, router, chan_type, @@ -2189,6 +2232,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { Arc::clone(&fee_est_b), Arc::clone(&broadcast_b), persistence_styles[1], + deferred[1], &out, router, chan_type, @@ -2199,6 +2243,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { Arc::clone(&fee_est_c), Arc::clone(&broadcast_c), persistence_styles[2], + deferred[2], &out, router, chan_type, @@ -2216,14 +2261,14 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { // channel gets its own txid and funding outpoint. // A-B: channel 2 A and B have 0-reserve (trusted open + trusted accept), // channel 3 A has 0-reserve (trusted accept). - make_channel(&nodes[0], &nodes[1], 1, false, false, &mut chain_state); - make_channel(&nodes[0], &nodes[1], 2, true, true, &mut chain_state); - make_channel(&nodes[0], &nodes[1], 3, false, true, &mut chain_state); + make_channel(&mut nodes, 0, 1, 1, false, false, &mut chain_state); + make_channel(&mut nodes, 0, 1, 2, true, true, &mut chain_state); + make_channel(&mut nodes, 0, 1, 3, false, true, &mut chain_state); // B-C: channel 4 B has 0-reserve (via trusted accept), // channel 5 C has 0-reserve (via trusted open). - make_channel(&nodes[1], &nodes[2], 4, false, true, &mut chain_state); - make_channel(&nodes[1], &nodes[2], 5, true, false, &mut chain_state); - make_channel(&nodes[1], &nodes[2], 6, false, false, &mut chain_state); + make_channel(&mut nodes, 1, 2, 4, false, true, &mut chain_state); + make_channel(&mut nodes, 1, 2, 5, true, false, &mut chain_state); + make_channel(&mut nodes, 1, 2, 6, false, false, &mut chain_state); // Wipe the transactions-broadcasted set to make sure we don't broadcast // any transactions during normal operation after setup. @@ -2250,7 +2295,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { }; for node in &mut nodes { - node.serialized_manager = node.encode(); + node.force_checkpoint_manager_persistence(); } Self { @@ -2749,7 +2794,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { "It may take may iterations to settle the state, but it should not take forever" ); } - let mut made_progress = self.refresh_serialized_managers(); + let mut made_progress = self.checkpoint_manager_persistences(); // Next, make sure no monitor completion callbacks are pending. made_progress |= self.ab_link.complete_all_monitor_updates(&self.nodes); made_progress |= self.bc_link.complete_all_monitor_updates(&self.nodes); @@ -2898,10 +2943,10 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { self.nodes[2].record_last_htlc_clear_fee(); } - fn refresh_serialized_managers(&mut self) -> bool { + fn checkpoint_manager_persistences(&mut self) -> bool { let mut made_progress = false; for node in &mut self.nodes { - made_progress |= node.refresh_serialized_manager(); + made_progress |= node.checkpoint_manager_persistence(); } made_progress } @@ -2910,9 +2955,10 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { #[inline] pub fn do_test(data: &[u8], out: Out) { let router = FuzzRouter {}; - // Read initial monitor styles and channel type from fuzz input byte 0: + // Read initial monitor styles, channel type, and deferred write mode from fuzz input byte 0: // bits 0-2: monitor styles (1 bit per node) // bits 3-4: channel type (0=Legacy, 1=KeyedAnchors, 2=ZeroFeeCommitments) + // bits 5-7: deferred monitor write mode (1 bit per node) let config_byte = if !data.is_empty() { data[0] } else { 0 }; let mut harness = Harness::new(config_byte, out, &router); let mut read_pos = 1; // First byte was consumed for initial config. @@ -3324,7 +3370,7 @@ pub fn do_test(data: &[u8], out: Out) { _ => break 'fuzz_loop, } - harness.refresh_serialized_managers(); + harness.checkpoint_manager_persistences(); } harness.finish(); } From 8a0f1dd9a8a0c9ac0993a4c553006e3febfb6a94 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Thu, 30 Apr 2026 15:33:44 +0200 Subject: [PATCH 05/19] lightning: introduce singular claim requests Have ChannelMonitor hand singular ClaimRequests to OnchainTxHandler. Convert them to PackageTemplates only after duplicate filtering. This makes the single-outpoint invariant explicit at that boundary. --- lightning/src/chain/channelmonitor.rs | 53 ++++++++++++++------------- lightning/src/chain/onchaintx.rs | 31 ++++++---------- lightning/src/chain/package.rs | 42 +++++++++++++++++++++ 3 files changed, 81 insertions(+), 45 deletions(-) diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index 42d04e0f8ce..c61883ea1f5 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -38,8 +38,8 @@ use crate::chain::chaininterface::{ }; use crate::chain::onchaintx::{ClaimEvent, FeerateStrategy, OnchainTxHandler}; use crate::chain::package::{ - CounterpartyOfferedHTLCOutput, CounterpartyReceivedHTLCOutput, HolderFundingOutput, - HolderHTLCOutput, PackageSolvingData, PackageTemplate, RevokedHTLCOutput, RevokedOutput, + ClaimRequest, CounterpartyOfferedHTLCOutput, CounterpartyReceivedHTLCOutput, + HolderFundingOutput, HolderHTLCOutput, PackageSolvingData, RevokedHTLCOutput, RevokedOutput, }; use crate::chain::transaction::{OutPoint, TransactionData}; use crate::chain::{BlockLocator, WatchedOutput}; @@ -3879,7 +3879,7 @@ impl ChannelMonitorImpl { fn generate_claimable_outpoints_and_watch_outputs( &mut self, generate_monitor_event_with_reason: Option, require_funding_seen: bool, - ) -> (Vec, Vec) { + ) -> (Vec, Vec) { let funding = get_confirmed_funding_scope!(self); let holder_commitment_tx = &funding.current_holder_commitment_tx; let funding_outp = HolderFundingOutput::build( @@ -3887,7 +3887,7 @@ impl ChannelMonitorImpl { funding.channel_parameters.clone(), ); let funding_outpoint = funding.funding_outpoint(); - let commitment_package = PackageTemplate::build_package( + let commitment_package = ClaimRequest::new( funding_outpoint.txid.clone(), funding_outpoint.index as u32, PackageSolvingData::HolderFundingOutput(funding_outp), self.best_block.height, @@ -3926,9 +3926,9 @@ impl ChannelMonitorImpl { let zero_fee_commitments = self.channel_type_features().supports_anchor_zero_fee_commitments(); if !zero_fee_htlcs && !zero_fee_commitments { - // Because we're broadcasting a commitment transaction, we should construct the package - // assuming it gets confirmed in the next block. Sadly, we have code which considers - // "not yet confirmed" things as discardable, so we cannot do that here. + // Because we're broadcasting a commitment transaction, we should construct claim + // requests assuming it gets confirmed in the next block. Sadly, we have code which + // considers "not yet confirmed" things as discardable, so we cannot do that here. let (mut new_outpoints, _) = self.get_broadcasted_holder_claims( funding, holder_commitment_tx, self.best_block.height, ); @@ -4806,11 +4806,11 @@ impl ChannelMonitorImpl { /// height > height + CLTV_SHARED_CLAIM_BUFFER. In any case, will install monitoring for /// HTLC-Success/HTLC-Timeout transactions. /// - /// Returns packages to claim the revoked output(s) and general information about the output that - /// is to the counterparty in the commitment transaction. + /// Returns claim requests for the revoked output(s) and general information about the output + /// that is to the counterparty in the commitment transaction. #[rustfmt::skip] fn check_spend_counterparty_transaction(&mut self, commitment_txid: Txid, commitment_tx: &Transaction, height: u32, block_hash: &BlockHash, logger: &L) - -> (Vec, CommitmentTxCounterpartyOutputInfo) + -> (Vec, CommitmentTxCounterpartyOutputInfo) { // Most secp and related errors trying to create keys means we have no hope of constructing // a spend transaction...so we return no transactions to broadcast @@ -4850,7 +4850,7 @@ impl ChannelMonitorImpl { per_commitment_point, per_commitment_key, outp.value, funding_spent.channel_parameters.clone(), height, ); - let justice_package = PackageTemplate::build_package( + let justice_package = ClaimRequest::new( commitment_txid, idx as u32, PackageSolvingData::RevokedOutput(revk_outp), height + self.counterparty_commitment_params.on_counterparty_tx_csv as u32, @@ -4879,7 +4879,7 @@ impl ChannelMonitorImpl { } else { height }; - let justice_package = PackageTemplate::build_package( + let justice_package = ClaimRequest::new( commitment_txid, transaction_output_index, PackageSolvingData::RevokedHTLCOutput(revk_htlc_outp), @@ -4968,7 +4968,7 @@ impl ChannelMonitorImpl { commitment_txid: Txid, per_commitment_option: Option<&Vec<(HTLCOutputInCommitment, Option>)>>, confirmation_height: Option, - ) -> Vec { + ) -> Vec { let per_commitment_claimable_data = match per_commitment_option { Some(outputs) => outputs, None => return Vec::new(), @@ -4993,7 +4993,7 @@ impl ChannelMonitorImpl { confirmation_height, ), ); - Some(PackageTemplate::build_package( + Some(ClaimRequest::new( commitment_txid, transaction_output_index, htlc_data, @@ -5009,13 +5009,13 @@ impl ChannelMonitorImpl { .collect() } - /// Returns the HTLC claim package templates and the counterparty output info + /// Returns the HTLC claim requests and the counterparty output info. fn get_counterparty_output_claim_info( &self, funding_spent: &FundingScope, commitment_number: u64, commitment_txid: Txid, tx: &Transaction, per_commitment_claimable_data: &[(HTLCOutputInCommitment, Option>)], confirmation_height: Option, - ) -> (Vec, CommitmentTxCounterpartyOutputInfo) { + ) -> (Vec, CommitmentTxCounterpartyOutputInfo) { let mut claimable_outpoints = Vec::new(); let mut to_counterparty_output_info: CommitmentTxCounterpartyOutputInfo = None; @@ -5086,7 +5086,7 @@ impl ChannelMonitorImpl { ), ) }; - let counterparty_package = PackageTemplate::build_package( + let counterparty_package = ClaimRequest::new( commitment_txid, transaction_output_index, counterparty_htlc_outp, @@ -5104,7 +5104,7 @@ impl ChannelMonitorImpl { #[rustfmt::skip] fn check_spend_counterparty_htlc( &mut self, tx: &Transaction, commitment_number: u64, commitment_txid: &Txid, height: u32, logger: &L - ) -> (Vec, Option) { + ) -> (Vec, Option) { let secret = if let Some(secret) = self.get_secret(commitment_number) { secret } else { return (Vec::new(), None); }; let per_commitment_key = match SecretKey::from_slice(&secret) { Ok(key) => key, @@ -5135,7 +5135,7 @@ impl ChannelMonitorImpl { per_commitment_point, per_commitment_key, tx.output[idx].value, self.funding.channel_parameters.clone(), height, ); - let justice_package = PackageTemplate::build_package( + let justice_package = ClaimRequest::new( htlc_txid, idx as u32, PackageSolvingData::RevokedOutput(revk_outp), height + self.counterparty_commitment_params.on_counterparty_tx_csv as u32, ); @@ -5187,13 +5187,14 @@ impl ChannelMonitorImpl { htlcs } - // Returns (1) `PackageTemplate`s that can be given to the OnchainTxHandler, so that the handler can - // broadcast transactions claiming holder HTLC commitment outputs and (2) a holder revokable - // script so we can detect whether a holder transaction has been seen on-chain. + // Returns (1) `ClaimRequest`s that can be given to the OnchainTxHandler, so that the + // handler can broadcast transactions claiming holder HTLC commitment outputs and (2) a + // holder revokable script so we can detect whether a holder transaction has been seen + // on-chain. #[rustfmt::skip] fn get_broadcasted_holder_claims( &self, funding: &FundingScope, holder_tx: &HolderCommitmentTransaction, conf_height: u32, - ) -> (Vec, Option<(ScriptBuf, PublicKey, RevocationKey)>) { + ) -> (Vec, Option<(ScriptBuf, PublicKey, RevocationKey)>) { let tx = holder_tx.trust(); let keys = tx.keys(); let redeem_script = chan_utils::get_revokeable_redeemscript( @@ -5212,7 +5213,7 @@ impl ChannelMonitorImpl { }; let transaction_output_index = htlc_descriptor.htlc.transaction_output_index .expect("Expected transaction output index for non-dust HTLC"); - PackageTemplate::build_package( + ClaimRequest::new( tx.txid(), transaction_output_index, PackageSolvingData::HolderHTLCOutput(HolderHTLCOutput::build(htlc_descriptor, conf_height)), counterparty_spendable_height, @@ -5248,7 +5249,7 @@ impl ChannelMonitorImpl { fn check_spend_holder_transaction( &mut self, commitment_txid: Txid, commitment_tx: &Transaction, height: u32, block_hash: &BlockHash, logger: &L, - ) -> Option<(Vec, TransactionOutputs)> { + ) -> Option<(Vec, TransactionOutputs)> { let funding_spent = get_confirmed_funding_scope!(self); // HTLCs set may differ between last and previous holder commitment txn, in case of one them hitting chain, ensure we cancel all HTLCs backward @@ -5759,7 +5760,7 @@ impl ChannelMonitorImpl { conf_hash: BlockHash, txn_matched: Vec<&Transaction>, mut watch_outputs: Vec, - mut claimable_outpoints: Vec, + mut claimable_outpoints: Vec, broadcaster: &B, fee_estimator: &LowerBoundedFeeEstimator, logger: &WithContext, diff --git a/lightning/src/chain/onchaintx.rs b/lightning/src/chain/onchaintx.rs index 3eb6d64f3a2..823b81936ce 100644 --- a/lightning/src/chain/onchaintx.rs +++ b/lightning/src/chain/onchaintx.rs @@ -27,7 +27,7 @@ use crate::chain::chaininterface::{ BroadcasterInterface, FeeEstimator, LowerBoundedFeeEstimator, TransactionType, }; use crate::chain::channelmonitor::ANTI_REORG_DELAY; -use crate::chain::package::{PackageSolvingData, PackageTemplate}; +use crate::chain::package::{ClaimRequest, PackageSolvingData, PackageTemplate}; use crate::chain::transaction::MaybeSignedTransaction; use crate::chain::ClaimId; use crate::ln::chan_utils::{ @@ -791,7 +791,7 @@ impl OnchainTxHandler { /// `cur_height`, however it must never be higher than `cur_height`. #[rustfmt::skip] pub(super) fn update_claims_view_from_requests( - &mut self, mut requests: Vec, conf_height: u32, cur_height: u32, + &mut self, mut requests: Vec, conf_height: u32, cur_height: u32, broadcaster: &B, conf_target: ConfirmationTarget, destination_script: &Script, fee_estimator: &LowerBoundedFeeEstimator, logger: &L, ) { @@ -801,33 +801,26 @@ impl OnchainTxHandler { // First drop any duplicate claims. requests.retain(|req| { - debug_assert_eq!( - req.outpoints().len(), - 1, - "Claims passed to `update_claims_view_from_requests` should not be aggregated" - ); - let mut all_outpoints_claiming = true; - for outpoint in req.outpoints() { - if self.claimable_outpoints.get(outpoint).is_none() { - all_outpoints_claiming = false; - } - } - if all_outpoints_claiming { + let outpoint = req.outpoint(); + if self.claimable_outpoints.get(outpoint).is_some() { log_info!(logger, "Ignoring second claim for outpoint {}:{}, already registered its claiming request", - req.outpoints()[0].txid, req.outpoints()[0].vout); + outpoint.txid, outpoint.vout); false } else { let timelocked_equivalent_package = self.locktimed_packages.iter().map(|v| v.1.iter()).flatten() - .find(|locked_package| locked_package.outpoints() == req.outpoints()); + .find(|locked_package| locked_package.outpoints().len() == 1 && locked_package.contains_outpoint(outpoint)); if let Some(package) = timelocked_equivalent_package { log_info!(logger, "Ignoring second claim for outpoint {}:{}, we already have one which we're waiting on a timelock at {} for.", - req.outpoints()[0].txid, req.outpoints()[0].vout, package.package_locktime(cur_height)); + outpoint.txid, outpoint.vout, package.package_locktime(cur_height)); false } else { true } } }); + let mut requests = requests.into_iter() + .map(ClaimRequest::into_package_template) + .collect::>(); // Then try to maximally aggregate `requests`. for i in (1..requests.len()).rev() { @@ -1290,7 +1283,7 @@ mod tests { use types::features::ChannelTypeFeatures; use crate::chain::chaininterface::{ConfirmationTarget, LowerBoundedFeeEstimator}; - use crate::chain::package::{HolderHTLCOutput, PackageSolvingData, PackageTemplate}; + use crate::chain::package::{ClaimRequest, HolderHTLCOutput, PackageSolvingData}; use crate::chain::transaction::OutPoint; use crate::ln::chan_utils::{ ChannelPublicKeys, ChannelTransactionParameters, CounterpartyChannelTransactionParameters, @@ -1412,7 +1405,7 @@ mod tests { let holder_commit_txid = holder_commit.trust().txid(); let mut requests = Vec::new(); for (htlc, counterparty_sig) in holder_commit.nondust_htlcs().iter().zip(holder_commit.counterparty_htlc_sigs.iter()) { - requests.push(PackageTemplate::build_package( + requests.push(ClaimRequest::new( holder_commit_txid, htlc.transaction_output_index.unwrap(), PackageSolvingData::HolderHTLCOutput(HolderHTLCOutput::build(HTLCDescriptor { diff --git a/lightning/src/chain/package.rs b/lightning/src/chain/package.rs index 0ef8855242b..06be5750367 100644 --- a/lightning/src/chain/package.rs +++ b/lightning/src/chain/package.rs @@ -1097,6 +1097,19 @@ enum PackageMalleability { Untractable, } +/// A single on-chain output claim generated by [`ChannelMonitor`]. +/// +/// These requests are converted to [`PackageTemplate`]s once [`OnchainTxHandler`] has deduplicated +/// them and is ready to aggregate compatible claims. +/// +/// [`ChannelMonitor`]: crate::chain::channelmonitor::ChannelMonitor +/// [`OnchainTxHandler`]: crate::chain::onchaintx::OnchainTxHandler +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) struct ClaimRequest { + input: (BitcoinOutPoint, PackageSolvingData), + counterparty_spendable_height: u32, +} + /// A structure to describe a package content that is generated by ChannelMonitor and /// used by OnchainTxHandler to generate and broadcast transactions settling onchain claims. /// @@ -1179,6 +1192,32 @@ impl PartialEq for PackageTemplate { } } +impl ClaimRequest { + pub(crate) fn new( + txid: Txid, vout: u32, input_solving_data: PackageSolvingData, + counterparty_spendable_height: u32, + ) -> Self { + Self { + input: (BitcoinOutPoint { txid, vout }, input_solving_data), + counterparty_spendable_height, + } + } + + pub(crate) fn outpoint(&self) -> &BitcoinOutPoint { + &self.input.0 + } + + pub(crate) fn into_package_template(self) -> PackageTemplate { + let (outpoint, input_solving_data) = self.input; + PackageTemplate::build_package( + outpoint.txid, + outpoint.vout, + input_solving_data, + self.counterparty_spendable_height, + ) + } +} + impl PackageTemplate { #[rustfmt::skip] pub(crate) fn can_merge_with(&self, other: &PackageTemplate, cur_height: u32) -> bool { @@ -1265,6 +1304,9 @@ impl PackageTemplate { pub(crate) fn outpoints(&self) -> Vec<&BitcoinOutPoint> { self.inputs.iter().map(|(o, _)| o).collect() } + pub(crate) fn contains_outpoint(&self, outpoint: &BitcoinOutPoint) -> bool { + self.inputs.iter().any(|(input, _)| input == outpoint) + } pub(crate) fn outpoints_and_creation_heights( &self, ) -> impl Iterator)> { From 27837dee6e6aa230d61c47dc23be02dea8f9072b Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 4 May 2026 10:36:53 +0200 Subject: [PATCH 06/19] lightning: clarify channelmonitor event thresholds Clarify ChannelMonitor comments around on-chain event thresholds. Some events only wait for anti-reorg finality, while CSV-delayed outputs wait until spendable through the same threshold queue. --- lightning/src/chain/channelmonitor.rs | 51 ++++++++++++++------------- 1 file changed, 26 insertions(+), 25 deletions(-) diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index c61883ea1f5..67dff0f3e80 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -473,7 +473,8 @@ impl Readable for CounterpartyCommitmentParameters { /// An entry for an [`OnchainEvent`], stating the block height and hash when the event was /// observed, as well as the transaction causing it. /// -/// Used to determine when the on-chain event can be considered safe from a chain reorganization. +/// Used to determine when the on-chain event can be considered safe from a chain reorganization +/// or, for CSV-delayed outputs, spendable. #[derive(Clone, PartialEq, Eq)] struct OnchainEventEntry { txid: Txid, @@ -491,14 +492,14 @@ impl OnchainEventEntry { OnchainEvent::MaturingOutput { descriptor: SpendableOutputDescriptor::DelayedPaymentOutput(ref descriptor) } => { - // A CSV'd transaction is confirmable in block (input height) + CSV delay, which means - // it's broadcastable when we see the previous block. + // A CSV-delayed output is spendable in block (input height) + CSV delay, which + // means we can hand it upstream when we see the previous block. conf_threshold = cmp::max(conf_threshold, self.height + descriptor.to_self_delay as u32 - 1); }, OnchainEvent::FundingSpendConfirmation { on_local_output_csv: Some(csv), .. } | OnchainEvent::HTLCSpendConfirmation { on_to_local_output_csv: Some(csv), .. } => { - // A CSV'd transaction is confirmable in block (input height) + CSV delay, which means - // it's broadcastable when we see the previous block. + // A CSV-delayed output is spendable in block (input height) + CSV delay, which + // means we can act on the event when we see the previous block. conf_threshold = cmp::max(conf_threshold, self.height + csv as u32 - 1); }, _ => {}, @@ -517,7 +518,7 @@ impl OnchainEventEntry { type CommitmentTxCounterpartyOutputInfo = Option<(u32, Amount)>; /// Upon discovering of some classes of onchain tx by ChannelMonitor, we may have to take actions on it -/// once they mature to enough confirmations (ANTI_REORG_DELAY) +/// once they reach anti-reorg finality or, for CSV-delayed outputs, CSV maturity. #[derive(Clone, PartialEq, Eq)] enum OnchainEvent { /// An outbound HTLC failing after a transaction is confirmed. Used @@ -534,8 +535,8 @@ enum OnchainEvent { /// transaction which appeared on chain. commitment_tx_output_idx: Option, }, - /// An output waiting on [`ANTI_REORG_DELAY`] confirmations before we hand the user the - /// [`SpendableOutputDescriptor`]. + /// An output waiting until it is anti-reorg final and, for CSV-delayed outputs, spendable + /// before we hand the user the [`SpendableOutputDescriptor`]. MaturingOutput { descriptor: SpendableOutputDescriptor }, /// A spend of the funding output, either a commitment transaction or a cooperative closing /// transaction. @@ -566,8 +567,8 @@ enum OnchainEvent { /// If the claim was made by either party with a preimage, this is filled in preimage: Option, /// If the claim was made by us on an inbound HTLC against a local commitment transaction, - /// we set this to the output CSV value which we will have to wait until to spend the - /// output (and generate a SpendableOutput event). + /// this records the CSV delay for the delayed output. While present, the event reaches + /// its threshold once the output is spendable. on_to_local_output_csv: Option, }, /// An alternative funding transaction (due to a splice/RBF) has confirmed but can no longer be @@ -1003,7 +1004,7 @@ impl Balance { } } -/// An HTLC which has been irrevocably resolved on-chain, and has reached ANTI_REORG_DELAY. +/// An HTLC whose on-chain outcome has reached the threshold for irrevocable resolution. #[derive(Clone, PartialEq, Eq)] struct IrrevocablyResolvedHTLC { commitment_tx_output_idx: Option, @@ -1301,8 +1302,9 @@ pub(crate) struct ChannelMonitorImpl { pub(super) is_processing_pending_events: bool, // Used to track on-chain events (i.e., transactions part of channels confirmed on chain) on - // which to take actions once they reach enough confirmations. Each entry includes the - // transaction's id and the height when the transaction was confirmed on chain. + // which to take actions once they reach anti-reorg finality or, for CSV-delayed outputs, + // CSV maturity. Each entry includes the transaction's id and the height when the transaction + // was confirmed on chain. onchain_events_awaiting_threshold_conf: Vec, // If we get serialized out and re-read, we need to make sure that the chain monitoring @@ -1339,8 +1341,8 @@ pub(crate) struct ChannelMonitorImpl { /// Added in 0.0.124. holder_pays_commitment_tx_fee: Option, - /// Set to `Some` of the confirmed transaction spending the funding input of the channel after - /// reaching `ANTI_REORG_DELAY` confirmations. + /// Set to `Some` once the confirmed transaction spending the funding input of the channel has + /// reached its event threshold. funding_spend_confirmed: Option, confirmed_commitment_tx_counterparty_output: CommitmentTxCounterpartyOutputInfo, @@ -2763,11 +2765,10 @@ impl ChannelMonitorImpl { source: BalanceSource::Htlc, }); } else if htlc_resolved && !htlc_output_spend_pending { - // Funding transaction spends should be fully confirmed by the time any - // HTLC transactions are resolved, unless we're talking about a holder - // commitment tx, whose resolution is delayed until the CSV timeout is - // reached, even though HTLCs may be resolved after only - // ANTI_REORG_DELAY confirmations. + // Funding transaction spends should have reached their event threshold by the time any + // HTLC transactions are irrevocably resolved, unless we're talking about a holder + // commitment tx, whose resolution is delayed until CSV maturity, even though HTLCs + // may be resolved after anti-reorg finality. debug_assert!(holder_commitment || self.funding_spend_confirmed.is_some()); } else if counterparty_revoked_commitment { let htlc_output_claim_pending = self.onchain_events_awaiting_threshold_conf.iter().any(|event| { @@ -2889,7 +2890,7 @@ impl ChannelMonitor { }); if let Some((txid, conf_thresh)) = funding_spend_pending { debug_assert!(us.funding_spend_confirmed.is_none(), - "We have a pending funding spend awaiting anti-reorg confirmation, we can't have confirmed it already!"); + "We have a pending funding spend awaiting its event threshold, it cannot have reached it already!"); confirmed_txid = Some(txid); pending_commitment_tx_conf_thresh = Some(conf_thresh); } @@ -3347,7 +3348,7 @@ macro_rules! fail_unbroadcast_htlcs { commitment_tx_output_idx: None, }, }; - log_trace!($logger, "Failing HTLC with payment_hash {} from {} counterparty commitment tx due to broadcast of {} commitment transaction {}, waiting for confirmation (at height {})", + log_trace!($logger, "Failing HTLC with payment_hash {} from {} counterparty commitment tx due to broadcast of {} commitment transaction {}, event reaches threshold at height {}", &htlc.payment_hash, $commitment_tx, $commitment_tx_type, $commitment_txid_confirmed, entry.confirmation_threshold()); $self.onchain_events_awaiting_threshold_conf.push(entry); @@ -4513,7 +4514,7 @@ impl ChannelMonitorImpl { // event for the same source. self.failed_back_htlc_ids.insert(SentHTLCId::from_source(source)); if let Some(confirmed_txid) = self.funding_spend_confirmed { - // Funding spend already confirmed past ANTI_REORG_DELAY: resolve immediately. + // Funding spend already reached its event threshold: resolve immediately. log_trace!( logger, "Failing HTLC from late counterparty commitment update immediately \ @@ -4549,7 +4550,7 @@ impl ChannelMonitorImpl { log_trace!( logger, "Failing HTLC from late counterparty commitment update, \ - waiting for confirmation (at height {})", + event reaches threshold at height {}", entry.confirmation_threshold() ); self.onchain_events_awaiting_threshold_conf.push(entry); @@ -6403,7 +6404,7 @@ impl ChannelMonitorImpl { commitment_tx_output_idx: Some(input.previous_output.vout), }, }; - log_info!(logger, "Failing HTLC with payment_hash {} timeout by a spend tx, waiting for confirmation (at height {})", &payment_hash, entry.confirmation_threshold()); + log_info!(logger, "Failing HTLC with payment_hash {} timeout by a spend tx, event reaches threshold at height {}", &payment_hash, entry.confirmation_threshold()); self.onchain_events_awaiting_threshold_conf.push(entry); } } From 977852f0256bdc4e707049a7fc9bf8ef2a20d8c7 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Thu, 30 Apr 2026 16:02:00 +0200 Subject: [PATCH 07/19] lightning: refactor onchain tx handler tests Move repeated OnchainTxHandler setup into shared test helpers so the claim-replay coverage can focus on the behavior under test. --- lightning/src/chain/onchaintx.rs | 111 +++++++++++++++++-------------- 1 file changed, 62 insertions(+), 49 deletions(-) diff --git a/lightning/src/chain/onchaintx.rs b/lightning/src/chain/onchaintx.rs index 823b81936ce..e559f093922 100644 --- a/lightning/src/chain/onchaintx.rs +++ b/lightning/src/chain/onchaintx.rs @@ -1298,12 +1298,9 @@ mod tests { use super::OnchainTxHandler; - // Test that all claims with locktime equal to or less than the current height are broadcast - // immediately while claims with locktime greater than the current height are only broadcast - // once the locktime is reached. - #[test] - #[rustfmt::skip] - fn test_broadcast_height() { + fn new_test_tx_handler( + channel_type_features: ChannelTypeFeatures, nondust_htlcs: Vec, + ) -> OnchainTxHandler { let secp_ctx = Secp256k1::new(); let signer = InMemorySigner::new( SecretKey::from_slice(&[41; 32]).unwrap(), @@ -1340,9 +1337,6 @@ mod tests { )), }; let funding_outpoint = OutPoint { txid: Txid::all_zeros(), index: u16::MAX }; - - // Use non-anchor channels so that HTLC-Timeouts are broadcast immediately instead of sent - // to the user for external funding. let chan_params = ChannelTransactionParameters { holder_pubkeys: signer.pubkeys(&secp_ctx), holder_selected_contest_delay: 66, @@ -1353,66 +1347,45 @@ mod tests { }), funding_outpoint: Some(funding_outpoint), splice_parent_funding_txid: None, - channel_type_features: ChannelTypeFeatures::only_static_remote_key(), + channel_type_features, channel_value_satoshis: 0, }; - - // Create an OnchainTxHandler for a commitment containing HTLCs with CLTV expiries of 0, 1, - // and 2 blocks. - let mut nondust_htlcs = Vec::new(); - for i in 0..3 { - let preimage = PaymentPreimage([i; 32]); - let hash = PaymentHash(Sha256::hash(&preimage.0[..]).to_byte_array()); - nondust_htlcs.push( - HTLCOutputInCommitment { - offered: true, - amount_msat: 10000, - cltv_expiry: i as u32, - payment_hash: hash, - transaction_output_index: Some(i as u32), - } - ); - } - let holder_commit = HolderCommitmentTransaction::dummy(1000000, funding_outpoint, nondust_htlcs); - let destination_script = ScriptBuf::new(); + let holder_commit = + HolderCommitmentTransaction::dummy(1000000, funding_outpoint, nondust_htlcs); let counterparty_node_id = PublicKey::from_slice(&[2; 33]).unwrap(); - let mut tx_handler = OnchainTxHandler::new( + OnchainTxHandler::new( ChannelId::from_bytes([0; 32]), counterparty_node_id, 1000000, [0; 32], - destination_script.clone(), + ScriptBuf::new(), signer, chan_params, holder_commit, secp_ctx, - ); - - // Create a broadcaster with current block height 1. - let broadcaster = TestBroadcaster::new(Network::Testnet); - { - let mut blocks = broadcaster.blocks.lock().unwrap(); - let genesis_hash = blocks[0].0.block_hash(); - blocks.push((create_dummy_block(genesis_hash, 0, Vec::new()), 1)); - } - - let fee_estimator = TestFeeEstimator::new(253); - let fee_estimator = LowerBoundedFeeEstimator::new(&fee_estimator); - let logger = TestLogger::new(); + ) + } - // Request claiming of each HTLC on the holder's commitment, with current block height 1. + fn build_offered_holder_htlc_requests( + tx_handler: &OnchainTxHandler, + ) -> Vec { let holder_commit = tx_handler.current_holder_commitment_tx(); let holder_commit_txid = holder_commit.trust().txid(); let mut requests = Vec::new(); - for (htlc, counterparty_sig) in holder_commit.nondust_htlcs().iter().zip(holder_commit.counterparty_htlc_sigs.iter()) { + for (htlc, counterparty_sig) in + holder_commit.nondust_htlcs().iter().zip(holder_commit.counterparty_htlc_sigs.iter()) + { requests.push(ClaimRequest::new( holder_commit_txid, htlc.transaction_output_index.unwrap(), - PackageSolvingData::HolderHTLCOutput(HolderHTLCOutput::build(HTLCDescriptor { + PackageSolvingData::HolderHTLCOutput(HolderHTLCOutput::build( + HTLCDescriptor { channel_derivation_parameters: ChannelDerivationParameters { value_satoshis: tx_handler.channel_value_satoshis, keys_id: tx_handler.channel_keys_id, - transaction_parameters: tx_handler.channel_transaction_parameters.clone(), + transaction_parameters: tx_handler + .channel_transaction_parameters + .clone(), }, commitment_txid: holder_commit_txid, per_commitment_number: holder_commit.commitment_number(), @@ -1422,11 +1395,51 @@ mod tests { preimage: None, counterparty_sig: *counterparty_sig, }, - 0 + 0, )), 0, )); } + requests + } + + // Test that all claims with locktime equal to or less than the current height are broadcast + // immediately while claims with locktime greater than the current height are only broadcast + // once the locktime is reached. + #[test] + fn test_broadcast_height() { + // Create an OnchainTxHandler for a commitment containing HTLCs with CLTV expiries of 0, 1, + // and 2 blocks. + let mut nondust_htlcs = Vec::new(); + for i in 0..3 { + let preimage = PaymentPreimage([i; 32]); + let hash = PaymentHash(Sha256::hash(&preimage.0[..]).to_byte_array()); + nondust_htlcs.push(HTLCOutputInCommitment { + offered: true, + amount_msat: 10000, + cltv_expiry: i as u32, + payment_hash: hash, + transaction_output_index: Some(i as u32), + }); + } + let destination_script = ScriptBuf::new(); + let mut tx_handler = + new_test_tx_handler(ChannelTypeFeatures::only_static_remote_key(), nondust_htlcs); + + // Create a broadcaster with current block height 1. + let broadcaster = TestBroadcaster::new(Network::Testnet); + { + let mut blocks = broadcaster.blocks.lock().unwrap(); + let genesis_hash = blocks[0].0.block_hash(); + blocks.push((create_dummy_block(genesis_hash, 0, Vec::new()), 1)); + } + + let fee_estimator = TestFeeEstimator::new(253); + let fee_estimator = LowerBoundedFeeEstimator::new(&fee_estimator); + let logger = TestLogger::new(); + + // Request claiming of each HTLC on the holder's commitment, with current block height 1. + let requests = build_offered_holder_htlc_requests(&tx_handler); tx_handler.update_claims_view_from_requests( requests, 1, From e81d949241b3c879b7cc5507e391cdec42bc8a6a Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 4 May 2026 11:10:15 +0200 Subject: [PATCH 08/19] lightning: cover delayed preimage claim balance Add a monitor test for an inbound HTLC claimed by preimage from a holder commitment. Confirm that the claimable balance remains unchanged after the HTLC-success spend reaches anti-reorg finality but before the CSV-delayed output is spendable. --- lightning/src/ln/monitor_tests.rs | 67 +++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/lightning/src/ln/monitor_tests.rs b/lightning/src/ln/monitor_tests.rs index f52f093917b..4fd40df3a45 100644 --- a/lightning/src/ln/monitor_tests.rs +++ b/lightning/src/ln/monitor_tests.rs @@ -115,6 +115,73 @@ fn test_spendable_output<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, spendable_t } else { panic!(); } } +#[test] +fn preimage_claim_balance_unchanged_between_anti_reorg_and_csv() { + let chanmon_cfgs = create_chanmon_cfgs(2); + let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); + let legacy_cfg = test_legacy_channel_config(); + let node_chanmgrs = + create_node_chanmgrs(2, &node_cfgs, &[Some(legacy_cfg.clone()), Some(legacy_cfg)]); + let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs); + + let (_, _, chan_id, funding_tx) = + create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 1_000_000, 500_000_000); + // Route an inbound HTLC to node 0 so its preimage claim spends an HTLC output from node 0's + // holder commitment and creates a CSV-delayed output. + let (route, payment_hash, payment_preimage, payment_secret) = + get_route_and_payment_hash!(nodes[1], nodes[0], 12_000_000); + nodes[1].node.send_payment_with_route(route, payment_hash, + RecipientOnionFields::secret_only(payment_secret, 12_000_000), PaymentId(payment_hash.0)).unwrap(); + check_added_monitors(&nodes[1], 1); + let updates = get_htlc_update_msgs(&nodes[1], &nodes[0].node.get_our_node_id()); + nodes[0].node.handle_update_add_htlc(nodes[1].node.get_our_node_id(), &updates.update_add_htlcs[0]); + do_commitment_signed_dance(&nodes[0], &nodes[1], &updates.commitment_signed, false, false); + expect_and_process_pending_htlcs(&nodes[0], false); + expect_payment_claimable!(nodes[0], payment_hash, payment_secret, 12_000_000); + + // Confirm node 0's holder commitment before claiming the HTLC so the preimage claim has a + // delayed output that remains tracked as an HTLC balance until it becomes spendable. + let message = "Channel force-closed".to_owned(); + nodes[0].node.force_close_broadcasting_latest_txn(&chan_id, &nodes[1].node.get_our_node_id(), message.clone()).unwrap(); + check_added_monitors(&nodes[0], 1); + check_closed_broadcast(&nodes[0], 1, true); + let reason = ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(true), message }; + check_closed_event(&nodes[0], 1, reason, &[nodes[1].node.get_our_node_id()], 1000000); + let commitment_txn = nodes[0].tx_broadcaster.unique_txn_broadcast(); + assert_eq!(commitment_txn.len(), 1); + check_spends!(commitment_txn[0], funding_tx); + mine_transaction(&nodes[0], &commitment_txn[0]); + nodes[0].tx_broadcaster.clear(); + + // Claiming the HTLC with the preimage broadcasts the HTLC-Success transaction. Once it + // confirms, the resulting delayed output should be reported as an HTLC balance awaiting + // confirmations. + nodes[0].node.claim_funds(payment_preimage); + check_added_monitors(&nodes[0], 1); + expect_payment_claimed!(nodes[0], payment_hash, 12_000_000); + let htlc_claim_txn = nodes[0].tx_broadcaster.unique_txn_broadcast(); + assert_eq!(htlc_claim_txn.len(), 1); + check_spends!(htlc_claim_txn[0], commitment_txn[0]); + mine_transaction(&nodes[0], &htlc_claim_txn[0]); + + let htlc_claim_balances = sorted_vec(nodes[0].chain_monitor.chain_monitor + .get_monitor(chan_id).unwrap().get_claimable_balances()); + assert!(htlc_claim_balances.iter().any(|balance| matches!(balance, + Balance::ClaimableAwaitingConfirmations { + amount_satoshis: 12_000, + source: BalanceSource::Htlc, + .. + } + ))); + + // Advance only to anti-reorg finality for the HTLC-Success transaction. The CSV-delayed + // output is not spendable yet, so the claimable HTLC balance should remain unchanged. + connect_blocks(&nodes[0], ANTI_REORG_DELAY - 1); + assert!(nodes[0].chain_monitor.chain_monitor.get_and_clear_pending_events().is_empty()); + assert_eq!(htlc_claim_balances, sorted_vec(nodes[0].chain_monitor.chain_monitor + .get_monitor(chan_id).unwrap().get_claimable_balances())); +} + #[test] fn revoked_output_htlc_resolution_timing() { // Tests that HTLCs which were present in a broadcasted remote revoked commitment transaction From a6213de7fdf832a9c6767ecc5b5fa67fc63979ca Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 4 May 2026 09:45:31 +0200 Subject: [PATCH 09/19] lightning: resolve HTLC spends at anti-reorg finality Treat HTLCSpendConfirmation entries as irrevocably resolved once the commitment HTLC output spend reaches anti-reorg finality. Do not wait for CSV maturity of any delayed output created by that spend. Delayed outputs remain tracked separately as MaturingOutput entries, keeping claimable balances alive until they are CSV-mature and can be surfaced as SpendableOutputs. --- lightning/src/chain/channelmonitor.rs | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index 67dff0f3e80..09b5466c378 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -496,8 +496,7 @@ impl OnchainEventEntry { // means we can hand it upstream when we see the previous block. conf_threshold = cmp::max(conf_threshold, self.height + descriptor.to_self_delay as u32 - 1); }, - OnchainEvent::FundingSpendConfirmation { on_local_output_csv: Some(csv), .. } | - OnchainEvent::HTLCSpendConfirmation { on_to_local_output_csv: Some(csv), .. } => { + OnchainEvent::FundingSpendConfirmation { on_local_output_csv: Some(csv), .. } => { // A CSV-delayed output is spendable in block (input height) + CSV delay, which // means we can act on the event when we see the previous block. conf_threshold = cmp::max(conf_threshold, self.height + csv as u32 - 1); @@ -567,8 +566,9 @@ enum OnchainEvent { /// If the claim was made by either party with a preimage, this is filled in preimage: Option, /// If the claim was made by us on an inbound HTLC against a local commitment transaction, - /// this records the CSV delay for the delayed output. While present, the event reaches - /// its threshold once the output is spendable. + /// this records the CSV delay for the delayed output. The CSV-mature output remains + /// tracked via the corresponding [`OnchainEvent::MaturingOutput`]; the HTLC spend itself + /// reaches anti-reorg finality. on_to_local_output_csv: Option, }, /// An alternative funding transaction (due to a splice/RBF) has confirmed but can no longer be @@ -1346,9 +1346,10 @@ pub(crate) struct ChannelMonitorImpl { funding_spend_confirmed: Option, confirmed_commitment_tx_counterparty_output: CommitmentTxCounterpartyOutputInfo, - /// The set of HTLCs which have been either claimed or failed on chain and have reached - /// the requisite confirmations on the claim/fail transaction (either ANTI_REORG_DELAY or the - /// spending CSV for revocable outputs). + /// The set of HTLCs whose on-chain claim or fail outcome is irrevocably resolved because the + /// commitment transaction HTLC output spend has reached anti-reorg finality. Any resulting + /// output that is still waiting on CSV maturity is tracked separately as an + /// [`OnchainEvent::MaturingOutput`]. htlcs_resolved_on_chain: Vec, /// When a payment is resolved through an on-chain transaction, we tell the `ChannelManager` @@ -6298,10 +6299,9 @@ impl ChannelMonitorImpl { commitment_tx_output_idx: input.previous_output.vout, preimage: if accepted_preimage_claim || offered_preimage_claim { Some(payment_preimage) } else { None }, - // If this is a payment to us (ie !outbound_htlc), wait for - // the CSV delay before dropping the HTLC from claimable - // balance if the claim was an HTLC-Success transaction (ie - // accepted_preimage_claim). + // If this is a payment to us (ie !outbound_htlc), keep a + // record of the CSV delay. The delayed output is tracked + // separately as a MaturingOutput until it is spendable. on_to_local_output_csv: if accepted_preimage_claim && !outbound_htlc { Some(self.on_holder_tx_csv) } else { None }, }, From ffc6fec009291b16a95eb0be8874f86135c3d53b Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Wed, 6 May 2026 17:05:12 +0200 Subject: [PATCH 10/19] f: assert delayed output for HTLC spends Check that any HTLCSpendConfirmation carrying a local-output CSV has a matching delayed MaturingOutput. Scan spendable outputs before recording HTLC spend confirmations so the invariant is present when the assertion runs. --- lightning/src/chain/channelmonitor.rs | 32 ++++++++++++++++++++++----- 1 file changed, 27 insertions(+), 5 deletions(-) diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index 09b5466c378..09aa863b51c 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -5727,9 +5727,9 @@ impl ChannelMonitorImpl { break; } } - self.is_resolving_htlc_output(&tx, height, &block_hash, logger); - self.check_tx_and_push_spendable_outputs(&tx, height, &block_hash, logger); + + self.is_resolving_htlc_output(&tx, height, &block_hash, logger); } } @@ -6207,6 +6207,7 @@ impl ChannelMonitorImpl { &mut self, tx: &Transaction, height: u32, block_hash: &BlockHash, logger: &WithContext, ) { let funding_spent = get_confirmed_funding_scope!(self); + let txid = tx.compute_txid(); 'outer_loop: for input in &tx.input { let mut payment_data = None; @@ -6293,8 +6294,17 @@ impl ChannelMonitorImpl { if payment_data.is_none() { log_claim!($tx_info, $holder_tx, htlc_output, false); let outbound_htlc = $holder_tx == htlc_output.offered; + let on_to_local_output_csv = if accepted_preimage_claim && !outbound_htlc { + Some(self.on_holder_tx_csv) } else { None }; + #[cfg(debug_assertions)] + if let Some(csv) = on_to_local_output_csv { + debug_assert!( + self.has_delayed_maturing_output_for_tx(txid, csv), + "CSV-delayed HTLC spend confirmation should have a matching MaturingOutput" + ); + } self.onchain_events_awaiting_threshold_conf.push(OnchainEventEntry { - txid: tx.compute_txid(), height, block_hash: Some(*block_hash), transaction: Some(tx.clone()), + txid, height, block_hash: Some(*block_hash), transaction: Some(tx.clone()), event: OnchainEvent::HTLCSpendConfirmation { commitment_tx_output_idx: input.previous_output.vout, preimage: if accepted_preimage_claim || offered_preimage_claim { @@ -6302,8 +6312,7 @@ impl ChannelMonitorImpl { // If this is a payment to us (ie !outbound_htlc), keep a // record of the CSV delay. The delayed output is tracked // separately as a MaturingOutput until it is spendable. - on_to_local_output_csv: if accepted_preimage_claim && !outbound_htlc { - Some(self.on_holder_tx_csv) } else { None }, + on_to_local_output_csv, }, }); continue 'outer_loop; @@ -6456,6 +6465,19 @@ impl ChannelMonitorImpl { spendable_outputs } + #[cfg(debug_assertions)] + fn has_delayed_maturing_output_for_tx(&self, txid: Txid, csv: u16) -> bool { + self.onchain_events_awaiting_threshold_conf.iter().any(|entry| { + entry.txid == txid + && match &entry.event { + OnchainEvent::MaturingOutput { + descriptor: SpendableOutputDescriptor::DelayedPaymentOutput(descriptor), + } => descriptor.to_self_delay == csv, + _ => false, + } + }) + } + /// Checks if the confirmed transaction is paying funds back to some address we can assume to /// own. #[rustfmt::skip] From 426f97d1e0511c560a25aa88e4750fbfa7b490bd Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Thu, 30 Apr 2026 14:21:04 +0200 Subject: [PATCH 11/19] lightning: dedupe delayed claims by outpoint coverage A replayed holder HTLC claim may arrive as a single-outpoint request after earlier requests were merged into a delayed package. Check whether an existing delayed package already covers the new request instead of requiring exact outpoint-set equality. Add focused OnchainTxHandler coverage and a ChannelMonitor regression through claim_funds for both current anchor variants. --- lightning/src/chain/onchaintx.rs | 95 ++++++++++++++++++++++++- lightning/src/ln/monitor_tests.rs | 113 ++++++++++++++++++++++++++++++ 2 files changed, 205 insertions(+), 3 deletions(-) diff --git a/lightning/src/chain/onchaintx.rs b/lightning/src/chain/onchaintx.rs index e559f093922..2cdad08de1e 100644 --- a/lightning/src/chain/onchaintx.rs +++ b/lightning/src/chain/onchaintx.rs @@ -807,9 +807,10 @@ impl OnchainTxHandler { outpoint.txid, outpoint.vout); false } else { - let timelocked_equivalent_package = self.locktimed_packages.iter().map(|v| v.1.iter()).flatten() - .find(|locked_package| locked_package.outpoints().len() == 1 && locked_package.contains_outpoint(outpoint)); - if let Some(package) = timelocked_equivalent_package { + let timelocked_covering_package = self.locktimed_packages.values() + .flat_map(|packages| packages.iter()) + .find(|locked_package| locked_package.contains_outpoint(outpoint)); + if let Some(package) = timelocked_covering_package { log_info!(logger, "Ignoring second claim for outpoint {}:{}, we already have one which we're waiting on a timelock at {} for.", outpoint.txid, outpoint.vout, package.package_locktime(cur_height)); false @@ -1480,4 +1481,92 @@ mod tests { assert_eq!(txs_broadcasted.len(), 1); assert_eq!(txs_broadcasted[0].lock_time.to_consensus_u32(), 2); } + + #[test] + fn test_duplicate_pending_claim_request_after_force_close_replay() { + let claim_height = 21; + let locktime = 42; + let mut nondust_htlcs = Vec::new(); + for i in 0..2 { + let preimage = PaymentPreimage([i + 1; 32]); + let hash = PaymentHash(Sha256::hash(&preimage.0[..]).to_byte_array()); + nondust_htlcs.push(HTLCOutputInCommitment { + offered: true, + amount_msat: 10000, + cltv_expiry: locktime, + payment_hash: hash, + transaction_output_index: Some(i as u32), + }); + } + + let mut tx_handler = new_test_tx_handler( + ChannelTypeFeatures::anchors_zero_htlc_fee_and_dependencies(), + nondust_htlcs, + ); + let requests = build_offered_holder_htlc_requests(&tx_handler); + let destination_script = ScriptBuf::new(); + let broadcaster = TestBroadcaster::new(Network::Testnet); + let fee_estimator = TestFeeEstimator::new(253); + let fee_estimator = LowerBoundedFeeEstimator::new(&fee_estimator); + let logger = TestLogger::new(); + + // Simulate the force-close path registering the two holder HTLC claims as + // a single delayed package. + tx_handler.update_claims_view_from_requests( + requests.clone(), + claim_height, + claim_height, + &&broadcaster, + ConfirmationTarget::UrgentOnChainSweep, + &destination_script, + &fee_estimator, + &logger, + ); + assert_eq!( + tx_handler.locktimed_packages.get(&locktime).map(|packages| packages.len()), + Some(1), + ); + + // Replaying the same per-HTLC claim requests must match by outpoint + // coverage, otherwise each single-outpoint request would be added again. + tx_handler.update_claims_view_from_requests( + requests, + claim_height, + claim_height, + &&broadcaster, + ConfirmationTarget::UrgentOnChainSweep, + &destination_script, + &fee_estimator, + &logger, + ); + assert_eq!( + tx_handler.locktimed_packages.get(&locktime).map(|packages| packages.len()), + Some(1), + ); + + // At locktime, the delayed package should still yield one bump event + // covering both HTLCs. + tx_handler.update_claims_view_from_requests( + Vec::new(), + locktime, + locktime, + &&broadcaster, + ConfirmationTarget::UrgentOnChainSweep, + &destination_script, + &fee_estimator, + &logger, + ); + + let pending_events = tx_handler.get_and_clear_pending_claim_events(); + assert_eq!(pending_events.len(), 1); + assert_eq!(tx_handler.pending_claim_requests.len(), 1); + assert_eq!(tx_handler.claimable_outpoints.len(), 2); + match &pending_events[0].1 { + super::ClaimEvent::BumpHTLC { htlcs, tx_lock_time, .. } => { + assert_eq!(htlcs.len(), 2); + assert_eq!(tx_lock_time.to_consensus_u32(), locktime); + }, + _ => panic!("expected a single HTLC bump event"), + } + } } diff --git a/lightning/src/ln/monitor_tests.rs b/lightning/src/ln/monitor_tests.rs index 4fd40df3a45..436bb01c907 100644 --- a/lightning/src/ln/monitor_tests.rs +++ b/lightning/src/ln/monitor_tests.rs @@ -2455,6 +2455,119 @@ fn test_restored_packages_retry() { do_test_restored_packages_retry(true); } +fn do_test_duplicate_delayed_holder_htlc_claims_after_claim_funds_replay(p2a_anchor: bool) { + let chanmon_cfgs = create_chanmon_cfgs(2); + let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); + let mut anchors_config = test_default_channel_config(); + anchors_config.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx = true; + anchors_config.channel_handshake_config.negotiate_anchor_zero_fee_commitments = p2a_anchor; + let node_chanmgrs = + create_node_chanmgrs(2, &node_cfgs, &[Some(anchors_config.clone()), Some(anchors_config)]); + let nodes = create_network(2, &node_cfgs, &node_chanmgrs); + + let coinbase_tx = provide_anchor_reserves(&nodes); + let (_, _, chan_id, funding_tx) = + create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 1_000_000, 50_000_000); + + // Seed two unresolved outbound HTLCs that will be aggregated into one + // delayed holder-commitment package after force close. + route_payment(&nodes[0], &[&nodes[1]], 10_000_000); + route_payment(&nodes[0], &[&nodes[1]], 11_000_000); + + // Add a third incoming HTLC which will later be claimed by preimage after + // the commitment transaction confirms, reproducing the replay path. + let (claim_route, claim_hash, claim_preimage, claim_secret) = + get_route_and_payment_hash!(nodes[1], nodes[0], 12_000_000); + nodes[1] + .node + .send_payment_with_route( + claim_route, + claim_hash, + RecipientOnionFields::secret_only(claim_secret, 12_000_000), + PaymentId(claim_hash.0), + ) + .unwrap(); + check_added_monitors(&nodes[1], 1); + let updates = get_htlc_update_msgs(&nodes[1], &nodes[0].node.get_our_node_id()); + nodes[0] + .node + .handle_update_add_htlc(nodes[1].node.get_our_node_id(), &updates.update_add_htlcs[0]); + do_commitment_signed_dance(&nodes[0], &nodes[1], &updates.commitment_signed, false, false); + expect_and_process_pending_htlcs(&nodes[0], false); + expect_payment_claimable!(nodes[0], claim_hash, claim_secret, 12_000_000); + + // Force-close node 0 so its holder commitment hits chain and its HTLC + // claims are fed into OnchainTxHandler as delayed requests. + let message = "Channel force-closed".to_owned(); + nodes[0] + .node + .force_close_broadcasting_latest_txn( + &chan_id, + &nodes[1].node.get_our_node_id(), + message.clone(), + ) + .unwrap(); + check_added_monitors(&nodes[0], 1); + check_closed_broadcast(&nodes[0], 1, true); + let reason = ClosureReason::HolderForceClosed { broadcasted_latest_txn: Some(true), message }; + check_closed_event(&nodes[0], 1, reason, &[nodes[1].node.get_our_node_id()], 1_000_000); + handle_bump_close_event(&nodes[0]); + + let (commitment_tx, anchor_tx) = { + let mut txn = nodes[0].tx_broadcaster.unique_txn_broadcast(); + assert_eq!(txn.len(), if p2a_anchor { 2 } else { 1 }); + let anchor_tx = p2a_anchor.then(|| txn.pop().unwrap()); + let commitment_tx = txn.pop().unwrap(); + check_spends!(commitment_tx, funding_tx); + if p2a_anchor { + check_spends!(anchor_tx.as_ref().unwrap(), commitment_tx, coinbase_tx); + } + (commitment_tx, anchor_tx) + }; + + let _ = mine_transaction(&nodes[0], &commitment_tx); + if p2a_anchor { + let _ = mine_transaction(&nodes[0], anchor_tx.as_ref().unwrap()); + } + + // Claim the incoming HTLC after the commitment is confirmed. This + // regenerates a single-outpoint claim request alongside the existing + // delayed package covering the two earlier HTLCs. + nodes[0].node.claim_funds(claim_preimage); + check_added_monitors(&nodes[0], 1); + expect_payment_claimed!(nodes[0], claim_hash, 12_000_000); + + // Once all holder HTLCs reach their timelock, we should see the original two-HTLC + // delayed package plus the replayed single-HTLC claim, not duplicates of + // the delayed package's outpoints. + connect_blocks(&nodes[0], TEST_FINAL_CLTV + 1); + + let mut htlc_event_sizes = nodes[0] + .chain_monitor + .chain_monitor + .get_and_clear_pending_events() + .into_iter() + .filter_map(|event| { + if let Event::BumpTransaction(BumpTransactionEvent::HTLCResolution { + htlc_descriptors, .. + }) = event + { + Some(htlc_descriptors.len()) + } else { + None + } + }) + .collect::>(); + htlc_event_sizes.sort_unstable(); + assert_eq!(htlc_event_sizes, vec![1, 2]); +} + +#[test] +fn test_duplicate_delayed_holder_htlc_claims_after_claim_funds_replay() { + do_test_duplicate_delayed_holder_htlc_claims_after_claim_funds_replay(false); + do_test_duplicate_delayed_holder_htlc_claims_after_claim_funds_replay(true); +} + fn do_test_monitor_rebroadcast_pending_claims(keyed_anchors: bool, p2a_anchor: bool) { // Test that we will retry broadcasting pending claims for a force-closed channel on every // `ChainMonitor::rebroadcast_pending_claims` call. From 8a41957be9be11751b5c65d946a3c59148a05a07 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Thu, 30 Apr 2026 14:23:24 +0200 Subject: [PATCH 12/19] lightning: ignore claims for pending spent outpoints When a transaction spends one outpoint from a delayed package, the split outpoint is tracked as a ContentiousOutpoint until the spend reaches anti-reorg finality. Reject replayed claim requests for those pending-spent outpoints so they are not added back before the spend reaches anti-reorg finality or reorgs out. Add an OnchainTxHandler regression that replays a holder claim during that pending-spent window and verifies reorg resurrection still works. --- lightning/src/chain/onchaintx.rs | 140 ++++++++++++++++++++++++++++++- 1 file changed, 137 insertions(+), 3 deletions(-) diff --git a/lightning/src/chain/onchaintx.rs b/lightning/src/chain/onchaintx.rs index 2cdad08de1e..1a5ec61b0c7 100644 --- a/lightning/src/chain/onchaintx.rs +++ b/lightning/src/chain/onchaintx.rs @@ -576,6 +576,16 @@ impl OnchainTxHandler { self.pending_claim_requests.len() != 0 } + fn is_outpoint_spend_waiting_threshold_conf(&self, outpoint: &BitcoinOutPoint) -> bool { + self.onchain_events_awaiting_threshold_conf.iter().any(|entry| { + if let OnchainEvent::ContentiousOutpoint { ref package } = entry.event { + package.contains_outpoint(outpoint) + } else { + false + } + }) + } + /// Lightning security model (i.e being able to redeem/timeout HTLC or penalize counterparty /// onchain) lays on the assumption of claim transactions getting confirmed before timelock /// expiration (CSV or CLTV following cases). In case of high-fee spikes, claim tx may get stuck @@ -802,7 +812,15 @@ impl OnchainTxHandler { // First drop any duplicate claims. requests.retain(|req| { let outpoint = req.outpoint(); - if self.claimable_outpoints.get(outpoint).is_some() { + if self.is_outpoint_spend_waiting_threshold_conf(outpoint) { + // This is a package-layer guard. ChannelMonitor filters regenerated + // HTLC claims using HTLC resolution state, while this keeps outpoints + // split from an existing package from being re-added during the reorg + // window. + log_info!(logger, "Ignoring claim for outpoint {}:{}, it is already spent by a transaction awaiting anti-reorg finality", + outpoint.txid, outpoint.vout); + false + } else if self.claimable_outpoints.get(outpoint).is_some() { log_info!(logger, "Ignoring second claim for outpoint {}:{}, already registered its claiming request", outpoint.txid, outpoint.vout); false @@ -1276,11 +1294,14 @@ impl OnchainTxHandler { #[cfg(test)] mod tests { - use bitcoin::hash_types::Txid; + use bitcoin::hash_types::{BlockHash, Txid}; use bitcoin::hashes::sha256::Hash as Sha256; use bitcoin::hashes::Hash; + use bitcoin::locktime::absolute::LockTime; + use bitcoin::transaction::{OutPoint as BitcoinOutPoint, Version}; use bitcoin::Network; - use bitcoin::{key::Secp256k1, secp256k1::PublicKey, secp256k1::SecretKey, ScriptBuf}; + use bitcoin::{key::Secp256k1, secp256k1::PublicKey, secp256k1::SecretKey}; + use bitcoin::{Amount, ScriptBuf, Transaction, TxIn, TxOut}; use types::features::ChannelTypeFeatures; use crate::chain::chaininterface::{ConfirmationTarget, LowerBoundedFeeEstimator}; @@ -1404,6 +1425,18 @@ mod tests { requests } + fn locked_outpoints( + tx_handler: &OnchainTxHandler, locktime: u32, + ) -> Vec { + tx_handler + .locktimed_packages + .get(&locktime) + .into_iter() + .flat_map(|packages| packages.iter()) + .flat_map(|package| package.outpoints().into_iter().map(|outpoint| *outpoint)) + .collect() + } + // Test that all claims with locktime equal to or less than the current height are broadcast // immediately while claims with locktime greater than the current height are only broadcast // once the locktime is reached. @@ -1569,4 +1602,105 @@ mod tests { _ => panic!("expected a single HTLC bump event"), } } + + #[test] + fn test_replayed_claim_ignored_for_pending_spent_outpoint() { + let claim_height = 21; + let spend_height = 22; + let locktime = 42; + let mut nondust_htlcs = Vec::new(); + for i in 0..2 { + let preimage = PaymentPreimage([i + 1; 32]); + let hash = PaymentHash(Sha256::hash(&preimage.0[..]).to_byte_array()); + nondust_htlcs.push(HTLCOutputInCommitment { + offered: true, + amount_msat: 10000, + cltv_expiry: locktime, + payment_hash: hash, + transaction_output_index: Some(i as u32), + }); + } + + let mut tx_handler = new_test_tx_handler( + ChannelTypeFeatures::anchors_zero_htlc_fee_and_dependencies(), + nondust_htlcs, + ); + let requests = build_offered_holder_htlc_requests(&tx_handler); + let spent_outpoint = *requests[0].outpoint(); + let still_delayed_outpoint = *requests[1].outpoint(); + let destination_script = ScriptBuf::new(); + let broadcaster = TestBroadcaster::new(Network::Testnet); + let fee_estimator = TestFeeEstimator::new(253); + let fee_estimator = LowerBoundedFeeEstimator::new(&fee_estimator); + let logger = TestLogger::new(); + + // Register both holder HTLC claims as one delayed package before any + // individual outpoint spends are observed. + tx_handler.update_claims_view_from_requests( + requests.clone(), + claim_height, + claim_height, + &&broadcaster, + ConfirmationTarget::UrgentOnChainSweep, + &destination_script, + &fee_estimator, + &logger, + ); + assert_eq!(locked_outpoints(&tx_handler, locktime).len(), 2); + + // Spend one outpoint before the package reaches its timelock. The handler + // should split it into a ContentiousOutpoint until the spend reaches + // anti-reorg finality. + let spend_tx = Transaction { + version: Version::TWO, + lock_time: LockTime::ZERO, + input: vec![TxIn { previous_output: spent_outpoint, ..Default::default() }], + output: vec![TxOut { value: Amount::from_sat(1000), script_pubkey: ScriptBuf::new() }], + }; + tx_handler.update_claims_view_from_matched_txn( + &[&spend_tx], + spend_height, + BlockHash::all_zeros(), + spend_height, + &&broadcaster, + ConfirmationTarget::UrgentOnChainSweep, + &destination_script, + &fee_estimator, + &logger, + ); + let locked = locked_outpoints(&tx_handler, locktime); + assert_eq!(locked, vec![still_delayed_outpoint]); + + // Replaying both original claim requests during that window must not + // re-add the already-spent outpoint to the delayed package. + tx_handler.update_claims_view_from_requests( + requests, + spend_height, + spend_height, + &&broadcaster, + ConfirmationTarget::UrgentOnChainSweep, + &destination_script, + &fee_estimator, + &logger, + ); + let locked = locked_outpoints(&tx_handler, locktime); + assert_eq!(locked, vec![still_delayed_outpoint]); + assert!(tx_handler.pending_claim_requests.is_empty()); + assert!(tx_handler.claimable_outpoints.is_empty()); + + // If the spend reorgs out, the contentious outpoint is resurrected into + // the delayed package. + tx_handler.blocks_disconnected( + spend_height - 1, + &&broadcaster, + ConfirmationTarget::UrgentOnChainSweep, + &destination_script, + &fee_estimator, + &logger, + ); + let locked = locked_outpoints(&tx_handler, locktime); + assert_eq!(locked.len(), 2); + assert!(locked.contains(&spent_outpoint)); + assert!(locked.contains(&still_delayed_outpoint)); + } } From 8e18e7b91d5744006d7d6670fdaa0509953fb0de Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Wed, 6 May 2026 17:57:23 +0200 Subject: [PATCH 13/19] f: fold timelocked outpoint claim check Classify duplicate outpoint state in one helper. Preserve existing filter ordering and timelock logging. --- lightning/src/chain/onchaintx.rs | 73 +++++++++++++++++++++----------- 1 file changed, 48 insertions(+), 25 deletions(-) diff --git a/lightning/src/chain/onchaintx.rs b/lightning/src/chain/onchaintx.rs index 1a5ec61b0c7..e4d23d77e88 100644 --- a/lightning/src/chain/onchaintx.rs +++ b/lightning/src/chain/onchaintx.rs @@ -91,6 +91,12 @@ enum OnchainEvent { ContentiousOutpoint { package: PackageTemplate }, } +enum OutpointClaimState { + WaitingThresholdConf, + ClaimingRequestRegistered, + WaitingTimelock(u32), +} + impl Writeable for OnchainEventEntry { fn write(&self, writer: &mut W) -> Result<(), io::Error> { write_tlv_fields!(writer, { @@ -576,14 +582,30 @@ impl OnchainTxHandler { self.pending_claim_requests.len() != 0 } - fn is_outpoint_spend_waiting_threshold_conf(&self, outpoint: &BitcoinOutPoint) -> bool { - self.onchain_events_awaiting_threshold_conf.iter().any(|entry| { + fn outpoint_claim_state( + &self, outpoint: &BitcoinOutPoint, cur_height: u32, + ) -> Option { + if self.onchain_events_awaiting_threshold_conf.iter().any(|entry| { if let OnchainEvent::ContentiousOutpoint { ref package } = entry.event { package.contains_outpoint(outpoint) } else { false } - }) + }) { + return Some(OutpointClaimState::WaitingThresholdConf); + } + + if self.claimable_outpoints.get(outpoint).is_some() { + return Some(OutpointClaimState::ClaimingRequestRegistered); + } + + self.locktimed_packages + .values() + .flat_map(|packages| packages.iter()) + .find(|locked_package| locked_package.contains_outpoint(outpoint)) + .map(|package| { + OutpointClaimState::WaitingTimelock(package.package_locktime(cur_height)) + }) } /// Lightning security model (i.e being able to redeem/timeout HTLC or penalize counterparty @@ -812,29 +834,30 @@ impl OnchainTxHandler { // First drop any duplicate claims. requests.retain(|req| { let outpoint = req.outpoint(); - if self.is_outpoint_spend_waiting_threshold_conf(outpoint) { - // This is a package-layer guard. ChannelMonitor filters regenerated - // HTLC claims using HTLC resolution state, while this keeps outpoints - // split from an existing package from being re-added during the reorg - // window. - log_info!(logger, "Ignoring claim for outpoint {}:{}, it is already spent by a transaction awaiting anti-reorg finality", - outpoint.txid, outpoint.vout); - false - } else if self.claimable_outpoints.get(outpoint).is_some() { - log_info!(logger, "Ignoring second claim for outpoint {}:{}, already registered its claiming request", - outpoint.txid, outpoint.vout); - false - } else { - let timelocked_covering_package = self.locktimed_packages.values() - .flat_map(|packages| packages.iter()) - .find(|locked_package| locked_package.contains_outpoint(outpoint)); - if let Some(package) = timelocked_covering_package { - log_info!(logger, "Ignoring second claim for outpoint {}:{}, we already have one which we're waiting on a timelock at {} for.", - outpoint.txid, outpoint.vout, package.package_locktime(cur_height)); - false - } else { - true + if let Some(claim_state) = self.outpoint_claim_state(outpoint, cur_height) { + match claim_state { + OutpointClaimState::WaitingThresholdConf => { + // This is a package-layer guard. ChannelMonitor filters regenerated + // HTLC claims using HTLC resolution state, while this keeps outpoints + // split from an existing package from being re-added during the reorg + // window. + log_info!(logger, "Ignoring claim for outpoint {}:{}, it is already spent by a transaction awaiting anti-reorg finality", + outpoint.txid, outpoint.vout); + false + }, + OutpointClaimState::ClaimingRequestRegistered => { + log_info!(logger, "Ignoring second claim for outpoint {}:{}, already registered its claiming request", + outpoint.txid, outpoint.vout); + false + }, + OutpointClaimState::WaitingTimelock(locktime) => { + log_info!(logger, "Ignoring second claim for outpoint {}:{}, we already have one which we're waiting on a timelock at {} for.", + outpoint.txid, outpoint.vout, locktime); + false + }, } + } else { + true } }); let mut requests = requests.into_iter() From 8f1e37bdc3d414f6098eb1a612970a6acaa54016 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Thu, 30 Apr 2026 14:29:33 +0200 Subject: [PATCH 14/19] lightning: skip resolved HTLC claim replays Filter regenerated HTLC claim requests once ChannelMonitor has persisted anti-reorg finality for the commitment HTLC output spend. This keeps replayed preimage updates from recreating claims after OnchainTxHandler has cleaned up its active retry state, relying on the monitor's persisted HTLC resolution state. --- lightning/src/chain/channelmonitor.rs | 25 +++++++- lightning/src/ln/monitor_tests.rs | 84 ++++++++++++++++++++++++++- 2 files changed, 106 insertions(+), 3 deletions(-) diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index 09aa863b51c..e0c56b4d537 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -4985,7 +4985,10 @@ impl ChannelMonitorImpl { .iter() .filter_map(|(htlc, _)| { if let Some(transaction_output_index) = htlc.transaction_output_index { - if htlc.offered && htlc.payment_hash == matching_payment_hash { + if htlc.offered + && htlc.payment_hash == matching_payment_hash + && !self.is_htlc_output_resolved_on_chain(htlc) + { let htlc_data = PackageSolvingData::CounterpartyOfferedHTLCOutput( CounterpartyOfferedHTLCOutput::build( per_commitment_point, @@ -5011,6 +5014,20 @@ impl ChannelMonitorImpl { .collect() } + fn is_htlc_output_resolved_on_chain(&self, htlc: &HTLCOutputInCommitment) -> bool { + if let Some(transaction_output_index) = htlc.transaction_output_index { + // Only suppress claims once the commitment HTLC output spend has + // reached anti-reorg finality. Any output created by that spend may + // still be CSV-delayed, but the original HTLC outpoint should not be + // re-claimed. + self.htlcs_resolved_on_chain.iter().any(|resolved_htlc| { + resolved_htlc.commitment_tx_output_idx == Some(transaction_output_index) + }) + } else { + false + } + } + /// Returns the HTLC claim requests and the counterparty output info. fn get_counterparty_output_claim_info( &self, funding_spent: &FundingScope, commitment_number: u64, commitment_txid: Txid, @@ -5058,6 +5075,9 @@ impl ChannelMonitorImpl { // per_commitment_data is corrupt or our commitment signing key leaked! return (claimable_outpoints, to_counterparty_output_info); } + if self.is_htlc_output_resolved_on_chain(htlc) { + continue; + } let preimage = if htlc.offered { if let Some((p, _)) = self.payment_preimages.get(&htlc.payment_hash) { Some(*p) @@ -5159,6 +5179,9 @@ impl ChannelMonitorImpl { let mut htlcs = Vec::with_capacity(holder_tx.nondust_htlcs().len()); debug_assert_eq!(holder_tx.nondust_htlcs().len(), holder_tx.counterparty_htlc_sigs.len()); for (htlc, counterparty_sig) in holder_tx.nondust_htlcs().iter().zip(holder_tx.counterparty_htlc_sigs.iter()) { + if self.is_htlc_output_resolved_on_chain(htlc) { + continue; + } assert!(htlc.transaction_output_index.is_some(), "Expected transaction output index for non-dust HTLC"); let preimage = if htlc.offered { diff --git a/lightning/src/ln/monitor_tests.rs b/lightning/src/ln/monitor_tests.rs index 436bb01c907..111d1fbfd81 100644 --- a/lightning/src/ln/monitor_tests.rs +++ b/lightning/src/ln/monitor_tests.rs @@ -2458,12 +2458,15 @@ fn test_restored_packages_retry() { fn do_test_duplicate_delayed_holder_htlc_claims_after_claim_funds_replay(p2a_anchor: bool) { let chanmon_cfgs = create_chanmon_cfgs(2); let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); + let persister; + let new_chain_monitor; + let node_deserialized; let mut anchors_config = test_default_channel_config(); anchors_config.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx = true; anchors_config.channel_handshake_config.negotiate_anchor_zero_fee_commitments = p2a_anchor; let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[Some(anchors_config.clone()), Some(anchors_config)]); - let nodes = create_network(2, &node_cfgs, &node_chanmgrs); + let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs); let coinbase_tx = provide_anchor_reserves(&nodes); let (_, _, chan_id, funding_tx) = @@ -2542,11 +2545,14 @@ fn do_test_duplicate_delayed_holder_htlc_claims_after_claim_funds_replay(p2a_anc // the delayed package's outpoints. connect_blocks(&nodes[0], TEST_FINAL_CLTV + 1); - let mut htlc_event_sizes = nodes[0] + let events = nodes[0] .chain_monitor .chain_monitor .get_and_clear_pending_events() .into_iter() + .collect::>(); + let mut htlc_event_sizes = events + .iter() .filter_map(|event| { if let Event::BumpTransaction(BumpTransactionEvent::HTLCResolution { htlc_descriptors, .. @@ -2560,6 +2566,80 @@ fn do_test_duplicate_delayed_holder_htlc_claims_after_claim_funds_replay(p2a_anc .collect::>(); htlc_event_sizes.sort_unstable(); assert_eq!(htlc_event_sizes, vec![1, 2]); + + // Drive only the replayed single-HTLC event on-chain so we can replay the + // preimage once the spend is anti-reorg final, then again after reload. + for event in events { + if let Event::BumpTransaction(event) = event { + let is_single_htlc = if let BumpTransactionEvent::HTLCResolution { + ref htlc_descriptors, + .. + } = event + { + htlc_descriptors.len() == 1 + } else { + false + }; + if is_single_htlc { + nodes[0].bump_tx_handler.handle_event(&event); + break; + } + } + } + let mut htlc_txn = nodes[0].tx_broadcaster.unique_txn_broadcast(); + assert_eq!(htlc_txn.len(), 1); + let htlc_tx = htlc_txn.pop().unwrap(); + mine_transaction(&nodes[0], &htlc_tx); + connect_blocks(&nodes[0], ANTI_REORG_DELAY - 1); + assert!(nodes[0].chain_monitor.chain_monitor.get_and_clear_pending_events().is_empty()); + + // The spend has passed anti-reorg finality, but its CSV-delayed output is + // not yet spendable. Replaying the preimage in this window must not create + // a new conflicting claim for the already-spent commitment HTLC output. + get_monitor!(nodes[0], chan_id).provide_payment_preimage_unsafe_legacy( + &claim_hash, + &claim_preimage, + &node_cfgs[0].tx_broadcaster, + &LowerBoundedFeeEstimator::new(node_cfgs[0].fee_estimator), + &nodes[0].logger, + ); + assert!(nodes[0].chain_monitor.chain_monitor.get_and_clear_pending_events().is_empty()); + let balances = nodes[0] + .chain_monitor + .chain_monitor + .get_monitor(chan_id) + .unwrap() + .get_claimable_balances(); + assert!(balances.iter().any(|balance| matches!( + balance, + Balance::ClaimableAwaitingConfirmations { + amount_satoshis: 12_000, + source: BalanceSource::Htlc, + .. + } + ))); + + connect_blocks(&nodes[0], BREAKDOWN_TIMEOUT as u32 - ANTI_REORG_DELAY); + let _ = nodes[0].chain_monitor.chain_monitor.get_and_clear_pending_events(); + + // Reload before replaying the preimage so the regression covers persisted + // resolution state, not only in-memory filtering. + let serialized_channel_manager = nodes[0].node.encode(); + let serialized_monitor = get_monitor!(nodes[0], chan_id).encode(); + reload_node!( + nodes[0], &serialized_channel_manager, &[&serialized_monitor], persister, + new_chain_monitor, node_deserialized + ); + + // Replaying the preimage update must not regenerate a claim for the HTLC + // whose commitment output has anti-reorg persisted resolution state. + get_monitor!(nodes[0], chan_id).provide_payment_preimage_unsafe_legacy( + &claim_hash, &claim_preimage, &node_cfgs[0].tx_broadcaster, + &LowerBoundedFeeEstimator::new(node_cfgs[0].fee_estimator), &nodes[0].logger, + ); + assert!(nodes[0].chain_monitor.chain_monitor.get_and_clear_pending_events().is_empty()); + expect_payment_claimed!(nodes[0], claim_hash, 12_000_000); + check_added_monitors(&nodes[0], 1); } #[test] From c30eafe2044f317e787eeee3ef2789c4cdf68319 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Wed, 6 May 2026 17:20:10 +0200 Subject: [PATCH 15/19] f: log resolved HTLC preimage losses Log when a replayed preimage claim is skipped because the HTLC output reached anti-reorg finality without that preimage. --- lightning/src/chain/channelmonitor.rs | 78 ++++++++++++++++++++++----- 1 file changed, 64 insertions(+), 14 deletions(-) diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index e0c56b4d537..7ee9df1c30f 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -3813,7 +3813,7 @@ impl ChannelMonitorImpl { // First check if a counterparty commitment transaction has been broadcasted: macro_rules! claim_htlcs { ($commitment_number: expr, $txid: expr, $htlcs: expr) => { - let htlc_claim_reqs = self.get_counterparty_output_claims_for_preimage(*payment_preimage, funding_spent, $commitment_number, $txid, $htlcs, confirmed_spend_height); + let htlc_claim_reqs = self.get_counterparty_output_claims_for_preimage(*payment_preimage, funding_spent, $commitment_number, $txid, $htlcs, confirmed_spend_height, logger); let conf_target = self.closure_conf_target(); self.onchain_tx_handler.update_claims_view_from_requests( htlc_claim_reqs, self.best_block.height, self.best_block.height, broadcaster, @@ -3862,6 +3862,9 @@ impl ChannelMonitorImpl { None }; if let Some(holder_commitment_tx) = holder_commitment_tx { + self.log_holder_preimage_claim_after_htlc_resolved_on_chain( + logger, holder_commitment_tx, *payment_preimage, + ); // Assume that the broadcasted commitment transaction confirmed in the current best // block. Even if not, its a reasonable metric for the bump criteria on the HTLC // transactions. @@ -4965,11 +4968,11 @@ impl ChannelMonitorImpl { } } - fn get_counterparty_output_claims_for_preimage( + fn get_counterparty_output_claims_for_preimage( &self, preimage: PaymentPreimage, funding_spent: &FundingScope, commitment_number: u64, commitment_txid: Txid, per_commitment_option: Option<&Vec<(HTLCOutputInCommitment, Option>)>>, - confirmation_height: Option, + confirmation_height: Option, logger: &L, ) -> Vec { let per_commitment_claimable_data = match per_commitment_option { Some(outputs) => outputs, @@ -4985,10 +4988,17 @@ impl ChannelMonitorImpl { .iter() .filter_map(|(htlc, _)| { if let Some(transaction_output_index) = htlc.transaction_output_index { - if htlc.offered - && htlc.payment_hash == matching_payment_hash - && !self.is_htlc_output_resolved_on_chain(htlc) - { + if htlc.offered && htlc.payment_hash == matching_payment_hash { + if let Some(resolved_htlc) = self.htlc_output_resolution_on_chain(htlc) { + self.log_preimage_claim_after_htlc_resolved_on_chain( + logger, + commitment_txid, + htlc, + preimage, + resolved_htlc, + ); + return None; + } let htlc_data = PackageSolvingData::CounterpartyOfferedHTLCOutput( CounterpartyOfferedHTLCOutput::build( per_commitment_point, @@ -5014,17 +5024,57 @@ impl ChannelMonitorImpl { .collect() } - fn is_htlc_output_resolved_on_chain(&self, htlc: &HTLCOutputInCommitment) -> bool { - if let Some(transaction_output_index) = htlc.transaction_output_index { + fn htlc_output_resolution_on_chain( + &self, htlc: &HTLCOutputInCommitment, + ) -> Option<&IrrevocablyResolvedHTLC> { + htlc.transaction_output_index.and_then(|transaction_output_index| { // Only suppress claims once the commitment HTLC output spend has // reached anti-reorg finality. Any output created by that spend may // still be CSV-delayed, but the original HTLC outpoint should not be // re-claimed. - self.htlcs_resolved_on_chain.iter().any(|resolved_htlc| { + self.htlcs_resolved_on_chain.iter().find(|resolved_htlc| { resolved_htlc.commitment_tx_output_idx == Some(transaction_output_index) }) - } else { - false + }) + } + + fn log_preimage_claim_after_htlc_resolved_on_chain( + &self, logger: &L, commitment_txid: Txid, htlc: &HTLCOutputInCommitment, + preimage: PaymentPreimage, resolved_htlc: &IrrevocablyResolvedHTLC, + ) { + if resolved_htlc.payment_preimage == Some(preimage) { + return; + } + if let Some(transaction_output_index) = htlc.transaction_output_index { + let logger = WithContext::from(logger, None, None, Some(htlc.payment_hash)); + if let Some(resolving_txid) = resolved_htlc.resolving_txid.as_ref() { + log_error!(logger, "WE HAVE LIKELY LOST FUNDS: HTLC output {}:{} was irrevocably resolved on-chain by transaction {} without the payment preimage we now know; not replaying the claim", + commitment_txid, transaction_output_index, resolving_txid); + } else { + log_error!(logger, "WE HAVE LIKELY LOST FUNDS: HTLC output {}:{} was irrevocably resolved on-chain by an unknown transaction without the payment preimage we now know; not replaying the claim", + commitment_txid, transaction_output_index); + } + } + } + + fn log_holder_preimage_claim_after_htlc_resolved_on_chain( + &self, logger: &L, holder_tx: &HolderCommitmentTransaction, preimage: PaymentPreimage, + ) { + let matching_payment_hash = PaymentHash::from(preimage); + let tx = holder_tx.trust(); + for htlc in holder_tx.nondust_htlcs() { + if htlc.offered || htlc.payment_hash != matching_payment_hash { + continue; + } + if let Some(resolved_htlc) = self.htlc_output_resolution_on_chain(htlc) { + self.log_preimage_claim_after_htlc_resolved_on_chain( + logger, + tx.txid(), + htlc, + preimage, + resolved_htlc, + ); + } } } @@ -5075,7 +5125,7 @@ impl ChannelMonitorImpl { // per_commitment_data is corrupt or our commitment signing key leaked! return (claimable_outpoints, to_counterparty_output_info); } - if self.is_htlc_output_resolved_on_chain(htlc) { + if self.htlc_output_resolution_on_chain(htlc).is_some() { continue; } let preimage = if htlc.offered { @@ -5179,7 +5229,7 @@ impl ChannelMonitorImpl { let mut htlcs = Vec::with_capacity(holder_tx.nondust_htlcs().len()); debug_assert_eq!(holder_tx.nondust_htlcs().len(), holder_tx.counterparty_htlc_sigs.len()); for (htlc, counterparty_sig) in holder_tx.nondust_htlcs().iter().zip(holder_tx.counterparty_htlc_sigs.iter()) { - if self.is_htlc_output_resolved_on_chain(htlc) { + if self.htlc_output_resolution_on_chain(htlc).is_some() { continue; } assert!(htlc.transaction_output_index.is_some(), "Expected transaction output index for non-dust HTLC"); From 119aaa2a6085916ed221517871a0541f208d8160 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Thu, 30 Apr 2026 14:33:27 +0200 Subject: [PATCH 16/19] lightning: canonicalize htlc claim ids Hash HTLC claim outpoints in canonical order so the same logical HTLC set produces the same ClaimId regardless of descriptor order. Add a unit test covering reversed descriptor order. --- lightning/src/chain/mod.rs | 64 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 61 insertions(+), 3 deletions(-) diff --git a/lightning/src/chain/mod.rs b/lightning/src/chain/mod.rs index d72d58b3149..5ff96e46953 100644 --- a/lightning/src/chain/mod.rs +++ b/lightning/src/chain/mod.rs @@ -563,10 +563,18 @@ pub struct ClaimId(pub [u8; 32]); impl ClaimId { pub(crate) fn from_htlcs(htlcs: &[HTLCDescriptor]) -> ClaimId { + let mut htlc_outpoints = htlcs + .iter() + .map(|htlc| { + (htlc.commitment_txid.to_byte_array(), htlc.htlc.transaction_output_index.unwrap()) + }) + .collect::>(); + htlc_outpoints.sort_unstable(); + let mut engine = Sha256::engine(); - for htlc in htlcs { - engine.input(&htlc.commitment_txid.to_byte_array()); - engine.input(&htlc.htlc.transaction_output_index.unwrap().to_be_bytes()); + for (commitment_txid, transaction_output_index) in htlc_outpoints { + engine.input(&commitment_txid); + engine.input(&transaction_output_index.to_be_bytes()); } ClaimId(Sha256::from_engine(engine).to_byte_array()) } @@ -581,8 +589,45 @@ impl ClaimId { #[cfg(test)] mod tests { use super::*; + use crate::ln::chan_utils::{ + ChannelTransactionParameters, HTLCOutputInCommitment, HolderCommitmentTransaction, + }; + use crate::sign::ChannelDerivationParameters; + use crate::types::payment::{PaymentHash, PaymentPreimage}; use bitcoin::hashes::Hash; + fn dummy_htlc_descriptor( + commitment_txid: Txid, transaction_output_index: u32, + ) -> HTLCDescriptor { + let channel_parameters = ChannelTransactionParameters::test_dummy(100_000); + let htlc = HTLCOutputInCommitment { + offered: true, + amount_msat: 1000, + cltv_expiry: 100, + payment_hash: PaymentHash::from(PaymentPreimage([1; 32])), + transaction_output_index: Some(transaction_output_index), + }; + let funding_outpoint = channel_parameters.funding_outpoint.unwrap(); + let commitment_tx = + HolderCommitmentTransaction::dummy(100_000, funding_outpoint, vec![htlc.clone()]); + let trusted_tx = commitment_tx.trust(); + + HTLCDescriptor { + channel_derivation_parameters: ChannelDerivationParameters { + value_satoshis: channel_parameters.channel_value_satoshis, + keys_id: [1; 32], + transaction_parameters: channel_parameters, + }, + commitment_txid, + per_commitment_number: trusted_tx.commitment_number(), + per_commitment_point: trusted_tx.per_commitment_point(), + feerate_per_kw: trusted_tx.negotiated_feerate_per_kw(), + htlc, + preimage: None, + counterparty_sig: commitment_tx.counterparty_htlc_sigs[0], + } + } + #[test] fn test_best_block() { let hash1 = BlockHash::from_slice(&[1; 32]).unwrap(); @@ -618,4 +663,17 @@ mod tests { let chain_c = BlockLocator::new(hash_other, 200); assert_eq!(chain_a.find_common_ancestor(&chain_c), None); } + + #[test] + fn test_htlc_claim_id_is_descriptor_order_independent() { + // Use opposite txid and vout ordering so the assertion would fail if + // ClaimId still hashed descriptors in caller-provided order. + let first = dummy_htlc_descriptor(Txid::from_slice(&[1; 32]).unwrap(), 2); + let second = dummy_htlc_descriptor(Txid::from_slice(&[2; 32]).unwrap(), 1); + + assert_eq!( + ClaimId::from_htlcs(&[first.clone(), second.clone()]), + ClaimId::from_htlcs(&[second, first]) + ); + } } From a7373fc13687726a70f00f809db47799cd73bf2f Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 18 May 2026 15:35:02 +0200 Subject: [PATCH 17/19] fuzz: route chanmon messages by destination Queue delayed peer messages by their source and destination link. --- fuzz/src/chanmon_consistency.rs | 209 ++++++++++++-------------------- 1 file changed, 77 insertions(+), 132 deletions(-) diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index dfaa7d97387..42e202965e7 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -1207,6 +1207,42 @@ struct EventQueues { cb: Vec, } +fn find_destination_node(nodes: &[HarnessNode<'_>; 3], node_id: &PublicKey) -> usize { + nodes + .iter() + .position(|node| node.get_our_node_id() == *node_id) + .expect("message destination should be a known harness node") +} + +fn directed_msg_destination(event: &MessageSendEvent) -> Option { + match event { + MessageSendEvent::UpdateHTLCs { ref node_id, .. } + | MessageSendEvent::SendRevokeAndACK { ref node_id, .. } + | MessageSendEvent::SendChannelReestablish { ref node_id, .. } + | MessageSendEvent::SendStfu { ref node_id, .. } + | MessageSendEvent::SendSpliceInit { ref node_id, .. } + | MessageSendEvent::SendSpliceAck { ref node_id, .. } + | MessageSendEvent::SendSpliceLocked { ref node_id, .. } + | MessageSendEvent::SendTxAddInput { ref node_id, .. } + | MessageSendEvent::SendTxAddOutput { ref node_id, .. } + | MessageSendEvent::SendTxRemoveInput { ref node_id, .. } + | MessageSendEvent::SendTxRemoveOutput { ref node_id, .. } + | MessageSendEvent::SendTxComplete { ref node_id, .. } + | MessageSendEvent::SendTxAbort { ref node_id, .. } + | MessageSendEvent::SendTxInitRbf { ref node_id, .. } + | MessageSendEvent::SendTxAckRbf { ref node_id, .. } + | MessageSendEvent::SendTxSignatures { ref node_id, .. } + | MessageSendEvent::SendChannelReady { ref node_id, .. } + | MessageSendEvent::SendAnnouncementSignatures { ref node_id, .. } + | MessageSendEvent::SendChannelUpdate { ref node_id, .. } + | MessageSendEvent::HandleError { ref node_id, .. } => Some(*node_id), + MessageSendEvent::BroadcastChannelUpdate { .. } + | MessageSendEvent::BroadcastChannelAnnouncement { .. } + | MessageSendEvent::BroadcastNodeAnnouncement { .. } => None, + _ => panic!("Unhandled message event {:?}", event), + } +} + impl EventQueues { fn new() -> Self { Self { ab: Vec::new(), ba: Vec::new(), bc: Vec::new(), cb: Vec::new() } @@ -1235,78 +1271,43 @@ impl EventQueues { } } - fn push_for_node(&mut self, node_idx: usize, event: MessageSendEvent) { - match node_idx { - 0 => self.ab.push(event), - 2 => self.cb.push(event), - _ => panic!("cannot directly queue messages for node {}", node_idx), - } - } - - fn extend_for_node>( - &mut self, node_idx: usize, events: I, + fn route_from_node<'a, I: IntoIterator>( + &mut self, source_idx: usize, events: I, expect_drop_node: Option, + nodes: &[HarnessNode<'a>; 3], ) { - match node_idx { - 0 => self.ab.extend(events), - 2 => self.cb.extend(events), - _ => panic!("cannot directly queue messages for node {}", node_idx), - } - } - - fn route_from_middle<'a, I: IntoIterator>( - &mut self, excess_events: I, expect_drop_node: Option, nodes: &[HarnessNode<'a>; 3], - ) { - // Push any events from Node B onto queues.ba and queues.bc. - let a_id = nodes[0].get_our_node_id(); let expect_drop_id = expect_drop_node.map(|id| nodes[id].get_our_node_id()); - for event in excess_events { - let push_a = match event { - MessageSendEvent::UpdateHTLCs { ref node_id, .. } - | MessageSendEvent::SendRevokeAndACK { ref node_id, .. } - | MessageSendEvent::SendChannelReestablish { ref node_id, .. } - | MessageSendEvent::SendStfu { ref node_id, .. } - | MessageSendEvent::SendSpliceInit { ref node_id, .. } - | MessageSendEvent::SendSpliceAck { ref node_id, .. } - | MessageSendEvent::SendSpliceLocked { ref node_id, .. } - | MessageSendEvent::SendTxAddInput { ref node_id, .. } - | MessageSendEvent::SendTxAddOutput { ref node_id, .. } - | MessageSendEvent::SendTxRemoveInput { ref node_id, .. } - | MessageSendEvent::SendTxRemoveOutput { ref node_id, .. } - | MessageSendEvent::SendTxComplete { ref node_id, .. } - | MessageSendEvent::SendTxAbort { ref node_id, .. } - | MessageSendEvent::SendTxInitRbf { ref node_id, .. } - | MessageSendEvent::SendTxAckRbf { ref node_id, .. } - | MessageSendEvent::SendTxSignatures { ref node_id, .. } - | MessageSendEvent::SendChannelUpdate { ref node_id, .. } => { - if Some(*node_id) == expect_drop_id { - panic!( - "peer_disconnected should drop msgs bound for the disconnected peer" - ); - } - *node_id == a_id - }, - MessageSendEvent::HandleError { ref action, ref node_id } => { - assert_action_timeout_awaiting_response(action); - if Some(*node_id) == expect_drop_id { - panic!( - "peer_disconnected should drop msgs bound for the disconnected peer" - ); - } - *node_id == a_id - }, - MessageSendEvent::SendChannelReady { .. } - | MessageSendEvent::SendAnnouncementSignatures { .. } - | MessageSendEvent::BroadcastChannelUpdate { .. } => continue, - _ => panic!("Unhandled message event {:?}", event), + for event in events { + if let MessageSendEvent::HandleError { ref action, .. } = event { + assert_action_timeout_awaiting_response(action); + } + let Some(node_id) = directed_msg_destination(&event) else { + continue; }; - if push_a { - self.ba.push(event); - } else { - self.bc.push(event); + if Some(node_id) == expect_drop_id { + panic!("peer_disconnected should drop msgs bound for the disconnected peer"); + } + let dest_idx = find_destination_node(nodes, &node_id); + match (source_idx, dest_idx) { + (0, 1) => self.ab.push(event), + (1, 0) => self.ba.push(event), + (1, 2) => self.bc.push(event), + (2, 1) => self.cb.push(event), + _ => panic!("unsupported message route {} -> {}", source_idx, dest_idx), } } } + fn route_pending_from_node( + &mut self, source_idx: usize, expect_drop_node: Option, nodes: &[HarnessNode<'_>; 3], + ) { + self.route_from_node( + source_idx, + nodes[source_idx].get_and_clear_pending_msg_events(), + expect_drop_node, + nodes, + ); + } + fn clear_link(&mut self, link: &PeerLink) { match (link.node_a, link.node_b) { (0, 1) | (1, 0) => { @@ -1324,42 +1325,12 @@ impl EventQueues { fn drain_on_disconnect(&mut self, edge_node: usize, nodes: &[HarnessNode<'_>; 3]) { match edge_node { 0 => { - for event in nodes[0].get_and_clear_pending_msg_events() { - match event { - MessageSendEvent::UpdateHTLCs { .. } => {}, - MessageSendEvent::SendRevokeAndACK { .. } => {}, - MessageSendEvent::SendChannelReestablish { .. } => {}, - MessageSendEvent::SendStfu { .. } => {}, - MessageSendEvent::SendChannelReady { .. } => {}, - MessageSendEvent::SendAnnouncementSignatures { .. } => {}, - MessageSendEvent::BroadcastChannelUpdate { .. } => {}, - MessageSendEvent::SendChannelUpdate { .. } => {}, - MessageSendEvent::HandleError { ref action, .. } => { - assert_action_timeout_awaiting_response(action); - }, - _ => panic!("Unhandled message event"), - } - } - self.route_from_middle(nodes[1].get_and_clear_pending_msg_events(), Some(0), nodes); + self.route_pending_from_node(0, Some(1), nodes); + self.route_pending_from_node(1, Some(0), nodes); }, 2 => { - for event in nodes[2].get_and_clear_pending_msg_events() { - match event { - MessageSendEvent::UpdateHTLCs { .. } => {}, - MessageSendEvent::SendRevokeAndACK { .. } => {}, - MessageSendEvent::SendChannelReestablish { .. } => {}, - MessageSendEvent::SendStfu { .. } => {}, - MessageSendEvent::SendChannelReady { .. } => {}, - MessageSendEvent::SendAnnouncementSignatures { .. } => {}, - MessageSendEvent::BroadcastChannelUpdate { .. } => {}, - MessageSendEvent::SendChannelUpdate { .. } => {}, - MessageSendEvent::HandleError { ref action, .. } => { - assert_action_timeout_awaiting_response(action); - }, - _ => panic!("Unhandled message event"), - } - } - self.route_from_middle(nodes[1].get_and_clear_pending_msg_events(), Some(2), nodes); + self.route_pending_from_node(2, Some(1), nodes); + self.route_pending_from_node(1, Some(2), nodes); }, _ => panic!("unsupported disconnected edge"), } @@ -1429,7 +1400,7 @@ impl PeerLink { queues.clear_link(self); } - fn reconnect(&mut self, nodes: &[HarnessNode<'_>; 3]) { + fn reconnect(&mut self, nodes: &[HarnessNode<'_>; 3], queues: &mut EventQueues) { if !self.disconnected { return; } @@ -1448,6 +1419,8 @@ impl PeerLink { }; nodes[self.node_b].peer_connected(node_a_id, &init_a, false).unwrap(); self.disconnected = false; + queues.route_pending_from_node(self.node_a, None, nodes); + queues.route_pending_from_node(self.node_b, None, nodes); } fn disconnect_for_reload( @@ -1463,15 +1436,7 @@ impl PeerLink { nodes[remaining_node].peer_disconnected(restarted_node_id); self.disconnected = true; - if remaining_node == 1 { - queues.route_from_middle( - nodes[1].get_and_clear_pending_msg_events(), - Some(restarted_node), - nodes, - ); - } else { - nodes[remaining_node].get_and_clear_pending_msg_events(); - } + queues.route_pending_from_node(remaining_node, Some(restarted_node), nodes); queues.clear_link(self); } } @@ -2443,13 +2408,6 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { fn process_msg_events( &mut self, node_idx: usize, corrupt_forward: bool, limit_events: ProcessMessages, ) -> bool { - fn find_destination_node(nodes: &[HarnessNode<'_>; 3], node_id: &PublicKey) -> usize { - nodes - .iter() - .position(|node| node.get_our_node_id() == *node_id) - .expect("message destination should be a known harness node") - } - fn log_msg_delivery( node_idx: usize, dest_idx: usize, msg_name: &str, out: &Out, ) { @@ -2690,20 +2648,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { break; } } - if node_idx == 1 { - let remaining = extra_ev.into_iter().chain(events_iter).collect::>(); - queues.route_from_middle(remaining, None, nodes); - } else if node_idx == 0 { - if let Some(ev) = extra_ev { - queues.push_for_node(0, ev); - } - queues.extend_for_node(0, events_iter); - } else { - if let Some(ev) = extra_ev { - queues.push_for_node(2, ev); - } - queues.extend_for_node(2, events_iter); - } + queues.route_from_node(node_idx, extra_ev.into_iter().chain(events_iter), None, nodes); had_events } @@ -2851,11 +2796,11 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { } fn reconnect_ab(&mut self) { - self.ab_link.reconnect(&self.nodes); + self.ab_link.reconnect(&self.nodes, &mut self.queues); } fn reconnect_bc(&mut self) { - self.bc_link.reconnect(&self.nodes); + self.bc_link.reconnect(&self.nodes, &mut self.queues); } fn restart_node(&mut self, node_idx: usize, v: u8, router: &'a FuzzRouter) { From 1d4a8ec4fdcef09f59846a9ce5fbde747fec0579 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 18 May 2026 15:36:16 +0200 Subject: [PATCH 18/19] fuzz: deliver chanmon control message events Handle control messages that can be produced by delayed peers. --- fuzz/src/chanmon_consistency.rs | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index 42e202965e7..8a7e33e762c 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -2606,17 +2606,26 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { assert_action_timeout_awaiting_response(action); None }, - MessageSendEvent::SendChannelReady { .. } - | MessageSendEvent::SendAnnouncementSignatures { .. } - | MessageSendEvent::SendChannelUpdate { .. } => { - // Can be generated as a reestablish response. + MessageSendEvent::SendChannelReady { ref node_id, ref msg } => { + let dest_idx = log_peer_message(node_idx, node_id, nodes, out, "channel_ready"); + nodes[dest_idx].handle_channel_ready(source_node_id, msg); None }, - MessageSendEvent::BroadcastChannelUpdate { .. } => { - // Can be generated as a result of calling `timer_tick_occurred` enough - // times while peers are disconnected. + MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => { + let dest_idx = + log_peer_message(node_idx, node_id, nodes, out, "announcement_signatures"); + nodes[dest_idx].handle_announcement_signatures(source_node_id, msg); + None + }, + MessageSendEvent::SendChannelUpdate { ref node_id, ref msg } => { + let dest_idx = + log_peer_message(node_idx, node_id, nodes, out, "channel_update"); + nodes[dest_idx].handle_channel_update(source_node_id, msg); None }, + MessageSendEvent::BroadcastChannelUpdate { .. } => None, + MessageSendEvent::BroadcastChannelAnnouncement { .. } => None, + MessageSendEvent::BroadcastNodeAnnouncement { .. } => None, _ => panic!("Unhandled message event {:?}", event), } } From cf4dab7158bc21d8962cea4870d4c3100747ca3e Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 18 May 2026 15:37:56 +0200 Subject: [PATCH 19/19] fuzz: lock fundings through queued delivery Use the normal message queue while keeping setup assertions strict. --- fuzz/src/chanmon_consistency.rs | 120 +++++++++++++++++--------------- 1 file changed, 64 insertions(+), 56 deletions(-) diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index 8a7e33e762c..a20c804506c 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -1243,6 +1243,15 @@ fn directed_msg_destination(event: &MessageSendEvent) -> Option { } } +fn is_funding_lock_msg_event(event: &MessageSendEvent) -> bool { + matches!( + event, + MessageSendEvent::SendChannelReady { .. } + | MessageSendEvent::SendAnnouncementSignatures { .. } + | MessageSendEvent::BroadcastChannelAnnouncement { .. } + ) +} + impl EventQueues { fn new() -> Self { Self { ab: Vec::new(), ba: Vec::new(), bc: Vec::new(), cb: Vec::new() } @@ -1894,6 +1903,7 @@ fn build_node_config(chan_type: ChanType) -> UserConfig { let mut config = UserConfig::default(); config.channel_config.forwarding_fee_proportional_millionths = 0; config.channel_handshake_config.announce_for_forwarding = true; + config.channel_handshake_limits.force_announced_channel_preference = false; config.reject_inbound_splices = false; match chan_type { ChanType::Legacy => { @@ -1935,7 +1945,7 @@ fn connect_peers(source: &ChanMan<'_>, dest: &ChanMan<'_>) { fn make_channel( nodes: &mut [HarnessNode<'_>; 3], source_idx: usize, dest_idx: usize, chan_id: i32, trusted_open: bool, trusted_accept: bool, chain_state: &mut ChainState, -) { +) -> ChannelId { assert!(source_idx < dest_idx); let (left, right) = nodes.split_at_mut(dest_idx); let (source, dest) = (&mut left[source_idx], &mut right[0]); @@ -2088,36 +2098,7 @@ fn make_channel( } else { panic!("Wrong event type"); } -} - -fn lock_fundings(nodes: &[HarnessNode<'_>; 3]) { - let mut node_events = Vec::new(); - for node in nodes.iter() { - node_events.push(node.get_and_clear_pending_msg_events()); - } - for (idx, node_event) in node_events.iter().enumerate() { - for event in node_event { - if let MessageSendEvent::SendChannelReady { ref node_id, ref msg } = event { - for node in nodes.iter() { - if node.get_our_node_id() == *node_id { - node.handle_channel_ready(nodes[idx].get_our_node_id(), msg); - } - } - } else { - panic!("Wrong event type"); - } - } - } - - for node in nodes.iter() { - let events = node.get_and_clear_pending_msg_events(); - for event in events { - if let MessageSendEvent::SendAnnouncementSignatures { .. } = event { - } else { - panic!("Wrong event type"); - } - } - } + channel_id } impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { @@ -2226,14 +2207,18 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { // channel gets its own txid and funding outpoint. // A-B: channel 2 A and B have 0-reserve (trusted open + trusted accept), // channel 3 A has 0-reserve (trusted accept). - make_channel(&mut nodes, 0, 1, 1, false, false, &mut chain_state); - make_channel(&mut nodes, 0, 1, 2, true, true, &mut chain_state); - make_channel(&mut nodes, 0, 1, 3, false, true, &mut chain_state); + let chan_ab_ids = [ + make_channel(&mut nodes, 0, 1, 1, false, false, &mut chain_state), + make_channel(&mut nodes, 0, 1, 2, true, true, &mut chain_state), + make_channel(&mut nodes, 0, 1, 3, false, true, &mut chain_state), + ]; // B-C: channel 4 B has 0-reserve (via trusted accept), // channel 5 C has 0-reserve (via trusted open). - make_channel(&mut nodes, 1, 2, 4, false, true, &mut chain_state); - make_channel(&mut nodes, 1, 2, 5, true, false, &mut chain_state); - make_channel(&mut nodes, 1, 2, 6, false, false, &mut chain_state); + let chan_bc_ids = [ + make_channel(&mut nodes, 1, 2, 4, false, true, &mut chain_state), + make_channel(&mut nodes, 1, 2, 5, true, false, &mut chain_state), + make_channel(&mut nodes, 1, 2, 6, false, false, &mut chain_state), + ]; // Wipe the transactions-broadcasted set to make sure we don't broadcast // any transactions during normal operation after setup. @@ -2246,24 +2231,7 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { nodes[1].sync_with_chain_state(&chain_state, None); nodes[2].sync_with_chain_state(&chain_state, None); - lock_fundings(&nodes); - - let chan_ab_ids = { - // Get channel IDs for all A-B channels (from node A's perspective). - let node_a_chans = nodes[0].list_usable_channels(); - [node_a_chans[0].channel_id, node_a_chans[1].channel_id, node_a_chans[2].channel_id] - }; - let chan_bc_ids = { - // Get channel IDs for all B-C channels (from node C's perspective). - let node_c_chans = nodes[2].list_usable_channels(); - [node_c_chans[0].channel_id, node_c_chans[1].channel_id, node_c_chans[2].channel_id] - }; - - for node in &mut nodes { - node.force_checkpoint_manager_persistence(); - } - - Self { + let mut harness = Self { out, chan_type, chain_state, @@ -2272,7 +2240,33 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { bc_link: PeerLink::new(1, 2, chan_bc_ids), queues: EventQueues::new(), payments: PaymentTracker::new(), + }; + + harness.lock_fundings(); + + for node in &mut harness.nodes { + node.force_checkpoint_manager_persistence(); } + + harness + } + + fn lock_fundings(&mut self) { + for _ in 0..10 { + let mut had_events = false; + for node_idx in 0..self.nodes.len() { + had_events |= self.process_msg_events_matching( + node_idx, + false, + ProcessMessages::AllMessages, + Some(is_funding_lock_msg_event), + ); + } + if !had_events { + return; + } + } + panic!("lock_fundings did not settle"); } fn chan_a_id(&self) -> ChannelId { @@ -2405,8 +2399,9 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { } } - fn process_msg_events( + fn process_msg_events_matching( &mut self, node_idx: usize, corrupt_forward: bool, limit_events: ProcessMessages, + expected_event: Option bool>, ) -> bool { fn log_msg_delivery( node_idx: usize, dest_idx: usize, msg_name: &str, out: &Out, @@ -2643,6 +2638,13 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { let mut events_iter = events.drain(..).chain(new_events.drain(..)); let mut extra_ev = None; for event in &mut events_iter { + if let Some(expected_event) = expected_event { + assert!( + expected_event(&event), + "Unexpected message event while locking fundings: {:?}", + event + ); + } had_events = true; extra_ev = process_msg_event( node_idx, @@ -2661,6 +2663,12 @@ impl<'a, Out: Output + MaybeSend + MaybeSync> Harness<'a, Out> { had_events } + fn process_msg_events( + &mut self, node_idx: usize, corrupt_forward: bool, limit_events: ProcessMessages, + ) -> bool { + self.process_msg_events_matching(node_idx, corrupt_forward, limit_events, None) + } + fn process_events(&mut self, node_idx: usize, fail: bool) -> bool { let nodes = &self.nodes; let chain_state = &mut self.chain_state;