diff --git a/CHANGELOG.md b/CHANGELOG.md index 058593c02..ea278c112 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,12 @@ -# 0.7.0-rc.39 (Synonym Fork) +# 0.7.0-rc.42 (Synonym Fork) ## Bug Fixes +- Persist missing announced channel peers from the network graph during + build-time restore and retry after Rapid Gossip Sync graph updates. Explicit + disconnects and last-channel closes suppress RGS re-persistence during the + current node instance; after restart, active restored channels may persist + peers again from the graph so they can reconnect. - Fixed orphaned channel migration blocking node startup when the existing monitor in the KV store can't be deserialized (e.g., `UnknownVersion` from a newer LDK version). The migration now skips writing and lets the node start normally, diff --git a/Cargo.toml b/Cargo.toml index a6df12515..f0f1acfcc 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ exclude = ["bindings/uniffi-bindgen"] [package] name = "ldk-node" -version = "0.7.0-rc.39" +version = "0.7.0-rc.42" authors = ["Elias Rohrer "] homepage = "https://lightningdevkit.org/" license = "MIT OR Apache-2.0" diff --git a/Package.swift b/Package.swift index 4e2eb80ea..eca195981 100644 --- a/Package.swift +++ b/Package.swift @@ -3,8 +3,8 @@ import PackageDescription -let tag = "v0.7.0-rc.39" -let checksum = "67fae23a802b1ab48833c29c95cb62a9167024be9dbbe54e6745d6c108f70308" +let tag = "v0.7.0-rc.42" +let checksum = "c2f1d677ae85fe7b1c06670994088abe51a020197a5f2e2b5cd8718856dd624c" let url = "https://github.com/synonymdev/ldk-node/releases/download/\(tag)/LDKNodeFFI.xcframework.zip" let package = Package( diff --git a/bindings/kotlin/ldk-node-android/gradle.properties b/bindings/kotlin/ldk-node-android/gradle.properties index aca9fb76e..8b12b14de 100644 --- a/bindings/kotlin/ldk-node-android/gradle.properties +++ b/bindings/kotlin/ldk-node-android/gradle.properties @@ -3,4 +3,4 @@ android.useAndroidX=true android.enableJetifier=true kotlin.code.style=official group=com.synonym -version=0.7.0-rc.39 +version=0.7.0-rc.42 diff --git a/bindings/kotlin/ldk-node-android/lib/src/main/jniLibs/arm64-v8a/libldk_node.so b/bindings/kotlin/ldk-node-android/lib/src/main/jniLibs/arm64-v8a/libldk_node.so index e3f585686..9905ac92e 100755 Binary files a/bindings/kotlin/ldk-node-android/lib/src/main/jniLibs/arm64-v8a/libldk_node.so and b/bindings/kotlin/ldk-node-android/lib/src/main/jniLibs/arm64-v8a/libldk_node.so differ diff --git a/bindings/kotlin/ldk-node-android/lib/src/main/jniLibs/armeabi-v7a/libldk_node.so b/bindings/kotlin/ldk-node-android/lib/src/main/jniLibs/armeabi-v7a/libldk_node.so index 8ce45fffc..a0c23aa3e 100755 Binary files a/bindings/kotlin/ldk-node-android/lib/src/main/jniLibs/armeabi-v7a/libldk_node.so and b/bindings/kotlin/ldk-node-android/lib/src/main/jniLibs/armeabi-v7a/libldk_node.so differ diff --git a/bindings/kotlin/ldk-node-android/lib/src/main/jniLibs/x86_64/libldk_node.so b/bindings/kotlin/ldk-node-android/lib/src/main/jniLibs/x86_64/libldk_node.so index d62aa6e73..271afdfc1 100755 Binary files a/bindings/kotlin/ldk-node-android/lib/src/main/jniLibs/x86_64/libldk_node.so and b/bindings/kotlin/ldk-node-android/lib/src/main/jniLibs/x86_64/libldk_node.so differ diff --git a/bindings/kotlin/ldk-node-jvm/gradle.properties b/bindings/kotlin/ldk-node-jvm/gradle.properties index 7a235472f..83f2602c8 100644 --- a/bindings/kotlin/ldk-node-jvm/gradle.properties +++ b/bindings/kotlin/ldk-node-jvm/gradle.properties @@ -1,4 +1,4 @@ org.gradle.jvmargs=-Xmx1536m kotlin.code.style=official group=com.synonym -version=0.7.0-rc.39 +version=0.7.0-rc.42 diff --git a/bindings/kotlin/ldk-node-jvm/lib/src/main/resources/darwin-aarch64/libldk_node.dylib b/bindings/kotlin/ldk-node-jvm/lib/src/main/resources/darwin-aarch64/libldk_node.dylib index abb7ce2f8..6525f9f2c 100644 Binary files a/bindings/kotlin/ldk-node-jvm/lib/src/main/resources/darwin-aarch64/libldk_node.dylib and b/bindings/kotlin/ldk-node-jvm/lib/src/main/resources/darwin-aarch64/libldk_node.dylib differ diff --git a/bindings/kotlin/ldk-node-jvm/lib/src/main/resources/darwin-x86-64/libldk_node.dylib b/bindings/kotlin/ldk-node-jvm/lib/src/main/resources/darwin-x86-64/libldk_node.dylib index e43411107..a57a0ae1c 100644 Binary files a/bindings/kotlin/ldk-node-jvm/lib/src/main/resources/darwin-x86-64/libldk_node.dylib and b/bindings/kotlin/ldk-node-jvm/lib/src/main/resources/darwin-x86-64/libldk_node.dylib differ diff --git a/bindings/python/pyproject.toml b/bindings/python/pyproject.toml index a89f8d7e7..2980613c3 100644 --- a/bindings/python/pyproject.toml +++ b/bindings/python/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "ldk_node" -version = "0.7.0-rc.39" +version = "0.7.0-rc.42" authors = [ { name="Elias Rohrer", email="dev@tnull.de" }, ] diff --git a/src/builder.rs b/src/builder.rs index 00269e04b..fe4c259ee 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -5,7 +5,7 @@ // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in // accordance with one or both of these licenses. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::convert::TryInto; use std::default::Default; use std::path::PathBuf; @@ -77,7 +77,7 @@ use crate::liquidity::{ use crate::logger::{log_error, LdkLogger, LogLevel, LogWriter, Logger}; use crate::message_handler::NodeCustomMessageHandler; use crate::payment::asynchronous::om_mailbox::OnionMessageMailbox; -use crate::peer_store::PeerStore; +use crate::peer_store::{persist_missing_channel_peers, PeerStore}; use crate::runtime::Runtime; use crate::tx_broadcaster::TransactionBroadcaster; use crate::types::{ @@ -2347,6 +2347,13 @@ fn build_with_store_internal( }, }; + persist_missing_channel_peers( + channel_manager.list_channels().into_iter().map(|channel| channel.counterparty.node_id), + &network_graph, + &peer_store, + Arc::clone(&logger), + ); + let om_mailbox = if let Some(AsyncPaymentsRole::Server) = async_payments_role { Some(Arc::new(OnionMessageMailbox::new())) } else { @@ -2385,6 +2392,7 @@ fn build_with_store_internal( _router: router, scorer, peer_store, + rgs_peer_recovery_exclusions: Arc::new(RwLock::new(HashSet::new())), payment_store, is_running, node_metrics, diff --git a/src/event.rs b/src/event.rs index 940a2f029..f30072482 100644 --- a/src/event.rs +++ b/src/event.rs @@ -47,11 +47,11 @@ use crate::payment::asynchronous::static_invoice_store::StaticInvoiceStore; use crate::payment::store::{ PaymentDetails, PaymentDetailsUpdate, PaymentDirection, PaymentKind, PaymentStatus, }; +use crate::peer_store::persist_missing_channel_peers; use crate::runtime::Runtime; use crate::types::{CustomTlvRecord, DynStore, OnionMessenger, PaymentStore, Sweeper, Wallet}; use crate::{ - hex_utils, BumpTransactionEventHandler, ChannelManager, Error, Graph, PeerInfo, PeerStore, - UserChannelId, + hex_utils, BumpTransactionEventHandler, ChannelManager, Error, Graph, PeerStore, UserChannelId, }; /// Details about a transaction input. @@ -2122,35 +2122,18 @@ where }, }; - let network_graph = self.network_graph.read_only(); let channels = self.channel_manager.list_channels_with_counterparty(&counterparty_node_id); if let Some(pending_channel) = channels.into_iter().find(|c| c.channel_id == channel_id) { - if !pending_channel.is_outbound - && self.peer_store.get_peer(&counterparty_node_id).is_none() - { - if let Some(address) = network_graph - .nodes() - .get(&NodeId::from_pubkey(&counterparty_node_id)) - .and_then(|node_info| node_info.announcement_info.as_ref()) - .and_then(|ann_info| ann_info.addresses().first()) - { - let peer = PeerInfo { - node_id: counterparty_node_id, - address: address.clone(), - }; - - self.peer_store.add_peer(peer).unwrap_or_else(|e| { - log_error!( - self.logger, - "Failed to add peer {} to peer store: {}", - counterparty_node_id, - e - ); - }); - } + if !pending_channel.is_outbound { + persist_missing_channel_peers( + std::iter::once(counterparty_node_id), + &self.network_graph, + &self.peer_store, + self.logger.clone(), + ); } } }, diff --git a/src/lib.rs b/src/lib.rs index 212e3660d..0f2d12f90 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -103,7 +103,7 @@ mod tx_broadcaster; mod types; mod wallet; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::default::Default; use std::net::ToSocketAddrs; use std::ops::Deref; @@ -162,7 +162,7 @@ use payment::{ Bolt11Payment, Bolt12Payment, OnchainPayment, PaymentDetails, SpontaneousPayment, UnifiedQrPayment, }; -use peer_store::{PeerInfo, PeerStore}; +use peer_store::{persist_missing_channel_peers_excluding, PeerInfo, PeerStore}; pub use probe_handle::ProbeHandle; use rand::Rng; use runtime::Runtime; @@ -214,6 +214,7 @@ pub struct Node { _router: Arc, scorer: Arc>, peer_store: Arc>>, + rgs_peer_recovery_exclusions: Arc>>, payment_store: Arc, is_running: Arc>, node_metrics: Arc>, @@ -274,6 +275,10 @@ impl Node { let gossip_sync_store = Arc::clone(&self.kv_store); let gossip_sync_logger = Arc::clone(&self.logger); let gossip_node_metrics = Arc::clone(&self.node_metrics); + let gossip_channel_manager = Arc::clone(&self.channel_manager); + let gossip_network_graph = Arc::clone(&self.network_graph); + let gossip_peer_store = Arc::clone(&self.peer_store); + let gossip_peer_recovery_exclusions = Arc::clone(&self.rgs_peer_recovery_exclusions); let mut stop_gossip_sync = self.stop_sender.subscribe(); self.runtime.spawn_cancellable_background_task(async move { let mut interval = tokio::time::interval(RGS_SYNC_INTERVAL); @@ -295,6 +300,18 @@ impl Node { "Background sync of RGS gossip data finished in {}ms.", now.elapsed().as_millis() ); + let peer_recovery_exclusions = + gossip_peer_recovery_exclusions.read().unwrap(); + persist_missing_channel_peers_excluding( + gossip_channel_manager + .list_channels() + .into_iter() + .map(|channel| channel.counterparty.node_id), + &gossip_network_graph, + &gossip_peer_store, + &peer_recovery_exclusions, + Arc::clone(&gossip_sync_logger), + ); { let mut locked_node_metrics = gossip_node_metrics.write().unwrap(); locked_node_metrics.latest_rgs_snapshot_timestamp = Some(updated_timestamp); @@ -1372,6 +1389,7 @@ impl Node { // races with an in-flight reconnection loop attempt at the old address. if persist { self.peer_store.add_peer(peer_info.clone())?; + self.rgs_peer_recovery_exclusions.write().unwrap().remove(&node_id); } let con_node_id = peer_info.node_id; @@ -1391,8 +1409,13 @@ impl Node { /// Disconnects the peer with the given node id. /// - /// Will also remove the peer from the peer store, i.e., after this has been called we won't - /// try to reconnect on restart. + /// Will also remove the peer from the peer store, i.e., the stored peer entry won't be used + /// by the reconnect loop. + /// + /// If an active channel with this peer is later restored and the peer has an announced address + /// in the network graph, startup may persist the peer again so the channel can reconnect. + /// Background RGS retries will not re-persist the peer during this node instance unless the + /// peer is explicitly connected with persistence enabled or a new channel is opened. pub fn disconnect(&self, counterparty_node_id: PublicKey) -> Result<(), Error> { if !*self.is_running.read().unwrap() { return Err(Error::NotRunning); @@ -1400,6 +1423,7 @@ impl Node { log_info!(self.logger, "Disconnecting peer {}..", counterparty_node_id); + self.rgs_peer_recovery_exclusions.write().unwrap().insert(counterparty_node_id); match self.peer_store.remove_peer(&counterparty_node_id) { Ok(()) => {}, Err(e) => { @@ -1465,6 +1489,7 @@ impl Node { peer_info.node_id ); self.peer_store.add_peer(peer_info)?; + self.rgs_peer_recovery_exclusions.write().unwrap().remove(&node_id); Ok(UserChannelId(user_channel_id)) }, Err(e) => { @@ -1885,6 +1910,7 @@ impl Node { // Check if this was the last open channel, if so, forget the peer. if open_channels.len() == 1 { + self.rgs_peer_recovery_exclusions.write().unwrap().insert(counterparty_node_id); self.peer_store.remove_peer(&counterparty_node_id)?; } } diff --git a/src/peer_store.rs b/src/peer_store.rs index fc0ea090b..0c2131a49 100644 --- a/src/peer_store.rs +++ b/src/peer_store.rs @@ -5,12 +5,13 @@ // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in // accordance with one or both of these licenses. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::ops::Deref; use std::sync::{Arc, RwLock}; use bitcoin::secp256k1::PublicKey; use lightning::impl_writeable_tlv_based; +use lightning::routing::gossip::NodeId; use lightning::util::persist::KVStoreSync; use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer}; @@ -19,7 +20,7 @@ use crate::io::{ PEER_INFO_PERSISTENCE_SECONDARY_NAMESPACE, }; use crate::logger::{log_error, log_info, LdkLogger}; -use crate::types::DynStore; +use crate::types::{DynStore, Graph}; use crate::{Error, SocketAddress}; pub struct PeerStore @@ -56,8 +57,11 @@ where ); } - locked_peers.insert(peer_info.node_id, peer_info); - self.persist_peers(&*locked_peers) + let mut updated_peers = locked_peers.clone(); + updated_peers.insert(peer_info.node_id, peer_info); + self.persist_peers(&updated_peers)?; + *locked_peers = updated_peers; + Ok(()) } pub(crate) fn remove_peer(&self, node_id: &PublicKey) -> Result<(), Error> { @@ -156,15 +160,90 @@ impl_writeable_tlv_based!(PeerInfo, { (2, address, required), }); +pub(crate) fn persist_missing_channel_peers( + counterparty_node_ids: I, network_graph: &Graph, peer_store: &PeerStore, logger: L, +) where + L: Deref, + L::Target: LdkLogger, + I: IntoIterator, +{ + persist_missing_channel_peers_excluding( + counterparty_node_ids, + network_graph, + peer_store, + &HashSet::new(), + logger, + ) +} + +pub(crate) fn persist_missing_channel_peers_excluding( + counterparty_node_ids: I, network_graph: &Graph, peer_store: &PeerStore, + excluded_node_ids: &HashSet, logger: L, +) where + L: Deref, + L::Target: LdkLogger, + I: IntoIterator, +{ + let graph = network_graph.read_only(); + let mut seen = HashSet::new(); + let missing_peers = counterparty_node_ids + .into_iter() + .filter_map(|counterparty_node_id| { + if !seen.insert(counterparty_node_id) + || excluded_node_ids.contains(&counterparty_node_id) + || peer_store.get_peer(&counterparty_node_id).is_some() + { + return None; + } + + graph + .nodes() + .get(&NodeId::from_pubkey(&counterparty_node_id)) + .and_then(|node_info| node_info.announcement_info.as_ref()) + .and_then(|announcement_info| announcement_info.addresses().first()) + .cloned() + .map(|address| PeerInfo { node_id: counterparty_node_id, address }) + }) + .collect::>(); + drop(graph); + + for peer_info in missing_peers { + let node_id = peer_info.node_id; + if peer_store.get_peer(&node_id).is_some() { + continue; + } + + match peer_store.add_peer(peer_info) { + Ok(()) => { + log_info!(logger, "Persisted peer {} from channel counterparty", node_id) + }, + Err(e) => { + log_error!(logger, "Failed to persist peer {}: {}", node_id, e) + }, + } + } +} + #[cfg(test)] mod tests { + use std::collections::HashMap; + use std::future::Future; + use std::pin::Pin; use std::str::FromStr; - use std::sync::Arc; - + use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::{Arc, Mutex}; + + use bitcoin::Network; + use lightning::io; + use lightning::ln::msgs::UnsignedNodeAnnouncement; + use lightning::routing::gossip::{NodeAlias, NodeId}; + use lightning::types::features::{ChannelFeatures, NodeFeatures}; + use lightning::util::persist::KVStore; use lightning::util::test_utils::TestLogger; use super::*; use crate::io::test_utils::InMemoryStore; + use crate::logger::Logger; #[test] fn peer_info_persistence() { @@ -254,4 +333,315 @@ mod tests { peer_store.add_peer(PeerInfo { node_id, address }).unwrap(); assert_eq!(peer_store.list_peers().len(), 1); } + + #[test] + fn peer_add_persistence_failure_leaves_peer_retryable() { + let store: Arc = Arc::new(FailFirstWriteStore::new()); + let logger = Arc::new(TestLogger::new()); + let peer_store = PeerStore::new(Arc::clone(&store), Arc::clone(&logger)); + + let node_id = PublicKey::from_str( + "0276607124ebe6a6c9338517b6f485825b27c2dcc0b9fc2aa6a4c0df91194e5993", + ) + .unwrap(); + let address = SocketAddress::from_str("127.0.0.1:9738").unwrap(); + let peer_info = PeerInfo { node_id, address }; + + assert!(matches!(peer_store.add_peer(peer_info.clone()), Err(Error::PersistenceFailed))); + assert!(peer_store.get_peer(&node_id).is_none()); + + peer_store.add_peer(peer_info.clone()).unwrap(); + assert_eq!(peer_store.get_peer(&node_id), Some(peer_info)); + } + + #[test] + fn missing_channel_peer_is_persisted_from_graph() { + let store: Arc = Arc::new(InMemoryStore::new()); + let logger = Arc::new(Logger::new_log_facade()); + let peer_store = PeerStore::new(Arc::clone(&store), Arc::clone(&logger)); + let network_graph = Graph::new(Network::Regtest.into(), Arc::clone(&logger)); + + let node_id = PublicKey::from_str( + "0276607124ebe6a6c9338517b6f485825b27c2dcc0b9fc2aa6a4c0df91194e5993", + ) + .unwrap(); + let other_node_id = PublicKey::from_str( + "02eec7245d6b7d2ccb30380bfbe2a3648cd7a942653f5aa340edcea1f283686619", + ) + .unwrap(); + let address = SocketAddress::from_str("127.0.0.1:9738").unwrap(); + + network_graph + .add_channel_from_partial_announcement( + 42, + None, + 0, + ChannelFeatures::empty(), + NodeId::from_pubkey(&node_id), + NodeId::from_pubkey(&other_node_id), + ) + .unwrap(); + network_graph + .update_node_from_unsigned_announcement(&UnsignedNodeAnnouncement { + features: NodeFeatures::empty(), + timestamp: 1, + node_id: NodeId::from_pubkey(&node_id), + rgb: [0; 3], + alias: NodeAlias([0; 32]), + addresses: vec![address.clone()], + excess_address_data: Vec::new(), + excess_data: Vec::new(), + }) + .unwrap(); + + persist_missing_channel_peers(vec![node_id], &network_graph, &peer_store, logger); + + let peer = peer_store.get_peer(&node_id).unwrap(); + assert_eq!(peer.node_id, node_id); + assert_eq!(peer.address, address); + } + + #[test] + fn missing_channel_peer_without_announced_address_is_skipped() { + let store: Arc = Arc::new(InMemoryStore::new()); + let logger = Arc::new(Logger::new_log_facade()); + let peer_store = PeerStore::new(Arc::clone(&store), Arc::clone(&logger)); + let network_graph = Graph::new(Network::Regtest.into(), Arc::clone(&logger)); + + let node_id = PublicKey::from_str( + "0276607124ebe6a6c9338517b6f485825b27c2dcc0b9fc2aa6a4c0df91194e5993", + ) + .unwrap(); + + persist_missing_channel_peers(vec![node_id], &network_graph, &peer_store, logger); + + assert!(peer_store.get_peer(&node_id).is_none()); + } + + #[test] + fn missing_channel_peer_is_persisted_after_graph_retry() { + let store: Arc = Arc::new(InMemoryStore::new()); + let logger = Arc::new(Logger::new_log_facade()); + let peer_store = PeerStore::new(Arc::clone(&store), Arc::clone(&logger)); + let network_graph = Graph::new(Network::Regtest.into(), Arc::clone(&logger)); + + let node_id = PublicKey::from_str( + "0276607124ebe6a6c9338517b6f485825b27c2dcc0b9fc2aa6a4c0df91194e5993", + ) + .unwrap(); + let other_node_id = PublicKey::from_str( + "02eec7245d6b7d2ccb30380bfbe2a3648cd7a942653f5aa340edcea1f283686619", + ) + .unwrap(); + let address = SocketAddress::from_str("127.0.0.1:9738").unwrap(); + + persist_missing_channel_peers( + vec![node_id], + &network_graph, + &peer_store, + Arc::clone(&logger), + ); + assert!(peer_store.get_peer(&node_id).is_none()); + + network_graph + .add_channel_from_partial_announcement( + 42, + None, + 0, + ChannelFeatures::empty(), + NodeId::from_pubkey(&node_id), + NodeId::from_pubkey(&other_node_id), + ) + .unwrap(); + network_graph + .update_node_from_unsigned_announcement(&UnsignedNodeAnnouncement { + features: NodeFeatures::empty(), + timestamp: 1, + node_id: NodeId::from_pubkey(&node_id), + rgb: [0; 3], + alias: NodeAlias([0; 32]), + addresses: vec![address.clone()], + excess_address_data: Vec::new(), + excess_data: Vec::new(), + }) + .unwrap(); + + persist_missing_channel_peers(vec![node_id], &network_graph, &peer_store, logger); + + let peer = peer_store.get_peer(&node_id).unwrap(); + assert_eq!(peer.node_id, node_id); + assert_eq!(peer.address, address); + } + + #[test] + fn excluded_missing_channel_peer_is_not_persisted_from_graph() { + let store: Arc = Arc::new(InMemoryStore::new()); + let logger = Arc::new(Logger::new_log_facade()); + let peer_store = PeerStore::new(Arc::clone(&store), Arc::clone(&logger)); + let network_graph = Graph::new(Network::Regtest.into(), Arc::clone(&logger)); + + let node_id = PublicKey::from_str( + "0276607124ebe6a6c9338517b6f485825b27c2dcc0b9fc2aa6a4c0df91194e5993", + ) + .unwrap(); + let other_node_id = PublicKey::from_str( + "02eec7245d6b7d2ccb30380bfbe2a3648cd7a942653f5aa340edcea1f283686619", + ) + .unwrap(); + let address = SocketAddress::from_str("127.0.0.1:9738").unwrap(); + let excluded_node_ids = HashSet::from([node_id]); + + network_graph + .add_channel_from_partial_announcement( + 42, + None, + 0, + ChannelFeatures::empty(), + NodeId::from_pubkey(&node_id), + NodeId::from_pubkey(&other_node_id), + ) + .unwrap(); + network_graph + .update_node_from_unsigned_announcement(&UnsignedNodeAnnouncement { + features: NodeFeatures::empty(), + timestamp: 1, + node_id: NodeId::from_pubkey(&node_id), + rgb: [0; 3], + alias: NodeAlias([0; 32]), + addresses: vec![address], + excess_address_data: Vec::new(), + excess_data: Vec::new(), + }) + .unwrap(); + + persist_missing_channel_peers_excluding( + vec![node_id], + &network_graph, + &peer_store, + &excluded_node_ids, + logger, + ); + + assert!(peer_store.get_peer(&node_id).is_none()); + } + + struct FailFirstWriteStore { + persisted_bytes: Mutex>>>, + fail_next_write: AtomicBool, + } + + impl FailFirstWriteStore { + fn new() -> Self { + Self { + persisted_bytes: Mutex::new(HashMap::new()), + fail_next_write: AtomicBool::new(true), + } + } + + fn read_internal( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> io::Result> { + let persisted_lock = self.persisted_bytes.lock().unwrap(); + let prefixed = format!("{}/{}", primary_namespace, secondary_namespace); + persisted_lock + .get(&prefixed) + .and_then(|inner| inner.get(key)) + .cloned() + .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "Key not found")) + } + + fn write_internal( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> io::Result<()> { + if self.fail_next_write.swap(false, Ordering::Relaxed) { + return Err(io::Error::new(io::ErrorKind::Other, "Injected write failure")); + } + + let mut persisted_lock = self.persisted_bytes.lock().unwrap(); + let prefixed = format!("{}/{}", primary_namespace, secondary_namespace); + persisted_lock + .entry(prefixed) + .or_insert_with(HashMap::new) + .insert(key.to_string(), buf); + Ok(()) + } + + fn remove_internal( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> io::Result<()> { + let mut persisted_lock = self.persisted_bytes.lock().unwrap(); + let prefixed = format!("{}/{}", primary_namespace, secondary_namespace); + if let Some(inner) = persisted_lock.get_mut(&prefixed) { + inner.remove(key); + } + Ok(()) + } + + fn list_internal( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> io::Result> { + let persisted_lock = self.persisted_bytes.lock().unwrap(); + let prefixed = format!("{}/{}", primary_namespace, secondary_namespace); + Ok(persisted_lock + .get(&prefixed) + .map(|inner| inner.keys().cloned().collect()) + .unwrap_or_default()) + } + } + + impl KVStore for FailFirstWriteStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> Pin>> + Send + 'static>> { + let res = self.read_internal(primary_namespace, secondary_namespace, key); + Box::pin(async move { res }) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> Pin> + Send + 'static>> { + let res = self.write_internal(primary_namespace, secondary_namespace, key, buf); + Box::pin(async move { res }) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool, + ) -> Pin> + Send + 'static>> { + let res = self.remove_internal(primary_namespace, secondary_namespace, key); + Box::pin(async move { res }) + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> Pin>> + Send + 'static>> { + let res = self.list_internal(primary_namespace, secondary_namespace); + Box::pin(async move { res }) + } + } + + impl KVStoreSync for FailFirstWriteStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> io::Result> { + self.read_internal(primary_namespace, secondary_namespace, key) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> io::Result<()> { + self.write_internal(primary_namespace, secondary_namespace, key, buf) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool, + ) -> io::Result<()> { + self.remove_internal(primary_namespace, secondary_namespace, key) + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> io::Result> { + self.list_internal(primary_namespace, secondary_namespace) + } + } }