From d9dfeb8247632bdc58956c6b04e6f5b18159fc37 Mon Sep 17 00:00:00 2001 From: Enigbe Date: Tue, 7 Apr 2026 11:27:40 +0100 Subject: [PATCH 1/7] Implement tiered storage This commit: Adds `TierStore`, a tiered `KVStore`/`KVStoreSync` implementation that routes node persistence across three storage roles: - a primary store for durable, authoritative data - an optional backup store for a second durable copy of primary-backed data - an optional ephemeral store for rebuildable cached data such as the network graph and scorer TierStore routes ephemeral cache data to the ephemeral store when configured, while durable data remains primary+backup. Reads and lists do not consult the backup store during normal operation. For primary+backup writes and removals, this implementation treats the backup store as part of the persistence success path rather than as a best-effort background mirror. Earlier designs used asynchronous backup queueing to avoid blocking the primary path, but that weakens the durability contract by allowing primary success to be reported before backup persistence has completed. TierStore now issues primary and backup operations together and only returns success once both complete. This gives callers a clearer persistence guarantee when a backup store is configured: acknowledged primary+backup mutations have been attempted against both durable stores. The tradeoff is that dual-store operations are not atomic across stores, so an error may still be returned after one store has already been updated. TierStore also implements `KVStoreSync` in terms of dedicated synchronous helpers that call the wrapped stores' sync interfaces directly. This preserves the inner stores' synchronous semantics instead of routing sync operations through a previously held async runtime. Additionally, adds unit coverage for the current contract, including: - basic read/write/remove/list persistence - routing of ephemeral data away from the primary store - backup participation in the foreground success path for writes and removals --- src/io/mod.rs | 1 + src/io/tier_store.rs | 906 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 907 insertions(+) create mode 100644 src/io/tier_store.rs diff --git a/src/io/mod.rs b/src/io/mod.rs index e080d39f7e..bf6366c45b 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -10,6 +10,7 @@ pub mod sqlite_store; #[cfg(test)] pub(crate) mod test_utils; +pub(crate) mod tier_store; pub(crate) mod utils; pub mod vss_store; diff --git a/src/io/tier_store.rs b/src/io/tier_store.rs new file mode 100644 index 0000000000..13f17862f0 --- /dev/null +++ b/src/io/tier_store.rs @@ -0,0 +1,906 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. +#![allow(dead_code)] // TODO: Temporal warning silencer. Will be removed in later commit. + +use crate::io::utils::check_namespace_key_validity; +use crate::logger::{LdkLogger, Logger}; +use crate::types::DynStore; + +use lightning::util::persist::{ + KVStore, KVStoreSync, NETWORK_GRAPH_PERSISTENCE_KEY, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE, +}; +use lightning::{io, log_error}; + +use std::future::Future; +use std::sync::Arc; + +/// A 3-tiered [`KVStore`]/[`KVStoreSync`] implementation that routes data across +/// storage backends that may be local or remote: +/// - a primary store for durable, authoritative persistence, +/// - an optional backup store that maintains an additional durable copy of +/// primary-backed data, and +/// - an optional ephemeral store for non-critical, rebuildable cached data. +/// +/// When a backup store is configured, writes and removals for primary-backed data +/// are issued to the primary and backup stores concurrently and only succeed once +/// both stores complete successfully. +/// +/// Reads and lists do not consult the backup store during normal operation. +/// Ephemeral data is read from and written to the ephemeral store when configured. +/// +/// Note that dual-store writes and removals are not atomic across the primary and +/// backup stores. If one store succeeds and the other fails, the operation +/// returns an error even though one store may already reflect the change. +pub(crate) struct TierStore { + inner: Arc, +} + +impl TierStore { + pub fn new(primary_store: Arc, logger: Arc) -> Self { + let inner = Arc::new(TierStoreInner::new(primary_store, Arc::clone(&logger))); + + Self { inner } + } + + /// Configures a backup store for primary-backed data. + /// + /// Once set, writes and removals targeting the primary tier succeed only if both + /// the primary and backup stores succeed. The two operations are issued + /// concurrently, and any failure is returned to the caller. + /// + /// Note: dual-store writes/removals are not atomic. An error may be returned + /// after the primary store has already been updated if the backup store fails. + /// + /// The backup store is not consulted for normal reads or lists. + pub fn set_backup_store(&mut self, backup: Arc) { + debug_assert_eq!(Arc::strong_count(&self.inner), 1); + + let inner = Arc::get_mut(&mut self.inner).expect( + "TierStore should not be shared during configuration. No other references should exist", + ); + + inner.backup_store = Some(backup); + } + + /// Configures the ephemeral store for non-critical, rebuildable data. + /// + /// When configured, selected cache-like data is routed to this store instead of + /// the primary store. + pub fn set_ephemeral_store(&mut self, ephemeral: Arc) { + debug_assert_eq!(Arc::strong_count(&self.inner), 1); + + let inner = Arc::get_mut(&mut self.inner).expect( + "TierStore should not be shared during configuration. No other references should exist", + ); + + inner.ephemeral_store = Some(ephemeral); + } +} + +impl KVStore for TierStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> impl Future, io::Error>> + 'static + Send { + let inner = Arc::clone(&self.inner); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + async move { inner.read_internal(primary_namespace, secondary_namespace, key).await } + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> impl Future> + 'static + Send { + let inner = Arc::clone(&self.inner); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + async move { inner.write_internal(primary_namespace, secondary_namespace, key, buf).await } + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> impl Future> + 'static + Send { + let inner = Arc::clone(&self.inner); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + async move { inner.remove_internal(primary_namespace, secondary_namespace, key, lazy).await } + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> impl Future, io::Error>> + 'static + Send { + let inner = Arc::clone(&self.inner); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + + async move { inner.list_internal(primary_namespace, secondary_namespace).await } + } +} + +impl KVStoreSync for TierStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> io::Result> { + self.inner.read_internal_sync( + primary_namespace.to_string(), + secondary_namespace.to_string(), + key.to_string(), + ) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> io::Result<()> { + self.inner.write_internal_sync( + primary_namespace.to_string(), + secondary_namespace.to_string(), + key.to_string(), + buf, + ) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> io::Result<()> { + self.inner.remove_internal_sync( + primary_namespace.to_string(), + secondary_namespace.to_string(), + key.to_string(), + lazy, + ) + } + + fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result> { + self.inner + .list_internal_sync(primary_namespace.to_string(), secondary_namespace.to_string()) + } +} + +struct TierStoreInner { + /// The authoritative store for durable data. + primary_store: Arc, + /// The store used for non-critical, rebuildable cached data. + ephemeral_store: Option>, + /// An optional second durable store for primary-backed data. + backup_store: Option>, + logger: Arc, +} + +impl TierStoreInner { + /// Creates a tier store with the primary data store. + pub fn new(primary_store: Arc, logger: Arc) -> Self { + Self { primary_store, ephemeral_store: None, backup_store: None, logger } + } + + /// Reads from the primary data store. + async fn read_primary( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> io::Result> { + match KVStore::read( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + ) + .await + { + Ok(data) => Ok(data), + Err(e) => { + log_error!( + self.logger, + "Failed to read from primary store for key {}/{}/{}: {}.", + primary_namespace, + secondary_namespace, + key, + e + ); + Err(e) + }, + } + } + + fn read_primary_sync( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> io::Result> { + match KVStoreSync::read( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + ) { + Ok(data) => Ok(data), + Err(e) => { + log_error!( + self.logger, + "Failed to read from primary store for key {}/{}/{}: {}.", + primary_namespace, + secondary_namespace, + key, + e + ); + Err(e) + }, + } + } + + /// Lists keys from the primary data store. + async fn list_primary( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> io::Result> { + match KVStore::list(self.primary_store.as_ref(), primary_namespace, secondary_namespace) + .await + { + Ok(keys) => Ok(keys), + Err(e) => { + log_error!( + self.logger, + "Failed to list from primary store for namespace {}/{}: {}.", + primary_namespace, + secondary_namespace, + e + ); + Err(e) + }, + } + } + + fn list_primary_sync( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> io::Result> { + match KVStoreSync::list(self.primary_store.as_ref(), primary_namespace, secondary_namespace) + { + Ok(keys) => Ok(keys), + Err(e) => { + log_error!( + self.logger, + "Failed to list keys in namespace {}/{} from primary store: {}.", + primary_namespace, + secondary_namespace, + e + ); + Err(e) + }, + } + } + + async fn write_primary_backup_async( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> io::Result<()> { + let primary_fut = KVStore::write( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + buf.clone(), + ); + + if let Some(backup_store) = self.backup_store.as_ref() { + let backup_fut = KVStore::write( + backup_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + buf, + ); + + let (primary_res, backup_res) = tokio::join!(primary_fut, backup_fut); + + self.handle_primary_backup_results( + "write", + primary_namespace, + secondary_namespace, + key, + primary_res, + backup_res, + ) + } else { + primary_fut.await + } + } + + fn write_primary_backup_sync( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> io::Result<()> { + if let Some(backup_store) = self.backup_store.as_ref() { + let primary_res = KVStoreSync::write( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + buf.clone(), + ); + let backup_res = KVStoreSync::write( + backup_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + buf, + ); + + self.handle_primary_backup_results( + "write", + primary_namespace, + secondary_namespace, + key, + primary_res, + backup_res, + ) + } else { + KVStoreSync::write( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + buf, + ) + } + } + + async fn remove_primary_backup_async( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> io::Result<()> { + let primary_fut = KVStore::remove( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + lazy, + ); + + if let Some(backup_store) = self.backup_store.as_ref() { + let backup_fut = KVStore::remove( + backup_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + lazy, + ); + + let (primary_res, backup_res) = tokio::join!(primary_fut, backup_fut); + + self.handle_primary_backup_results( + "removal", + primary_namespace, + secondary_namespace, + key, + primary_res, + backup_res, + ) + } else { + primary_fut.await + } + } + + fn remove_primary_backup_sync( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> io::Result<()> { + if let Some(backup_store) = self.backup_store.as_ref() { + let primary_res = KVStoreSync::remove( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + lazy, + ); + let backup_res = KVStoreSync::remove( + backup_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + lazy, + ); + + self.handle_primary_backup_results( + "removal", + primary_namespace, + secondary_namespace, + key, + primary_res, + backup_res, + ) + } else { + KVStoreSync::remove( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + lazy, + ) + } + } + + async fn read_internal( + &self, primary_namespace: String, secondary_namespace: String, key: String, + ) -> io::Result> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + Some(key.as_str()), + "read", + )?; + + if let Some(eph_store) = + self.ephemeral_store(&primary_namespace, &secondary_namespace, &key) + { + // We don't retry ephemeral-store reads here. Local failures are treated as + // terminal for this access path rather than falling back to another store. + KVStore::read(eph_store.as_ref(), &primary_namespace, &secondary_namespace, &key).await + } else { + self.read_primary(&primary_namespace, &secondary_namespace, &key).await + } + } + + fn read_internal_sync( + &self, primary_namespace: String, secondary_namespace: String, key: String, + ) -> io::Result> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + Some(key.as_str()), + "read", + )?; + + if let Some(eph_store) = + self.ephemeral_store(&primary_namespace, &secondary_namespace, &key) + { + KVStoreSync::read(eph_store.as_ref(), &primary_namespace, &secondary_namespace, &key) + } else { + self.read_primary_sync(&primary_namespace, &secondary_namespace, &key) + } + } + + async fn write_internal( + &self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, + ) -> io::Result<()> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + Some(key.as_str()), + "write", + )?; + + if let Some(eph_store) = + self.ephemeral_store(&primary_namespace, &secondary_namespace, &key) + { + KVStore::write( + eph_store.as_ref(), + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + buf, + ) + .await + } else { + self.write_primary_backup_async( + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + buf, + ) + .await + } + } + + fn write_internal_sync( + &self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, + ) -> io::Result<()> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + Some(key.as_str()), + "write", + )?; + + if let Some(ephemeral_store) = + self.ephemeral_store(&primary_namespace, &secondary_namespace, &key) + { + KVStoreSync::write( + ephemeral_store.as_ref(), + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + buf, + ) + } else { + self.write_primary_backup_sync( + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + buf, + ) + } + } + + async fn remove_internal( + &self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool, + ) -> io::Result<()> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + Some(key.as_str()), + "remove", + )?; + + if let Some(eph_store) = + self.ephemeral_store(&primary_namespace, &secondary_namespace, &key) + { + KVStore::remove( + eph_store.as_ref(), + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + lazy, + ) + .await + } else { + self.remove_primary_backup_async( + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + lazy, + ) + .await + } + } + + fn remove_internal_sync( + &self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool, + ) -> io::Result<()> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + Some(key.as_str()), + "remove", + )?; + + if let Some(ephemeral_store) = + self.ephemeral_store(&primary_namespace, &secondary_namespace, &key) + { + KVStoreSync::remove( + ephemeral_store.as_ref(), + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + lazy, + ) + } else { + self.remove_primary_backup_sync( + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + lazy, + ) + } + } + + async fn list_internal( + &self, primary_namespace: String, secondary_namespace: String, + ) -> io::Result> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + None, + "list", + )?; + + match (primary_namespace.as_str(), secondary_namespace.as_str()) { + ( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + ) + | (SCORER_PERSISTENCE_PRIMARY_NAMESPACE, _) => { + if let Some(eph_store) = self.ephemeral_store.as_ref() { + // We don't retry ephemeral-store lists here. Local failures are treated as + // terminal for this access path rather than falling back to another store. + KVStore::list(eph_store.as_ref(), &primary_namespace, &secondary_namespace) + .await + } else { + self.list_primary(&primary_namespace, &secondary_namespace).await + } + }, + _ => self.list_primary(&primary_namespace, &secondary_namespace).await, + } + } + + fn list_internal_sync( + &self, primary_namespace: String, secondary_namespace: String, + ) -> io::Result> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + None, + "list", + )?; + + match (primary_namespace.as_str(), secondary_namespace.as_str()) { + ( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + ) + | (SCORER_PERSISTENCE_PRIMARY_NAMESPACE, _) => { + if let Some(ephemeral_store) = self.ephemeral_store.as_ref() { + KVStoreSync::list( + ephemeral_store.as_ref(), + &primary_namespace, + &secondary_namespace, + ) + } else { + self.list_primary_sync(&primary_namespace, &secondary_namespace) + } + }, + _ => self.list_primary_sync(&primary_namespace, &secondary_namespace), + } + } + + fn ephemeral_store( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> Option<&Arc> { + self.ephemeral_store + .as_ref() + .filter(|_s| is_ephemeral_cached_key(primary_namespace, secondary_namespace, key)) + } + + fn handle_primary_backup_results( + &self, op: &str, primary_namespace: &str, secondary_namespace: &str, key: &str, + primary_res: io::Result<()>, backup_res: io::Result<()>, + ) -> io::Result<()> { + match (primary_res, backup_res) { + (Ok(()), Ok(())) => Ok(()), + (Err(primary_err), Ok(())) => Err(primary_err), + (Ok(()), Err(backup_err)) => Err(backup_err), + (Err(primary_err), Err(backup_err)) => { + log_error!( + self.logger, + "Primary and backup {}s both failed for key {}/{}/{}: primary={}, backup={}", + op, + primary_namespace, + secondary_namespace, + key, + primary_err, + backup_err + ); + Err(primary_err) + }, + } + } +} + +fn is_ephemeral_cached_key(pn: &str, sn: &str, key: &str) -> bool { + matches!( + (pn, sn, key), + (NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, _, NETWORK_GRAPH_PERSISTENCE_KEY) + | (SCORER_PERSISTENCE_PRIMARY_NAMESPACE, _, SCORER_PERSISTENCE_KEY) + ) +} + +#[cfg(test)] +mod tests { + use std::panic::RefUnwindSafe; + use std::path::PathBuf; + use std::sync::Arc; + + use lightning::util::logger::Level; + use lightning::util::persist::{ + CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + }; + use lightning_persister::fs_store::v1::FilesystemStore; + + use crate::io::test_utils::{do_read_write_remove_list_persist, random_storage_path}; + use crate::io::tier_store::TierStore; + use crate::logger::Logger; + use crate::types::DynStore; + use crate::types::DynStoreWrapper; + + use super::*; + + impl RefUnwindSafe for TierStore {} + + struct CleanupDir(PathBuf); + impl Drop for CleanupDir { + fn drop(&mut self) { + let _ = std::fs::remove_dir_all(&self.0); + } + } + + fn setup_tier_store(primary_store: Arc, logger: Arc) -> TierStore { + TierStore::new(primary_store, logger) + } + + #[test] + fn write_read_list_remove() { + let base_dir = random_storage_path(); + let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); + let logger = Arc::new(Logger::new_fs_writer(log_path, Level::Trace).unwrap()); + + let _cleanup = CleanupDir(base_dir.clone()); + + let primary_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("primary")))); + let tier = setup_tier_store(primary_store, logger); + + do_read_write_remove_list_persist(&tier); + } + + #[test] + fn ephemeral_routing() { + let base_dir = random_storage_path(); + let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); + let logger = Arc::new(Logger::new_fs_writer(log_path, Level::Trace).unwrap()); + + let _cleanup = CleanupDir(base_dir.clone()); + + let primary_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("primary")))); + let mut tier = setup_tier_store(Arc::clone(&primary_store), logger); + + let ephemeral_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("ephemeral")))); + tier.set_ephemeral_store(Arc::clone(&ephemeral_store)); + + let data = vec![42u8; 32]; + + KVStoreSync::write( + &tier, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + data.clone(), + ) + .unwrap(); + + KVStoreSync::write( + &tier, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + data.clone(), + ) + .unwrap(); + + let primary_read_ng = KVStoreSync::read( + &*primary_store, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + ); + let ephemeral_read_ng = KVStoreSync::read( + &*ephemeral_store, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + ); + + let primary_read_cm = KVStoreSync::read( + &*primary_store, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + let ephemeral_read_cm = KVStoreSync::read( + &*ephemeral_store, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + + assert!(primary_read_ng.is_err()); + assert_eq!(ephemeral_read_ng.unwrap(), data); + + assert!(ephemeral_read_cm.is_err()); + assert_eq!(primary_read_cm.unwrap(), data); + } + + #[test] + fn backup_write_is_part_of_success_path() { + let base_dir = random_storage_path(); + let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); + let logger = Arc::new(Logger::new_fs_writer(log_path, Level::Trace).unwrap()); + + let _cleanup = CleanupDir(base_dir.clone()); + + let primary_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("primary")))); + let mut tier = setup_tier_store(Arc::clone(&primary_store), logger); + + let backup_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("backup")))); + tier.set_backup_store(Arc::clone(&backup_store)); + + let data = vec![42u8; 32]; + + KVStoreSync::write( + &tier, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + data.clone(), + ) + .unwrap(); + + let primary_read = KVStoreSync::read( + &*primary_store, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + let backup_read = KVStoreSync::read( + &*backup_store, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + + assert_eq!(primary_read.unwrap(), data); + assert_eq!(backup_read.unwrap(), data); + } + + #[test] + fn backup_remove_is_part_of_success_path() { + let base_dir = random_storage_path(); + let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); + let logger = Arc::new(Logger::new_fs_writer(log_path, Level::Trace).unwrap()); + + let _cleanup = CleanupDir(base_dir.clone()); + + let primary_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("primary")))); + let mut tier = setup_tier_store(Arc::clone(&primary_store), logger); + + let backup_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("backup")))); + tier.set_backup_store(Arc::clone(&backup_store)); + + let data = vec![42u8; 32]; + let key = CHANNEL_MANAGER_PERSISTENCE_KEY; + + KVStoreSync::write( + &tier, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + key, + data, + ) + .unwrap(); + + KVStoreSync::remove( + &tier, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + key, + true, + ) + .unwrap(); + + let primary_read = KVStoreSync::read( + &*primary_store, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + key, + ); + let backup_read = KVStoreSync::read( + &*backup_store, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + key, + ); + + assert!(primary_read.is_err()); + assert!(backup_read.is_err()); + } +} From b017e6799ee861d56240e4efd9d7dcbe9843c904 Mon Sep 17 00:00:00 2001 From: Enigbe Date: Tue, 7 Apr 2026 19:17:22 +0100 Subject: [PATCH 2/7] Integrate TierStore into NodeBuilder Add native builder support for tiered storage by introducing `TierStoreConfig` and builder methods for configuring backup and ephemeral stores. During node construction, wrap the configured primary store in `TierStore` and attach any configured secondary tiers: ephemeral storage for cache-like data and backup storage for mirrored durable writes. --- src/builder.rs | 66 ++++++++++++++++++++++++++++++++++++++++++-- src/io/tier_store.rs | 1 - src/types.rs | 2 +- 3 files changed, 65 insertions(+), 4 deletions(-) diff --git a/src/builder.rs b/src/builder.rs index 54a2f51abc..83a20d7c7a 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -58,6 +58,7 @@ use crate::event::EventQueue; use crate::fee_estimator::OnchainFeeEstimator; use crate::gossip::GossipSource; use crate::io::sqlite_store::SqliteStore; +use crate::io::tier_store::TierStore; use crate::io::utils::{ read_all_objects, read_event_queue, read_external_pathfinding_scores_from_cache, read_network_graph, read_node_metrics, read_output_sweeper, read_peer_info, read_scorer, @@ -154,6 +155,21 @@ impl std::fmt::Debug for LogWriterConfig { } } +#[derive(Default)] +struct TierStoreConfig { + ephemeral: Option>, + backup: Option>, +} + +impl std::fmt::Debug for TierStoreConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TierStoreConfig") + .field("ephemeral", &self.ephemeral.as_ref().map(|_| "Arc")) + .field("backup", &self.backup.as_ref().map(|_| "Arc")) + .finish() + } +} + /// An error encountered during building a [`Node`]. /// /// [`Node`]: crate::Node @@ -289,6 +305,7 @@ pub struct NodeBuilder { liquidity_source_config: Option, log_writer_config: Option, async_payments_role: Option, + tier_store_config: Option, runtime_handle: Option, pathfinding_scores_sync_config: Option, recovery_mode: bool, @@ -307,6 +324,7 @@ impl NodeBuilder { let gossip_source_config = None; let liquidity_source_config = None; let log_writer_config = None; + let tier_store_config = None; let runtime_handle = None; let pathfinding_scores_sync_config = None; let recovery_mode = false; @@ -316,6 +334,7 @@ impl NodeBuilder { gossip_source_config, liquidity_source_config, log_writer_config, + tier_store_config, runtime_handle, async_payments_role: None, pathfinding_scores_sync_config, @@ -625,6 +644,34 @@ impl NodeBuilder { self } + /// Configures the backup store for local disaster recovery. + /// + /// When building with tiered storage, this store receives a second durable + /// copy of data written to the primary store. + /// + /// Writes and removals for primary-backed data only succeed once both the + /// primary and backup stores complete successfully. + /// + /// If not set, durable data will be stored only in the primary store. + pub fn set_backup_store(&mut self, backup_store: Arc) -> &mut Self { + let tier_store_config = self.tier_store_config.get_or_insert(TierStoreConfig::default()); + tier_store_config.backup = Some(backup_store); + self + } + + /// Configures the ephemeral store for non-critical, frequently-accessed data. + /// + /// When building with tiered storage, this store is used for ephemeral data like + /// the network graph and scorer data to reduce latency for reads. Data stored here + /// can be rebuilt if lost. + /// + /// If not set, non-critical data will be stored in the primary store. + pub fn set_ephemeral_store(&mut self, ephemeral_store: Arc) -> &mut Self { + let tier_store_config = self.tier_store_config.get_or_insert(TierStoreConfig::default()); + tier_store_config.ephemeral = Some(ephemeral_store); + self + } + /// Builds a [`Node`] instance with a [`SqliteStore`] backend and according to the options /// previously configured. pub fn build(&self, node_entropy: NodeEntropy) -> Result { @@ -780,11 +827,18 @@ impl NodeBuilder { } /// Builds a [`Node`] instance according to the options previously configured. + /// + /// The provided `kv_store` will be used as the primary storage backend. Optionally, + /// an ephemeral store for frequently-accessed non-critical data (e.g., network graph, scorer) + /// and a backup store for local disaster recovery can be configured via + /// [`set_ephemeral_store`] and [`set_backup_store`]. + /// + /// [`set_ephemeral_store`]: Self::set_ephemeral_store + /// [`set_backup_store`]: Self::set_backup_store pub fn build_with_store( &self, node_entropy: NodeEntropy, kv_store: S, ) -> Result { let logger = setup_logger(&self.log_writer_config, &self.config)?; - self.build_with_store_and_logger(node_entropy, kv_store, logger) } @@ -800,6 +854,14 @@ impl NodeBuilder { })?) }; + let ts_config = self.tier_store_config.as_ref(); + let primary_store = Arc::new(DynStoreWrapper(kv_store)); + let mut tier_store = TierStore::new(primary_store, Arc::clone(&logger)); + if let Some(config) = ts_config { + config.ephemeral.as_ref().map(|s| tier_store.set_ephemeral_store(Arc::clone(s))); + config.backup.as_ref().map(|s| tier_store.set_backup_store(Arc::clone(s))); + } + let seed_bytes = node_entropy.to_seed_bytes(); let config = Arc::new(self.config.clone()); @@ -814,7 +876,7 @@ impl NodeBuilder { seed_bytes, runtime, logger, - Arc::new(DynStoreWrapper(kv_store)), + Arc::new(DynStoreWrapper(tier_store)), ) } } diff --git a/src/io/tier_store.rs b/src/io/tier_store.rs index 13f17862f0..0a85fb6a51 100644 --- a/src/io/tier_store.rs +++ b/src/io/tier_store.rs @@ -4,7 +4,6 @@ // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license , at your option. You may not use this file except in // accordance with one or both of these licenses. -#![allow(dead_code)] // TODO: Temporal warning silencer. Will be removed in later commit. use crate::io::utils::check_namespace_key_validity; use crate::logger::{LdkLogger, Logger}; diff --git a/src/types.rs b/src/types.rs index 06e65fbd0a..aec3967b15 100644 --- a/src/types.rs +++ b/src/types.rs @@ -57,7 +57,7 @@ where { } -pub(crate) trait DynStoreTrait: Send + Sync { +pub trait DynStoreTrait: Send + Sync { fn read_async( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> Pin, bitcoin::io::Error>> + Send + 'static>>; From f0361fd232a333a9a30b133626d71276cd24eef5 Mon Sep 17 00:00:00 2001 From: Enigbe Date: Tue, 5 May 2026 23:09:49 +0100 Subject: [PATCH 3/7] fixup! Integrate TierStore into NodeBuilder Refactor backup storage to local SQLite Replaces the builder's BYO backup-store configuration with a path-based local SQLite backup mirror. The builder now constructs the backup store internally using a dedicated backup database file name and rejects configurations where the backup path conflicts with the primary storage path. Also adds test coverage for full-cycle backup mirroring and same-path rejection, as well as a `setup_node_with_builder` test helper to allow builder customization in integration tests. --- src/builder.rs | 65 +++++++++++++++++++++++------ src/io/sqlite_store/mod.rs | 2 + tests/common/mod.rs | 16 +++++-- tests/integration_tests_rust.rs | 74 +++++++++++++++++++++++++++++++++ 4 files changed, 141 insertions(+), 16 deletions(-) diff --git a/src/builder.rs b/src/builder.rs index 83a20d7c7a..9afb219710 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -158,14 +158,14 @@ impl std::fmt::Debug for LogWriterConfig { #[derive(Default)] struct TierStoreConfig { ephemeral: Option>, - backup: Option>, + backup_storage_dir_path: Option, } impl std::fmt::Debug for TierStoreConfig { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("TierStoreConfig") .field("ephemeral", &self.ephemeral.as_ref().map(|_| "Arc")) - .field("backup", &self.backup.as_ref().map(|_| "Arc")) + .field("backup_storage_dir_path", &self.backup_storage_dir_path) .finish() } } @@ -216,6 +216,11 @@ pub enum BuildError { AsyncPaymentsConfigMismatch, /// An attempt to setup a DNS Resolver failed. DNSResolverSetupFailed, + /// The configured backup storage path conflicts with the primary storage path. + /// + /// Backup storage must use a distinct local directory so that the primary and + /// backup stores do not point to the same SQLite database. + BackupStorePathConflict, } impl fmt::Display for BuildError { @@ -253,6 +258,12 @@ impl fmt::Display for BuildError { Self::DNSResolverSetupFailed => { write!(f, "An attempt to setup a DNS resolver has failed.") }, + Self::BackupStorePathConflict => { + write!( + f, + "The configured backup storage path conflicts with the primary storage path." + ) + }, } } } @@ -644,18 +655,26 @@ impl NodeBuilder { self } - /// Configures the backup store for local disaster recovery. + /// Configures a local SQLite backup store for disaster recovery. /// - /// When building with tiered storage, this store receives a second durable - /// copy of data written to the primary store. + /// When building with tiered storage, a SQLite store will be created at the + /// given directory path using [`SQLITE_BACKUP_DB_FILE_NAME`] as its database + /// file name. It receives a second durable copy of data written to the + /// primary store. /// /// Writes and removals for primary-backed data only succeed once both the - /// primary and backup stores complete successfully. + /// primary and backup SQLite stores complete successfully. + /// + /// The configured path must point to a distinct local directory from the + /// primary storage path. If the backup path equals the primary storage path, + /// building will fail with [`BuildError::BackupStorePathConflict`]. /// /// If not set, durable data will be stored only in the primary store. - pub fn set_backup_store(&mut self, backup_store: Arc) -> &mut Self { + /// + /// [`SQLITE_BACKUP_DB_FILE_NAME`]: crate::io::sqlite_store::SQLITE_BACKUP_DB_FILE_NAME + pub fn set_backup_storage_dir_path(&mut self, backup_storage_dir_path: String) -> &mut Self { let tier_store_config = self.tier_store_config.get_or_insert(TierStoreConfig::default()); - tier_store_config.backup = Some(backup_store); + tier_store_config.backup_storage_dir_path = Some(backup_storage_dir_path.into()); self } @@ -830,11 +849,11 @@ impl NodeBuilder { /// /// The provided `kv_store` will be used as the primary storage backend. Optionally, /// an ephemeral store for frequently-accessed non-critical data (e.g., network graph, scorer) - /// and a backup store for local disaster recovery can be configured via - /// [`set_ephemeral_store`] and [`set_backup_store`]. + /// and a local SQLite backup store for disaster recovery can be configured via + /// [`set_ephemeral_store`] and [`set_backup_storage_dir_path`]. /// /// [`set_ephemeral_store`]: Self::set_ephemeral_store - /// [`set_backup_store`]: Self::set_backup_store + /// [`set_backup_storage_dir_path`]: Self::set_backup_storage_dir_path pub fn build_with_store( &self, node_entropy: NodeEntropy, kv_store: S, ) -> Result { @@ -859,7 +878,29 @@ impl NodeBuilder { let mut tier_store = TierStore::new(primary_store, Arc::clone(&logger)); if let Some(config) = ts_config { config.ephemeral.as_ref().map(|s| tier_store.set_ephemeral_store(Arc::clone(s))); - config.backup.as_ref().map(|s| tier_store.set_backup_store(Arc::clone(s))); + if let Some(backup_storage_dir_path) = config.backup_storage_dir_path.as_ref() { + let primary_storage_dir_path = PathBuf::from(&self.config.storage_dir_path); + if primary_storage_dir_path == *backup_storage_dir_path { + log_error!( + logger, + "Backup storage path must differ from primary storage path: {}", + backup_storage_dir_path.display() + ); + return Err(BuildError::BackupStorePathConflict); + } + + let backup_store = SqliteStore::new( + backup_storage_dir_path.clone(), + Some(io::sqlite_store::SQLITE_BACKUP_DB_FILE_NAME.to_string()), + Some(io::sqlite_store::KV_TABLE_NAME.to_string()), + ) + .map_err(|e| { + log_error!(logger, "Failed to setup backup SQLite store: {}", e); + BuildError::KVStoreSetupFailed + })?; + let backup_store: Arc = Arc::new(DynStoreWrapper(backup_store)); + tier_store.set_backup_store(backup_store); + } } let seed_bytes = node_entropy.to_seed_bytes(); diff --git a/src/io/sqlite_store/mod.rs b/src/io/sqlite_store/mod.rs index 84af03adc5..098765d0df 100644 --- a/src/io/sqlite_store/mod.rs +++ b/src/io/sqlite_store/mod.rs @@ -26,6 +26,8 @@ mod migrations; /// LDK Node's database file name. pub const SQLITE_DB_FILE_NAME: &str = "ldk_node_data.sqlite"; +/// LDK Node's backup database file name. +pub const SQLITE_BACKUP_DB_FILE_NAME: &str = "ldk_node_data_backup.sqlite"; /// LDK Node's table in which we store all data. pub const KV_TABLE_NAME: &str = "ldk_node_data"; diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 00c8808a7b..87208a6d66 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -527,7 +527,17 @@ pub(crate) fn setup_two_nodes_with_store( } pub(crate) fn setup_node(chain_source: &TestChainSource, config: TestConfig) -> TestNode { + setup_node_with_builder(chain_source, config, |_| {}) +} + +pub(crate) fn setup_node_with_builder( + chain_source: &TestChainSource, config: TestConfig, configure_builder: F, +) -> TestNode +where + F: FnOnce(&mut Builder), +{ setup_builder!(builder, config.node_config); + match chain_source { TestChainSource::Esplora(electrsd) => { let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap()); @@ -586,6 +596,8 @@ pub(crate) fn setup_node(chain_source: &TestChainSource, config: TestConfig) -> builder.set_wallet_recovery_mode(); } + configure_builder(&mut builder); + let node = match config.store_type { TestStoreType::TestSyncStore => { let kv_store = TestSyncStore::new(config.node_config.storage_dir_path.into()); @@ -594,10 +606,6 @@ pub(crate) fn setup_node(chain_source: &TestChainSource, config: TestConfig) -> TestStoreType::Sqlite => builder.build(config.node_entropy.into()).unwrap(), }; - if config.recovery_mode { - builder.set_wallet_recovery_mode(); - } - node.start().unwrap(); assert!(node.status().is_running); assert!(node.status().latest_fee_rate_cache_update_timestamp.is_some()); diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index d2c057a164..1b1fc48257 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -30,6 +30,7 @@ use electrsd::corepc_node::Node as BitcoinD; use electrsd::ElectrsD; use ldk_node::config::{AsyncPaymentsRole, EsploraSyncConfig}; use ldk_node::entropy::NodeEntropy; +use ldk_node::io::sqlite_store::SqliteStore; use ldk_node::liquidity::LSPS2ServiceConfig; use ldk_node::payment::{ ConfirmationStatus, PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus, @@ -39,6 +40,7 @@ use ldk_node::{Builder, Event, NodeError}; use lightning::ln::channelmanager::PaymentId; use lightning::routing::gossip::{NodeAlias, NodeId}; use lightning::routing::router::RouteParametersConfig; +use lightning::util::persist::KVStoreSync; use lightning_invoice::{Bolt11InvoiceDescription, Description}; use lightning_types::payment::{PaymentHash, PaymentPreimage}; use log::LevelFilter; @@ -2957,3 +2959,75 @@ async fn splice_in_with_all_balance() { node_a.stop().unwrap(); node_b.stop().unwrap(); } + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn builder_configures_sqlite_backup_store() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = random_chain_source(&bitcoind, &electrsd); + + let mut config_a = random_config(true); + config_a.store_type = TestStoreType::Sqlite; + let primary_dir = config_a.node_config.storage_dir_path.clone(); + let backup_dir = common::random_storage_path(); + let node_a = common::setup_node_with_builder(&chain_source, config_a.clone(), |builder| { + builder.set_backup_storage_dir_path(backup_dir.to_str().unwrap().to_owned()); + }); + + let config_b = random_config(true); + let node_b = setup_node(&chain_source, config_b); + + do_channel_full_cycle( + node_a, + node_b, + &bitcoind.client, + &electrsd.client, + false, + true, + true, + false, + ) + .await; + + let primary_store = SqliteStore::new( + primary_dir.into(), + Some(ldk_node::io::sqlite_store::SQLITE_DB_FILE_NAME.to_string()), + Some(ldk_node::io::sqlite_store::KV_TABLE_NAME.to_string()), + ) + .unwrap(); + + let backup_store = SqliteStore::new( + backup_dir, + Some(ldk_node::io::sqlite_store::SQLITE_BACKUP_DB_FILE_NAME.to_string()), + Some(ldk_node::io::sqlite_store::KV_TABLE_NAME.to_string()), + ) + .unwrap(); + + for (pn, sn, key) in [ + ("bdk_wallet", "", "descriptor"), + ("bdk_wallet", "", "change_descriptor"), + ("bdk_wallet", "", "network"), + ("", "", "node_metrics"), + ("", "", "events"), + ("", "", "peers"), + ] { + let primary = KVStoreSync::read(&primary_store, pn, sn, key).unwrap(); + let backup = KVStoreSync::read(&backup_store, pn, sn, key).unwrap(); + + assert_eq!(backup, primary, "backup mismatch for {pn}/{sn}/{key}"); + } +} + +#[test] +fn sqlite_backup_rejects_primary_storage_path() { + let mut config = random_config(false); + config.store_type = TestStoreType::Sqlite; + + let primary_dir = config.node_config.storage_dir_path.clone(); + + setup_builder!(builder, config.node_config); + builder.set_backup_storage_dir_path(primary_dir); + + let res = builder.build(config.node_entropy.into()); + + assert!(matches!(res, Err(ldk_node::BuildError::BackupStorePathConflict))); +} From 0ae61b757a14e7bcd6ff0785a056bc8865cb0063 Mon Sep 17 00:00:00 2001 From: Enigbe Date: Wed, 6 May 2026 10:48:00 +0100 Subject: [PATCH 4/7] Fix uniffi Builder tiered-storage support in tests - Make setup_builder! use a mutable binding for Builder under uniffi to preserve test helper compatibility for the FFI-backed builder - Add ArcedNodeBuilder forwarding methods set_backup_storage_dir_path and set_ephemeral_store Co-authored-by: Copilot --- src/builder.rs | 32 ++++++++++++++++++++++++++++++++ tests/common/mod.rs | 4 +--- 2 files changed, 33 insertions(+), 3 deletions(-) diff --git a/src/builder.rs b/src/builder.rs index 9afb219710..8637ae334b 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -1212,6 +1212,38 @@ impl ArcedNodeBuilder { self.inner.write().expect("lock").set_wallet_recovery_mode(); } + /// Configures a local SQLite backup store for disaster recovery. + /// + /// When building with tiered storage, a SQLite store will be created at the + /// given directory path using [`SQLITE_BACKUP_DB_FILE_NAME`] as its database + /// file name. It receives a second durable copy of data written to the + /// primary store. + /// + /// Writes and removals for primary-backed data only succeed once both the + /// primary and backup SQLite stores complete successfully. + /// + /// The configured path must point to a distinct local directory from the + /// primary storage path. If the backup path equals the primary storage path, + /// building will fail with [`BuildError::BackupStorePathConflict`]. + /// + /// If not set, durable data will be stored only in the primary store. + /// + /// [`SQLITE_BACKUP_DB_FILE_NAME`]: crate::io::sqlite_store::SQLITE_BACKUP_DB_FILE_NAME + pub fn set_backup_storage_dir_path(&self, backup_storage_dir_path: String) { + self.inner.write().expect("lock").set_backup_storage_dir_path(backup_storage_dir_path); + } + + /// Configures the ephemeral store for non-critical, frequently-accessed data. + /// + /// When building with tiered storage, this store is used for ephemeral data like + /// the network graph and scorer data to reduce latency for reads. Data stored here + /// can be rebuilt if lost. + /// + /// If not set, non-critical data will be stored in the primary store. + pub fn set_ephemeral_store(&self, ephemeral_store: Arc) { + self.inner.write().expect("lock").set_ephemeral_store(ephemeral_store); + } + /// Builds a [`Node`] instance with a [`SqliteStore`] backend and according to the options /// previously configured. pub fn build(&self, node_entropy: Arc) -> Result, BuildError> { diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 87208a6d66..a80b692ed6 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -457,9 +457,7 @@ impl Default for TestConfig { macro_rules! setup_builder { ($builder:ident, $config:expr) => { - #[cfg(feature = "uniffi")] - let $builder = Builder::from_config($config.clone()); - #[cfg(not(feature = "uniffi"))] + #[allow(unused_mut)] let mut $builder = Builder::from_config($config.clone()); }; } From 92575ca9d0ee1f6b076207423861ddda324bde79 Mon Sep 17 00:00:00 2001 From: Enigbe Date: Tue, 7 Apr 2026 21:08:32 +0100 Subject: [PATCH 5/7] Expose tiered storage configuration across FFI Add UniFFI-facing store abstractions and builder APIs so foreign-language callers can configure a custom primary store, an ephemeral store, and a local SQLite backup storage path when constructing nodes. This introduces `FfiDynStoreTrait` as an FFI-safe equivalent of `DynStoreTrait`, along with a Rust-side adapter that bridges foreign store implementations into the internal dynamic store abstraction used by the builder. As part of this change, we: - add UniFFI bindings for custom primary and ephemeral stores, plus the backup storage directory - expose `Builder::set_backup_storage_dir_path`, `Builder::set_ephemeral_store`, and `Builder::build_with_store` on the FFI surface - route FFI-backed builder construction through the native dyn-store path - move FFI I/O-related types into a dedicated module - preserve per-key write ordering across the FFI boundary - route Rust-side synchronous access through the async mutation path so sync and async callers share the same ordering behavior --- bindings/ldk_node.udl | 28 ++ src/builder.rs | 47 ++- src/ffi/io.rs | 626 ++++++++++++++++++++++++++++++++ src/ffi/mod.rs | 4 + src/ffi/types.rs | 16 +- src/lib.rs | 12 +- src/types.rs | 24 +- tests/common/mod.rs | 7 + tests/integration_tests_rust.rs | 8 +- 9 files changed, 743 insertions(+), 29 deletions(-) create mode 100644 src/ffi/io.rs diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index f87c7b2945..994c96567c 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -32,6 +32,28 @@ interface LogWriter { void log(LogRecord record); }; +[Trait, WithForeign] +interface FfiDynStoreTrait { + [Throws=IOError, Async] + bytes read_async(string primary_namespace, string secondary_namespace, string key); + [Throws=IOError, Async] + void write_async(string primary_namespace, string secondary_namespace, string key, bytes buf); + [Throws=IOError, Async] + void remove_async(string primary_namespace, string secondary_namespace, string key, boolean lazy); + [Throws=IOError, Async] + sequence list_async(string primary_namespace, string secondary_namespace); + + [Throws=IOError] + bytes read(string primary_namespace, string secondary_namespace, string key); + [Throws=IOError] + void write(string primary_namespace, string secondary_namespace, string key, bytes buf); + [Throws=IOError] + void remove(string primary_namespace, string secondary_namespace, string key, boolean lazy); + [Throws=IOError] + sequence list(string primary_namespace, string secondary_namespace); +}; + + interface Builder { constructor(); [Name=from_config] @@ -58,6 +80,8 @@ interface Builder { void set_tor_config(TorConfig tor_config); [Throws=BuildError] void set_node_alias(string node_alias); + void set_backup_storage_dir_path(string backup_storage_dir_path); + void set_ephemeral_store(FfiDynStoreTrait ephemeral_store); [Throws=BuildError] void set_async_payments_role(AsyncPaymentsRole? role); void set_wallet_recovery_mode(); @@ -73,6 +97,8 @@ interface Builder { Node build_with_vss_store_and_fixed_headers(NodeEntropy node_entropy, string vss_url, string store_id, record fixed_headers); [Throws=BuildError] Node build_with_vss_store_and_header_provider(NodeEntropy node_entropy, string vss_url, string store_id, VssHeaderProvider header_provider); + [Throws=BuildError] + Node build_with_store(NodeEntropy node_entropy, FfiDynStoreTrait store); }; interface Node { @@ -231,6 +257,8 @@ enum NodeError { "InvalidLnurl", }; +typedef enum IOError; + typedef dictionary NodeStatus; typedef enum BuildError; diff --git a/src/builder.rs b/src/builder.rs index 8637ae334b..cbc6520a32 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -56,6 +56,8 @@ use crate::connection::ConnectionManager; use crate::entropy::NodeEntropy; use crate::event::EventQueue; use crate::fee_estimator::OnchainFeeEstimator; +#[cfg(feature = "uniffi")] +use crate::ffi::{FfiDynStore, FfiDynStoreTrait}; use crate::gossip::GossipSource; use crate::io::sqlite_store::SqliteStore; use crate::io::tier_store::TierStore; @@ -854,6 +856,7 @@ impl NodeBuilder { /// /// [`set_ephemeral_store`]: Self::set_ephemeral_store /// [`set_backup_storage_dir_path`]: Self::set_backup_storage_dir_path + #[cfg(not(feature = "uniffi"))] pub fn build_with_store( &self, node_entropy: NodeEntropy, kv_store: S, ) -> Result { @@ -863,6 +866,21 @@ impl NodeBuilder { fn build_with_store_and_logger( &self, node_entropy: NodeEntropy, kv_store: S, logger: Arc, + ) -> Result { + let primary_store: Arc = Arc::new(DynStoreWrapper(kv_store)); + self.build_with_dynstore_and_logger(node_entropy, primary_store, logger) + } + + #[cfg(feature = "uniffi")] + fn build_with_dynstore( + &self, node_entropy: NodeEntropy, primary_store: Arc, + ) -> Result { + let logger = setup_logger(&self.log_writer_config, &self.config)?; + self.build_with_dynstore_and_logger(node_entropy, primary_store, logger) + } + + fn build_with_dynstore_and_logger( + &self, node_entropy: NodeEntropy, primary_store: Arc, logger: Arc, ) -> Result { let runtime = if let Some(handle) = self.runtime_handle.as_ref() { Arc::new(Runtime::with_handle(handle.clone(), Arc::clone(&logger))) @@ -874,10 +892,11 @@ impl NodeBuilder { }; let ts_config = self.tier_store_config.as_ref(); - let primary_store = Arc::new(DynStoreWrapper(kv_store)); let mut tier_store = TierStore::new(primary_store, Arc::clone(&logger)); + if let Some(config) = ts_config { config.ephemeral.as_ref().map(|s| tier_store.set_ephemeral_store(Arc::clone(s))); + if let Some(backup_storage_dir_path) = config.backup_storage_dir_path.as_ref() { let primary_storage_dir_path = PathBuf::from(&self.config.storage_dir_path); if primary_storage_dir_path == *backup_storage_dir_path { @@ -898,6 +917,7 @@ impl NodeBuilder { log_error!(logger, "Failed to setup backup SQLite store: {}", e); BuildError::KVStoreSetupFailed })?; + let backup_store: Arc = Arc::new(DynStoreWrapper(backup_store)); tier_store.set_backup_store(backup_store); } @@ -905,6 +925,7 @@ impl NodeBuilder { let seed_bytes = node_entropy.to_seed_bytes(); let config = Arc::new(self.config.clone()); + let kv_store: Arc = Arc::new(DynStoreWrapper(tier_store)); build_with_store_internal( config, @@ -917,7 +938,7 @@ impl NodeBuilder { seed_bytes, runtime, logger, - Arc::new(DynStoreWrapper(tier_store)), + kv_store, ) } } @@ -1240,8 +1261,9 @@ impl ArcedNodeBuilder { /// can be rebuilt if lost. /// /// If not set, non-critical data will be stored in the primary store. - pub fn set_ephemeral_store(&self, ephemeral_store: Arc) { - self.inner.write().expect("lock").set_ephemeral_store(ephemeral_store); + pub fn set_ephemeral_store(&self, ephemeral_store: Arc) { + let store: Arc = Arc::new(FfiDynStore::from_store(ephemeral_store)); + self.inner.write().expect("lock").set_ephemeral_store(store); } /// Builds a [`Node`] instance with a [`SqliteStore`] backend and according to the options @@ -1372,12 +1394,19 @@ impl ArcedNodeBuilder { } /// Builds a [`Node`] instance according to the options previously configured. - // Note that the generics here don't actually work for Uniffi, but we don't currently expose - // this so its not needed. - pub fn build_with_store( - &self, node_entropy: Arc, kv_store: S, + /// + /// The provided `kv_store` will be used as the primary storage backend. Optionally, + /// an ephemeral store for frequently-accessed non-critical data (e.g., network graph, scorer) + /// and a backup store for local disaster recovery can be configured via + /// [`set_ephemeral_store`] and [`set_backup_storage_dir_path`]. + /// + /// [`set_ephemeral_store`]: Self::set_ephemeral_store + /// [`set_backup_storage_dir_path`]: Self::set_backup_storage_dir_path + pub fn build_with_store( + &self, node_entropy: Arc, kv_store: Arc, ) -> Result, BuildError> { - self.inner.read().expect("lock").build_with_store(*node_entropy, kv_store).map(Arc::new) + let store: Arc = Arc::new(FfiDynStore::from_store(kv_store)); + self.inner.read().expect("lock").build_with_dynstore(*node_entropy, store).map(Arc::new) } } diff --git a/src/ffi/io.rs b/src/ffi/io.rs new file mode 100644 index 0000000000..d1e30b3099 --- /dev/null +++ b/src/ffi/io.rs @@ -0,0 +1,626 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in + +use std::collections::HashMap; +use std::future::Future; +use std::pin::Pin; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Arc, Mutex, OnceLock}; + +use crate::io::utils::check_namespace_key_validity; +use crate::types::{DynStoreTrait, DynStoreWrapper, SyncAndAsyncKVStore}; + +#[derive(Debug)] +#[cfg_attr(feature = "uniffi", derive(uniffi::Error))] +pub enum IOError { + NotFound, + PermissionDenied, + ConnectionRefused, + ConnectionReset, + ConnectionAborted, + NotConnected, + AddrInUse, + AddrNotAvailable, + BrokenPipe, + AlreadyExists, + WouldBlock, + InvalidInput, + InvalidData, + TimedOut, + WriteZero, + Interrupted, + UnexpectedEof, + Other, +} + +impl From for IOError { + fn from(error: bitcoin::io::Error) -> Self { + match error.kind() { + bitcoin::io::ErrorKind::NotFound => IOError::NotFound, + bitcoin::io::ErrorKind::PermissionDenied => IOError::PermissionDenied, + bitcoin::io::ErrorKind::ConnectionRefused => IOError::ConnectionRefused, + bitcoin::io::ErrorKind::ConnectionReset => IOError::ConnectionReset, + bitcoin::io::ErrorKind::ConnectionAborted => IOError::ConnectionAborted, + bitcoin::io::ErrorKind::NotConnected => IOError::NotConnected, + bitcoin::io::ErrorKind::AddrInUse => IOError::AddrInUse, + bitcoin::io::ErrorKind::AddrNotAvailable => IOError::AddrNotAvailable, + bitcoin::io::ErrorKind::BrokenPipe => IOError::BrokenPipe, + bitcoin::io::ErrorKind::AlreadyExists => IOError::AlreadyExists, + bitcoin::io::ErrorKind::WouldBlock => IOError::WouldBlock, + bitcoin::io::ErrorKind::InvalidInput => IOError::InvalidInput, + bitcoin::io::ErrorKind::InvalidData => IOError::InvalidData, + bitcoin::io::ErrorKind::TimedOut => IOError::TimedOut, + bitcoin::io::ErrorKind::WriteZero => IOError::WriteZero, + bitcoin::io::ErrorKind::Interrupted => IOError::Interrupted, + bitcoin::io::ErrorKind::UnexpectedEof => IOError::UnexpectedEof, + bitcoin::io::ErrorKind::Other => IOError::Other, + } + } +} + +impl From for bitcoin::io::Error { + fn from(error: IOError) -> Self { + match error { + IOError::NotFound => bitcoin::io::ErrorKind::NotFound.into(), + IOError::PermissionDenied => bitcoin::io::ErrorKind::PermissionDenied.into(), + IOError::ConnectionRefused => bitcoin::io::ErrorKind::ConnectionRefused.into(), + IOError::ConnectionReset => bitcoin::io::ErrorKind::ConnectionReset.into(), + IOError::ConnectionAborted => bitcoin::io::ErrorKind::ConnectionAborted.into(), + IOError::NotConnected => bitcoin::io::ErrorKind::NotConnected.into(), + IOError::AddrInUse => bitcoin::io::ErrorKind::AddrInUse.into(), + IOError::AddrNotAvailable => bitcoin::io::ErrorKind::AddrNotAvailable.into(), + IOError::BrokenPipe => bitcoin::io::ErrorKind::BrokenPipe.into(), + IOError::AlreadyExists => bitcoin::io::ErrorKind::AlreadyExists.into(), + IOError::WouldBlock => bitcoin::io::ErrorKind::WouldBlock.into(), + IOError::InvalidInput => bitcoin::io::ErrorKind::InvalidInput.into(), + IOError::InvalidData => bitcoin::io::ErrorKind::InvalidData.into(), + IOError::TimedOut => bitcoin::io::ErrorKind::TimedOut.into(), + IOError::WriteZero => bitcoin::io::ErrorKind::WriteZero.into(), + IOError::Interrupted => bitcoin::io::ErrorKind::Interrupted.into(), + IOError::UnexpectedEof => bitcoin::io::ErrorKind::UnexpectedEof.into(), + IOError::Other => bitcoin::io::ErrorKind::Other.into(), + } + } +} + +impl std::fmt::Display for IOError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + IOError::NotFound => write!(f, "NotFound"), + IOError::PermissionDenied => write!(f, "PermissionDenied"), + IOError::ConnectionRefused => write!(f, "ConnectionRefused"), + IOError::ConnectionReset => write!(f, "ConnectionReset"), + IOError::ConnectionAborted => write!(f, "ConnectionAborted"), + IOError::NotConnected => write!(f, "NotConnected"), + IOError::AddrInUse => write!(f, "AddrInUse"), + IOError::AddrNotAvailable => write!(f, "AddrNotAvailable"), + IOError::BrokenPipe => write!(f, "BrokenPipe"), + IOError::AlreadyExists => write!(f, "AlreadyExists"), + IOError::WouldBlock => write!(f, "WouldBlock"), + IOError::InvalidInput => write!(f, "InvalidInput"), + IOError::InvalidData => write!(f, "InvalidData"), + IOError::TimedOut => write!(f, "TimedOut"), + IOError::WriteZero => write!(f, "WriteZero"), + IOError::Interrupted => write!(f, "Interrupted"), + IOError::UnexpectedEof => write!(f, "UnexpectedEof"), + IOError::Other => write!(f, "Other"), + } + } +} + +/// FFI-safe version of [`DynStoreTrait`]. +/// +/// Foreign implementations must provide both synchronous and asynchronous store +/// methods so they can be used across the Rust and language-binding surfaces. +/// +/// Internally, [`FfiDynStore`] treats the asynchronous write/remove path as the +/// canonical implementation for ordered mutations. Its synchronous methods are +/// bridged through that async path to preserve per-key write ordering guarantees +/// across both sync and async callers. +/// +/// As a result, Rust-side synchronous calls routed through [`FfiDynStore`] may +/// invoke the async methods of a foreign implementation rather than its sync +/// methods directly. +/// +/// Note: Foreign implementations are free to make either the sync or async methods +/// their primary implementation and have the other delegate to it. [`FfiDynStore`] +/// does not assume which side is primary, but its Rust-side sync bridge executes +/// through the async interface. +/// +/// [`DynStoreTrait`]: crate::types::DynStoreTrait +#[async_trait::async_trait] +pub trait FfiDynStoreTrait: Send + Sync { + async fn read_async( + &self, primary_namespace: String, secondary_namespace: String, key: String, + ) -> Result, IOError>; + async fn write_async( + &self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, + ) -> Result<(), IOError>; + async fn remove_async( + &self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool, + ) -> Result<(), IOError>; + async fn list_async( + &self, primary_namespace: String, secondary_namespace: String, + ) -> Result, IOError>; + + fn read( + &self, primary_namespace: String, secondary_namespace: String, key: String, + ) -> Result, IOError>; + fn write( + &self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, + ) -> Result<(), IOError>; + fn remove( + &self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool, + ) -> Result<(), IOError>; + fn list( + &self, primary_namespace: String, secondary_namespace: String, + ) -> Result, IOError>; +} + +/// Bridges a foreign [`FfiDynStoreTrait`] implementation into Rust's +/// [`DynStoreTrait`] while enforcing per-key write ordering. +/// +/// Ordered writes/removals are coordinated within [`FfiDynStore`] so the +/// underlying foreign store does not need to provide its own cross-call ordering +/// guarantees. +/// +/// The async mutation path is canonical. Sync operations are executed by running +/// the corresponding async operation on an internal runtime, which preserves the +/// same ordering guarantees for both sync and async callers. +#[derive(Clone, uniffi::Object)] +pub(crate) struct FfiDynStore { + inner: Arc, + next_write_version: Arc, +} + +#[uniffi::export] +impl FfiDynStore { + #[uniffi::constructor] + pub fn from_store(store: Arc) -> Self { + let inner = Arc::new(FfiDynStoreInner::new(store)); + + Self { inner, next_write_version: Arc::new(AtomicU64::new(1)) } + } +} + +impl FfiDynStore { + fn build_locking_key( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> String { + if primary_namespace.is_empty() { + key.to_owned() + } else { + format!("{}#{}#{}", primary_namespace, secondary_namespace, key) + } + } + + fn get_new_version_and_lock_ref( + &self, locking_key: String, + ) -> (Arc>, u64) { + let version = self.next_write_version.fetch_add(1, Ordering::Relaxed); + if version == u64::MAX { + panic!("FfiDynStore version counter overflowed"); + } + + let inner_lock_ref = self.inner.get_inner_lock_ref(locking_key); + + (inner_lock_ref, version) + } + + /// Runs a synchronous Rust-side store call via FfiDynStore's canonical async path. + /// + /// This keeps sync callers aligned with the same ordered mutation logic used by + /// async callers and avoids blocking directly on async coordination primitives. + /// + /// If already inside a Tokio runtime, the async operation is run from a + /// dedicated OS thread using a shared process-wide runtime. + fn run_sync_via_async(&self, fut: F) -> Result + where + T: Send + 'static, + F: Future> + Send + 'static, + { + let runtime = ffi_dynstore_runtime(); + + // If we are already inside a Tokio runtime, avoid blocking the current Tokio runtime + // thread directly and run the async operation on a dedicated OS thread. + if tokio::runtime::Handle::try_current().is_ok() { + std::thread::spawn(move || runtime.block_on(fut)).join().unwrap_or_else(|_| { + Err(bitcoin::io::Error::new( + bitcoin::io::ErrorKind::Other, + "FfiDynStore sync bridge thread panicked", + )) + }) + } else { + runtime.block_on(fut) + } + } +} + +impl DynStoreTrait for FfiDynStore { + fn read_async( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> Pin, bitcoin::io::Error>> + Send + 'static>> { + let store = Arc::clone(&self.inner); + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + Box::pin(async move { + store + .read_internal_async(primary_namespace, secondary_namespace, key) + .await + .map_err(|e| e.into()) + }) + } + + fn write_async( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> Pin> + Send + 'static>> { + let store = Arc::clone(&self.inner); + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + let locking_key = self.build_locking_key(&primary_namespace, &secondary_namespace, &key); + let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone()); + + Box::pin(async move { + store + .write_internal_async( + inner_lock_ref, + locking_key, + version, + primary_namespace, + secondary_namespace, + key, + buf, + ) + .await + .map_err(|e| e.into()) + }) + } + + fn remove_async( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> Pin> + Send + 'static>> { + let store = Arc::clone(&self.inner); + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + let locking_key = self.build_locking_key(&primary_namespace, &secondary_namespace, &key); + let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone()); + + Box::pin(async move { + store + .remove_internal_async( + inner_lock_ref, + locking_key, + version, + primary_namespace, + secondary_namespace, + key, + lazy, + ) + .await + .map_err(|e| e.into()) + }) + } + + fn list_async( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> Pin, bitcoin::io::Error>> + Send + 'static>> { + let store = Arc::clone(&self.inner); + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + + Box::pin(async move { + store + .list_internal_async(primary_namespace, secondary_namespace) + .await + .map_err(|e| e.into()) + }) + } + + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> Result, bitcoin::io::Error> { + let store = Arc::clone(&self.inner); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + self.run_sync_via_async(async move { + store + .read_internal_async(primary_namespace, secondary_namespace, key) + .await + .map_err(|e| e.into()) + }) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> Result<(), bitcoin::io::Error> { + let store = Arc::clone(&self.inner); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + let locking_key = self.build_locking_key(&primary_namespace, &secondary_namespace, &key); + let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone()); + + self.run_sync_via_async(async move { + store + .write_internal_async( + inner_lock_ref, + locking_key, + version, + primary_namespace, + secondary_namespace, + key, + buf, + ) + .await + .map_err(|e| e.into()) + }) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> Result<(), bitcoin::io::Error> { + let store = Arc::clone(&self.inner); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + let locking_key = self.build_locking_key(&primary_namespace, &secondary_namespace, &key); + let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone()); + + self.run_sync_via_async(async move { + store + .remove_internal_async( + inner_lock_ref, + locking_key, + version, + primary_namespace, + secondary_namespace, + key, + lazy, + ) + .await + .map_err(|e| e.into()) + }) + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> Result, bitcoin::io::Error> { + let store = Arc::clone(&self.inner); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + + self.run_sync_via_async(async move { + store + .list_internal_async(primary_namespace, secondary_namespace) + .await + .map_err(|e| e.into()) + }) + } +} + +struct FfiDynStoreInner { + ffi_store: Arc, + write_version_locks: Mutex>>>, +} + +impl FfiDynStoreInner { + fn new(ffi_store: Arc) -> Self { + Self { ffi_store, write_version_locks: Mutex::new(HashMap::new()) } + } + + fn get_inner_lock_ref(&self, locking_key: String) -> Arc> { + let mut outer_lock = self.write_version_locks.lock().expect("lock"); + Arc::clone(&outer_lock.entry(locking_key).or_default()) + } + + async fn read_internal_async( + &self, primary_namespace: String, secondary_namespace: String, key: String, + ) -> bitcoin::io::Result> { + check_namespace_key_validity(&primary_namespace, &secondary_namespace, Some(&key), "read")?; + self.ffi_store + .read_async(primary_namespace, secondary_namespace, key) + .await + .map_err(|e| e.into()) + } + + async fn write_internal_async( + &self, inner_lock_ref: Arc>, locking_key: String, version: u64, + primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, + ) -> bitcoin::io::Result<()> { + check_namespace_key_validity( + &primary_namespace, + &secondary_namespace, + Some(&key), + "write", + )?; + + let store = Arc::clone(&self.ffi_store); + + self.execute_locked_write(inner_lock_ref, locking_key, version, async move || { + store + .write_async(primary_namespace, secondary_namespace, key, buf) + .await + .map_err(|e| >::into(e))?; + + Ok(()) + }) + .await + } + + async fn remove_internal_async( + &self, inner_lock_ref: Arc>, locking_key: String, version: u64, + primary_namespace: String, secondary_namespace: String, key: String, lazy: bool, + ) -> bitcoin::io::Result<()> { + check_namespace_key_validity( + &primary_namespace, + &secondary_namespace, + Some(&key), + "remove", + )?; + + let store = Arc::clone(&self.ffi_store); + + self.execute_locked_write(inner_lock_ref, locking_key, version, async move || { + store + .remove_async(primary_namespace, secondary_namespace, key, lazy) + .await + .map_err(|e| >::into(e))?; + + Ok(()) + }) + .await + } + + async fn list_internal_async( + &self, primary_namespace: String, secondary_namespace: String, + ) -> bitcoin::io::Result> { + check_namespace_key_validity(&primary_namespace, &secondary_namespace, None, "list")?; + self.ffi_store + .list_async(primary_namespace, secondary_namespace) + .await + .map_err(|e| e.into()) + } + + async fn execute_locked_write< + F: Future>, + FN: FnOnce() -> F, + >( + &self, inner_lock_ref: Arc>, locking_key: String, version: u64, + callback: FN, + ) -> Result<(), bitcoin::io::Error> { + let res = { + let mut last_written_version = inner_lock_ref.lock().await; + + // Check if we already have a newer version written/removed. This is used in async contexts to realize eventual + // consistency. + let is_stale_version = version <= *last_written_version; + + // If the version is not stale, we execute the callback. Otherwise we can and must skip writing. + if is_stale_version { + Ok(()) + } else { + callback().await.map(|_| { + *last_written_version = version; + }) + } + }; + + self.clean_locks(&inner_lock_ref, locking_key); + + res + } + + fn clean_locks(&self, inner_lock_ref: &Arc>, locking_key: String) { + // If there no arcs in use elsewhere, this means that there are no in-flight writes. We can remove the map entry + // to prevent leaking memory. The two arcs that are expected are the one in the map and the one held here in + // inner_lock_ref. The outer lock is obtained first, to avoid a new arc being cloned after we've already + // counted. + let mut outer_lock = self.write_version_locks.lock().expect("lock"); + + let strong_count = Arc::strong_count(&inner_lock_ref); + debug_assert!(strong_count >= 2, "Unexpected FfiDynStore strong count"); + + if strong_count == 2 { + outer_lock.remove(&locking_key); + } + } +} + +// We do this to allow native Rust stores to satisfy the UniFFI-facing store trait +// in tests and internal bridge code. This keeps `DynStoreWrapper` as the single +// adapter for Rust `KVStore`/`KVStoreSync` implementations while letting the UniFFI +// builder paths accept `Arc` without requiring a public +// `FfiDynStore` wrapper at call sites. +#[async_trait::async_trait] +impl FfiDynStoreTrait for DynStoreWrapper { + async fn read_async( + &self, primary_namespace: String, secondary_namespace: String, key: String, + ) -> Result, IOError> { + DynStoreTrait::read_async(self, &primary_namespace, &secondary_namespace, &key) + .await + .map_err(IOError::from) + } + + async fn write_async( + &self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, + ) -> Result<(), IOError> { + DynStoreTrait::write_async(self, &primary_namespace, &secondary_namespace, &key, buf) + .await + .map_err(IOError::from) + } + + async fn remove_async( + &self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool, + ) -> Result<(), IOError> { + DynStoreTrait::remove_async(self, &primary_namespace, &secondary_namespace, &key, lazy) + .await + .map_err(IOError::from) + } + + async fn list_async( + &self, primary_namespace: String, secondary_namespace: String, + ) -> Result, IOError> { + DynStoreTrait::list_async(self, &primary_namespace, &secondary_namespace) + .await + .map_err(IOError::from) + } + + fn read( + &self, primary_namespace: String, secondary_namespace: String, key: String, + ) -> Result, IOError> { + DynStoreTrait::read(self, &primary_namespace, &secondary_namespace, &key) + .map_err(IOError::from) + } + + fn write( + &self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, + ) -> Result<(), IOError> { + DynStoreTrait::write(self, &primary_namespace, &secondary_namespace, &key, buf) + .map_err(IOError::from) + } + + fn remove( + &self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool, + ) -> Result<(), IOError> { + DynStoreTrait::remove(self, &primary_namespace, &secondary_namespace, &key, lazy) + .map_err(IOError::from) + } + + fn list( + &self, primary_namespace: String, secondary_namespace: String, + ) -> Result, IOError> { + DynStoreTrait::list(self, &primary_namespace, &secondary_namespace).map_err(IOError::from) + } +} + +/// Returns the process-wide runtime used to bridge Rust-side synchronous +/// `FfiDynStore` calls through the canonical async store path. +fn ffi_dynstore_runtime() -> &'static tokio::runtime::Runtime { + static RUNTIME: OnceLock = OnceLock::new(); + + RUNTIME.get_or_init(|| { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .worker_threads(2) + .max_blocking_threads(2) + .build() + .expect("Failed to build FfiDynStore runtime") + }) +} diff --git a/src/ffi/mod.rs b/src/ffi/mod.rs index 32464d0445..627667175e 100644 --- a/src/ffi/mod.rs +++ b/src/ffi/mod.rs @@ -4,6 +4,10 @@ // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license , at your option. You may not use this file except in +#[cfg(feature = "uniffi")] +mod io; +#[cfg(feature = "uniffi")] +pub use io::*; #[cfg(feature = "uniffi")] mod types; diff --git a/src/ffi/types.rs b/src/ffi/types.rs index ad293bc3e0..aa7264a9e4 100644 --- a/src/ffi/types.rs +++ b/src/ffi/types.rs @@ -51,6 +51,14 @@ use vss_client::headers::{ VssHeaderProviderError as VssClientHeaderProviderError, }; +use crate::builder::sanitize_alias; +pub use crate::config::{default_config, ElectrumSyncConfig, EsploraSyncConfig, TorConfig}; +pub use crate::entropy::{generate_entropy_mnemonic, NodeEntropy, WordCount}; +use crate::error::Error; +pub use crate::liquidity::LSPS1OrderStatus; +pub use crate::logger::{LogLevel, LogRecord, LogWriter}; +use crate::{hex_utils, SocketAddress, UserChannelId}; + /// Errors around providing headers for each VSS request. #[derive(Debug, uniffi::Error)] pub enum VssHeaderProviderError { @@ -142,14 +150,6 @@ impl VssClientHeaderProvider for VssHeaderProviderAdapter { } } -use crate::builder::sanitize_alias; -pub use crate::config::{default_config, ElectrumSyncConfig, EsploraSyncConfig, TorConfig}; -pub use crate::entropy::{generate_entropy_mnemonic, NodeEntropy, WordCount}; -use crate::error::Error; -pub use crate::liquidity::LSPS1OrderStatus; -pub use crate::logger::{LogLevel, LogRecord, LogWriter}; -use crate::{hex_utils, SocketAddress, UserChannelId}; - uniffi::custom_type!(PublicKey, String, { remote, try_lift: |val| { diff --git a/src/lib.rs b/src/lib.rs index 6d877ae10d..aded689277 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -141,6 +141,8 @@ use event::{EventHandler, EventQueue}; use fee_estimator::{ConfirmationTarget, FeeEstimator, OnchainFeeEstimator}; #[cfg(feature = "uniffi")] use ffi::*; +#[cfg(feature = "uniffi")] +pub use ffi::{FfiDynStoreTrait, IOError}; use gossip::GossipSource; use graph::NetworkGraph; use io::utils::update_and_persist_node_metrics; @@ -173,11 +175,13 @@ use peer_store::{PeerInfo, PeerStore}; use runtime::Runtime; pub use tokio; use types::{ - Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, DynStore, Graph, - HRNResolver, KeysManager, OnionMessenger, PaymentStore, PeerManager, Router, Scorer, Sweeper, - Wallet, + Broadcaster, BumpTransactionEventHandler, ChainMonitor, ChannelManager, Graph, HRNResolver, + KeysManager, OnionMessenger, PaymentStore, PeerManager, Router, Scorer, Sweeper, Wallet, +}; +pub use types::{ + ChannelDetails, CustomTlvRecord, DynStore, DynStoreWrapper, PeerDetails, SyncAndAsyncKVStore, + UserChannelId, }; -pub use types::{ChannelDetails, CustomTlvRecord, PeerDetails, SyncAndAsyncKVStore, UserChannelId}; pub use vss_client; use crate::scoring::setup_background_pathfinding_scores_sync; diff --git a/src/types.rs b/src/types.rs index aec3967b15..c94cbe9469 100644 --- a/src/types.rs +++ b/src/types.rs @@ -57,29 +57,37 @@ where { } +/// Object-safe store trait supporting both async and sync operations. pub trait DynStoreTrait: Send + Sync { + /// Asynchronously reads the value for the given namespace and key. fn read_async( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> Pin, bitcoin::io::Error>> + Send + 'static>>; + /// Asynchronously writes the value for the given namespace and key. fn write_async( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, ) -> Pin> + Send + 'static>>; + /// Asynchronously removes the value for the given namespace and key. fn remove_async( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, ) -> Pin> + Send + 'static>>; + /// Asynchronously lists keys under the given namespace. fn list_async( &self, primary_namespace: &str, secondary_namespace: &str, ) -> Pin, bitcoin::io::Error>> + Send + 'static>>; - + /// Synchronously reads the value for the given namespace and key. fn read( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> Result, bitcoin::io::Error>; + /// Synchronously writes the value for the given namespace and key. fn write( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, ) -> Result<(), bitcoin::io::Error>; + /// Synchronously removes the value for the given namespace and key. fn remove( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, ) -> Result<(), bitcoin::io::Error>; + /// Synchronously lists keys under the given namespace. fn list( &self, primary_namespace: &str, secondary_namespace: &str, ) -> Result, bitcoin::io::Error>; @@ -137,12 +145,13 @@ impl<'a> KVStoreSync for dyn DynStoreTrait + 'a { } } -pub(crate) type DynStore = dyn DynStoreTrait; +/// Type alias for any store that implements DynStoreTrait. +pub type DynStore = dyn DynStoreTrait; -// Newtype wrapper that implements `KVStore` for `Arc`. This is needed because `KVStore` -// methods return `impl Future`, which is not object-safe. `DynStoreTrait` works around this by -// returning `Pin>` instead, and this wrapper bridges the two by delegating -// `KVStore` methods to the corresponding `DynStoreTrait::*_async` methods. +/// Newtype wrapper that implements `KVStore` for `Arc`. This is needed because `KVStore` +/// methods return `impl Future`, which is not object-safe. `DynStoreTrait` works around this by +/// returning `Pin>` instead, and this wrapper bridges the two by delegating +/// `KVStore` methods to the corresponding `DynStoreTrait::*_async` methods. #[derive(Clone)] pub(crate) struct DynStoreRef(pub(crate) Arc); @@ -172,7 +181,8 @@ impl KVStore for DynStoreRef { } } -pub(crate) struct DynStoreWrapper(pub(crate) T); +/// Wrapper that adapts a concrete `SyncAndAsyncKVStore` to `DynStoreTrait`. +pub struct DynStoreWrapper(pub T); impl DynStoreTrait for DynStoreWrapper { fn read_async( diff --git a/tests/common/mod.rs b/tests/common/mod.rs index a80b692ed6..183df72844 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -43,6 +43,8 @@ use ldk_node::config::{ use ldk_node::entropy::{generate_entropy_mnemonic, NodeEntropy}; use ldk_node::io::sqlite_store::SqliteStore; use ldk_node::payment::{PaymentDirection, PaymentKind, PaymentStatus}; +#[cfg(feature = "uniffi")] +use ldk_node::DynStoreWrapper; use ldk_node::{ Builder, ChannelShutdownState, CustomTlvRecord, Event, LightningBalance, Node, NodeError, PendingSweepBalance, UserChannelId, @@ -598,7 +600,12 @@ where let node = match config.store_type { TestStoreType::TestSyncStore => { + #[cfg(not(feature = "uniffi"))] let kv_store = TestSyncStore::new(config.node_config.storage_dir_path.into()); + #[cfg(feature = "uniffi")] + let kv_store = Arc::new(DynStoreWrapper(TestSyncStore::new( + config.node_config.storage_dir_path.into(), + ))); builder.build_with_store(config.node_entropy.into(), kv_store).unwrap() }, TestStoreType::Sqlite => builder.build(config.node_entropy.into()).unwrap(), diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 1b1fc48257..7bd9e5b233 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -36,6 +36,8 @@ use ldk_node::payment::{ ConfirmationStatus, PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus, UnifiedPaymentResult, }; +#[cfg(feature = "uniffi")] +use ldk_node::DynStoreWrapper; use ldk_node::{Builder, Event, NodeError}; use lightning::ln::channelmanager::PaymentId; use lightning::routing::gossip::{NodeAlias, NodeId}; @@ -313,8 +315,12 @@ async fn start_stop_reinit() { let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap()); + #[cfg(not(feature = "uniffi"))] let test_sync_store = TestSyncStore::new(config.node_config.storage_dir_path.clone().into()); - + #[cfg(feature = "uniffi")] + let test_sync_store = Arc::new(DynStoreWrapper(TestSyncStore::new( + config.node_config.storage_dir_path.clone().into(), + ))); let mut sync_config = EsploraSyncConfig::default(); sync_config.background_sync_config = None; setup_builder!(builder, config.node_config); From c9ebfec8ba8a02a29b5700f9fad2848563d089a0 Mon Sep 17 00:00:00 2001 From: Enigbe Date: Tue, 7 Apr 2026 22:02:45 +0100 Subject: [PATCH 6/7] Add Rust integration tests for tiered storage Extend the Rust test harness to support tiered store configurations and add integration coverage for routing data across primary, backup, and ephemeral stores. This introduces tier-store-aware test helpers, including support for configuring separate stores per node, opening the internally-created SQLite backup store for inspection, and reading test stores in both native and UniFFI-enabled builds. Add an integration test covering the tiered-storage channel lifecycle and verifying that: - durable node data is persisted to both the primary and backup stores - ephemeral-routed data is stored in the ephemeral tier - ephemeral data is not mirrored back into the durable tiers --- benches/payments.rs | 1 + tests/common/mod.rs | 148 ++++++++++++++++++++++--- tests/integration_tests_rust.rs | 187 +++++++++++++++++++++++++++----- 3 files changed, 297 insertions(+), 39 deletions(-) diff --git a/benches/payments.rs b/benches/payments.rs index 52769d7949..8ded1399e9 100644 --- a/benches/payments.rs +++ b/benches/payments.rs @@ -127,6 +127,7 @@ fn payment_benchmark(c: &mut Criterion) { true, false, common::TestStoreType::Sqlite, + common::TestStoreType::Sqlite, ); let runtime = diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 183df72844..e33fe55e00 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -43,7 +43,6 @@ use ldk_node::config::{ use ldk_node::entropy::{generate_entropy_mnemonic, NodeEntropy}; use ldk_node::io::sqlite_store::SqliteStore; use ldk_node::payment::{PaymentDirection, PaymentKind, PaymentStatus}; -#[cfg(feature = "uniffi")] use ldk_node::DynStoreWrapper; use ldk_node::{ Builder, ChannelShutdownState, CustomTlvRecord, Event, LightningBalance, Node, NodeError, @@ -414,10 +413,19 @@ pub(crate) enum TestChainSource<'a> { BitcoindRestSync(&'a BitcoinD), } -#[derive(Clone, Copy)] +#[cfg(feature = "uniffi")] +use ldk_node::FfiDynStoreTrait; + +#[cfg(feature = "uniffi")] +type TestDynStore = Arc; +#[cfg(not(feature = "uniffi"))] +type TestDynStore = TestSyncStore; + +#[derive(Clone)] pub(crate) enum TestStoreType { TestSyncStore, Sqlite, + TierStore { primary: TestDynStore, ephemeral: Option }, } impl Default for TestStoreType { @@ -434,6 +442,7 @@ pub(crate) struct TestConfig { pub node_entropy: NodeEntropy, pub async_payments_role: Option, pub recovery_mode: bool, + pub backup_storage_dir_path: Option, } impl Default for TestConfig { @@ -446,6 +455,7 @@ impl Default for TestConfig { let node_entropy = NodeEntropy::from_bip39_mnemonic(mnemonic, None); let async_payments_role = None; let recovery_mode = false; + let backup_storage_dir_path = None; TestConfig { node_config, log_writer, @@ -453,6 +463,7 @@ impl Default for TestConfig { node_entropy, async_payments_role, recovery_mode, + backup_storage_dir_path, } } } @@ -469,6 +480,35 @@ pub(crate) use setup_builder; #[cfg(any(cln_test, lnd_test, eclair_test))] pub(crate) mod scenarios; +// Helper function to create primary and ephemeral tier stores. +pub(crate) fn create_primary_and_ephemeral_stores( + base_path: PathBuf, +) -> (TestDynStore, TestDynStore) { + let primary = TestSyncStore::new(base_path.join("primary")); + let ephemeral = TestSyncStore::new(base_path.join("ephemeral")); + + #[cfg(feature = "uniffi")] + { + ( + Arc::new(DynStoreWrapper(primary)) as Arc, + Arc::new(DynStoreWrapper(ephemeral)) as Arc, + ) + } + #[cfg(not(feature = "uniffi"))] + { + (primary, ephemeral) + } +} + +pub(crate) fn open_test_backup_store(path: PathBuf) -> SqliteStore { + SqliteStore::new( + path, + Some(ldk_node::io::sqlite_store::SQLITE_BACKUP_DB_FILE_NAME.to_string()), + Some(ldk_node::io::sqlite_store::KV_TABLE_NAME.to_string()), + ) + .unwrap() +} + pub(crate) fn setup_two_nodes( chain_source: &TestChainSource, allow_0conf: bool, anchor_channels: bool, anchors_trusted_no_reserve: bool, @@ -479,16 +519,15 @@ pub(crate) fn setup_two_nodes( anchor_channels, anchors_trusted_no_reserve, TestStoreType::TestSyncStore, + TestStoreType::TestSyncStore, ) } -pub(crate) fn setup_two_nodes_with_store( +pub(crate) fn setup_two_nodes_with_config( chain_source: &TestChainSource, allow_0conf: bool, anchor_channels: bool, - anchors_trusted_no_reserve: bool, store_type: TestStoreType, + anchors_trusted_no_reserve: bool, mut config_a: TestConfig, mut config_b: TestConfig, ) -> (TestNode, TestNode) { println!("== Node A =="); - let mut config_a = random_config(anchor_channels); - config_a.store_type = store_type; if cfg!(hrn_tests) { config_a.node_config.hrn_config = @@ -498,8 +537,6 @@ pub(crate) fn setup_two_nodes_with_store( let node_a = setup_node(chain_source, config_a); println!("\n== Node B =="); - let mut config_b = random_config(anchor_channels); - config_b.store_type = store_type; if cfg!(hrn_tests) { config_b.node_config.hrn_config = HumanReadableNamesConfig { @@ -522,10 +559,31 @@ pub(crate) fn setup_two_nodes_with_store( .trusted_peers_no_reserve .push(node_a.node_id()); } + let node_b = setup_node(chain_source, config_b); (node_a, node_b) } +pub(crate) fn setup_two_nodes_with_store( + chain_source: &TestChainSource, allow_0conf: bool, anchor_channels: bool, + anchors_trusted_no_reserve: bool, store_type_a: TestStoreType, store_type_b: TestStoreType, +) -> (TestNode, TestNode) { + let mut config_a = random_config(anchor_channels); + config_a.store_type = store_type_a; + + let mut config_b = random_config(anchor_channels); + config_b.store_type = store_type_b; + + setup_two_nodes_with_config( + chain_source, + allow_0conf, + anchor_channels, + anchors_trusted_no_reserve, + config_a, + config_b, + ) +} + pub(crate) fn setup_node(chain_source: &TestChainSource, config: TestConfig) -> TestNode { setup_node_with_builder(chain_source, config, |_| {}) } @@ -600,15 +658,46 @@ where let node = match config.store_type { TestStoreType::TestSyncStore => { - #[cfg(not(feature = "uniffi"))] let kv_store = TestSyncStore::new(config.node_config.storage_dir_path.into()); + #[cfg(feature = "uniffi")] - let kv_store = Arc::new(DynStoreWrapper(TestSyncStore::new( - config.node_config.storage_dir_path.into(), - ))); - builder.build_with_store(config.node_entropy.into(), kv_store).unwrap() + { + let kv_store: Arc = Arc::new(DynStoreWrapper(kv_store)); + builder.build_with_store(config.node_entropy.into(), kv_store).unwrap() + } + + #[cfg(not(feature = "uniffi"))] + { + builder.build_with_store(config.node_entropy.into(), kv_store).unwrap() + } }, TestStoreType::Sqlite => builder.build(config.node_entropy.into()).unwrap(), + TestStoreType::TierStore { primary, ephemeral } => { + if let Some(backup_storage_dir_path) = config.backup_storage_dir_path { + builder.set_backup_storage_dir_path(backup_storage_dir_path); + } + + if let Some(ephemeral) = ephemeral { + #[cfg(feature = "uniffi")] + { + builder.set_ephemeral_store(ephemeral); + } + #[cfg(not(feature = "uniffi"))] + { + use ldk_node::DynStore; + let store: Arc = Arc::new(DynStoreWrapper(ephemeral)); + builder.set_ephemeral_store(store); + } + } + #[cfg(feature = "uniffi")] + { + builder.build_with_store(config.node_entropy.into(), primary).unwrap() + } + #[cfg(not(feature = "uniffi"))] + { + builder.build_with_store(config.node_entropy, primary).unwrap() + } + }, }; node.start().unwrap(); @@ -1931,3 +2020,36 @@ impl TestSyncStoreInner { self.do_list(primary_namespace, secondary_namespace) } } + +pub fn test_kv_read( + store: &TestDynStore, primary_ns: &str, secondary_ns: &str, key: &str, +) -> Result, bitcoin::io::Error> { + #[cfg(feature = "uniffi")] + { + ldk_node::FfiDynStoreTrait::read( + &**store, + primary_ns.to_string(), + secondary_ns.to_string(), + key.to_string(), + ) + .map_err(Into::into) + } + #[cfg(not(feature = "uniffi"))] + { + KVStoreSync::read(store, primary_ns, secondary_ns, key) + } +} + +pub fn test_kv_list( + store: &TestDynStore, primary_ns: &str, secondary_ns: &str, +) -> Result, bitcoin::io::Error> { + #[cfg(feature = "uniffi")] + { + ldk_node::FfiDynStoreTrait::list(&**store, primary_ns.to_string(), secondary_ns.to_string()) + .map_err(Into::into) + } + #[cfg(not(feature = "uniffi"))] + { + KVStoreSync::list(store, primary_ns, secondary_ns) + } +} diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 7bd9e5b233..bd77c5d413 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -17,14 +17,16 @@ use bitcoin::hashes::Hash; use bitcoin::{Address, Amount, ScriptBuf, Txid}; use common::logging::{init_log_logger, validate_log_entry, MultiNodeLogger, TestLogWriter}; use common::{ - bump_fee_and_broadcast, distribute_funds_unconfirmed, do_channel_full_cycle, - expect_channel_pending_event, expect_channel_ready_event, expect_channel_ready_events, - expect_event, expect_payment_claimable_event, expect_payment_received_event, - expect_payment_successful_event, expect_splice_pending_event, generate_blocks_and_wait, - generate_listening_addresses, open_channel, open_channel_push_amt, open_channel_with_all, - premine_and_distribute_funds, premine_blocks, prepare_rbf, random_chain_source, random_config, - setup_bitcoind_and_electrsd, setup_builder, setup_node, setup_two_nodes, splice_in_with_all, - wait_for_tx, TestChainSource, TestStoreType, TestSyncStore, + bump_fee_and_broadcast, create_primary_and_ephemeral_stores, distribute_funds_unconfirmed, + do_channel_full_cycle, expect_channel_pending_event, expect_channel_ready_event, + expect_channel_ready_events, expect_event, expect_payment_claimable_event, + expect_payment_received_event, expect_payment_successful_event, expect_splice_pending_event, + generate_blocks_and_wait, generate_listening_addresses, open_channel, open_channel_push_amt, + open_channel_with_all, open_test_backup_store, premine_and_distribute_funds, premine_blocks, + prepare_rbf, random_chain_source, random_config, random_storage_path, + setup_bitcoind_and_electrsd, setup_builder, setup_node, setup_two_nodes, + setup_two_nodes_with_config, splice_in_with_all, test_kv_list, test_kv_read, wait_for_tx, + TestChainSource, TestStoreType, TestSyncStore, }; use electrsd::corepc_node::Node as BitcoinD; use electrsd::ElectrsD; @@ -36,13 +38,17 @@ use ldk_node::payment::{ ConfirmationStatus, PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus, UnifiedPaymentResult, }; -#[cfg(feature = "uniffi")] -use ldk_node::DynStoreWrapper; use ldk_node::{Builder, Event, NodeError}; +#[cfg(feature = "uniffi")] +use ldk_node::{DynStoreWrapper, FfiDynStoreTrait}; use lightning::ln::channelmanager::PaymentId; use lightning::routing::gossip::{NodeAlias, NodeId}; use lightning::routing::router::RouteParametersConfig; -use lightning::util::persist::KVStoreSync; +use lightning::util::persist::{ + KVStoreSync, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, +}; use lightning_invoice::{Bolt11InvoiceDescription, Description}; use lightning_types::payment::{PaymentHash, PaymentPreimage}; use log::LevelFilter; @@ -65,6 +71,126 @@ async fn channel_full_cycle() { .await; } +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn channel_full_cycle_tier_store() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = random_chain_source(&bitcoind, &electrsd); + let (primary_a, ephemeral_a) = create_primary_and_ephemeral_stores(random_storage_path()); + let (primary_b, ephemeral_b) = create_primary_and_ephemeral_stores(random_storage_path()); + let backup_storage_dir_path_a = random_storage_path(); + let backup_storage_dir_path_b = random_storage_path(); + + let mut config_a = random_config(true); + config_a.backup_storage_dir_path = Some(backup_storage_dir_path_a.to_str().unwrap().to_owned()); + config_a.store_type = TestStoreType::TierStore { + primary: primary_a.clone(), + ephemeral: Some(ephemeral_a.clone()), + }; + + let mut config_b = random_config(true); + config_b.backup_storage_dir_path = Some(backup_storage_dir_path_b.to_str().unwrap().to_owned()); + config_b.store_type = + TestStoreType::TierStore { primary: primary_b, ephemeral: Some(ephemeral_b) }; + + let (node_a, node_b) = + setup_two_nodes_with_config(&chain_source, false, true, false, config_a, config_b); + + do_channel_full_cycle( + node_a, + node_b, + &bitcoind.client, + &electrsd.client, + false, + true, + true, + false, + ) + .await; + + // Verify primary and backup both contain the same channel manager data. + let primary_channel_manager = test_kv_read( + &(primary_a.clone()), + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ) + .expect("Primary should have channel manager data"); + + let backup_a = open_test_backup_store(backup_storage_dir_path_a); + let backup_channel_manager = KVStoreSync::read( + &backup_a, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ) + .expect("Backup should have channel manager data"); + assert_eq!( + primary_channel_manager, backup_channel_manager, + "Primary and backup should store identical channel manager data" + ); + + // Verify payment data is persisted in both primary and backup stores. + let mut primary_payments = + test_kv_list(&(primary_a.clone()), "payments", "").expect("Primary should list payments"); + assert!( + !primary_payments.is_empty(), + "Primary should have payment entries after the full cycle" + ); + + let mut backup_payments = + KVStoreSync::list(&backup_a, "payments", "").expect("Backup should list payments"); + assert!(!backup_payments.is_empty(), "Backup should have payment entries after the full cycle"); + + primary_payments.sort(); + backup_payments.sort(); + + assert_eq!( + primary_payments, backup_payments, + "Primary and backup should list identical payment entries" + ); + + // Verify ephemeral store does not contain primary-backed critical data. + let ephemeral_channel_manager = test_kv_read( + &(ephemeral_a.clone()), + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + assert!(ephemeral_channel_manager.is_err(), "Ephemeral should not have channel manager data"); + + let ephemeral_payments = test_kv_list(&(ephemeral_a.clone()), "payments", ""); + assert!( + ephemeral_payments.is_err() || ephemeral_payments.unwrap().is_empty(), + "Ephemeral should not have payment data" + ); + + // Verify ephemeral-routed data is stored in the ephemeral store. + let ephemeral_network_graph = test_kv_read( + &(ephemeral_a.clone()), + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + ); + assert!(ephemeral_network_graph.is_ok(), "Ephemeral should have network graph data"); + + // Verify ephemeral-routed data is not mirrored to primary or backup stores. + let primary_network_graph = test_kv_read( + &(primary_a.clone()), + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + ); + assert!(primary_network_graph.is_err(), "Primary should not have ephemeral network graph data"); + + let backup_network_graph = KVStoreSync::read( + &backup_a, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + ); + assert!(backup_network_graph.is_err(), "Backup should not have ephemeral network graph data"); +} + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn channel_full_cycle_force_close() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); @@ -315,19 +441,23 @@ async fn start_stop_reinit() { let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap()); - #[cfg(not(feature = "uniffi"))] let test_sync_store = TestSyncStore::new(config.node_config.storage_dir_path.clone().into()); - #[cfg(feature = "uniffi")] - let test_sync_store = Arc::new(DynStoreWrapper(TestSyncStore::new( - config.node_config.storage_dir_path.clone().into(), - ))); + let mut sync_config = EsploraSyncConfig::default(); sync_config.background_sync_config = None; setup_builder!(builder, config.node_config); builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); - let node = - builder.build_with_store(config.node_entropy.into(), test_sync_store.clone()).unwrap(); + #[cfg(not(feature = "uniffi"))] + let node = builder.build_with_store(config.node_entropy.into(), test_sync_store.clone()).unwrap(); + + #[cfg(feature = "uniffi")] + let node = { + let kv_store: Arc = + Arc::new(DynStoreWrapper(test_sync_store.clone())); + builder.build_with_store(config.node_entropy.into(), kv_store).unwrap() + }; + node.start().unwrap(); let expected_node_id = node.node_id(); @@ -365,8 +495,18 @@ async fn start_stop_reinit() { setup_builder!(builder, config.node_config); builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); - let reinitialized_node = - builder.build_with_store(config.node_entropy.into(), test_sync_store).unwrap(); + let reinitialized_node; + #[cfg(feature = "uniffi")] + { + let test_sync_store: Arc = Arc::new(DynStoreWrapper(test_sync_store)); + reinitialized_node = + builder.build_with_store(config.node_entropy.into(), test_sync_store).unwrap(); + } + #[cfg(not(feature = "uniffi"))] + { + reinitialized_node = + builder.build_with_store(config.node_entropy, test_sync_store).unwrap(); + } reinitialized_node.start().unwrap(); assert_eq!(reinitialized_node.node_id(), expected_node_id); @@ -3001,12 +3141,7 @@ async fn builder_configures_sqlite_backup_store() { ) .unwrap(); - let backup_store = SqliteStore::new( - backup_dir, - Some(ldk_node::io::sqlite_store::SQLITE_BACKUP_DB_FILE_NAME.to_string()), - Some(ldk_node::io::sqlite_store::KV_TABLE_NAME.to_string()), - ) - .unwrap(); + let backup_store = open_test_backup_store(backup_dir.into()); for (pn, sn, key) in [ ("bdk_wallet", "", "descriptor"), From 61484d10614d2838ff997d730d19eb471ad53d24 Mon Sep 17 00:00:00 2001 From: Enigbe Date: Tue, 7 Apr 2026 22:31:30 +0100 Subject: [PATCH 7/7] Add Python coverage for FFI tier-store builders Add a Python in-memory KV store implementation that satisfies the UniFFI store callback interface, including both sync and async accessors. Refactor the Python integration test setup so channel lifecycle tests can share node construction, cleanup, funding, channel open, payment, and close logic. Add a tier-store test that exercises the FFI builder path with Python primary and ephemeral stores while configuring the backup through the builder's SQLite backup path. The test verifies that Python-backed stores remain usable throughout a full channel lifecycle and that ephemeral data is routed to the ephemeral store. --- bindings/python/src/ldk_node/kv_store.py | 110 ++++++ bindings/python/src/ldk_node/test_ldk_node.py | 327 +++++++++++++----- 2 files changed, 344 insertions(+), 93 deletions(-) create mode 100644 bindings/python/src/ldk_node/kv_store.py diff --git a/bindings/python/src/ldk_node/kv_store.py b/bindings/python/src/ldk_node/kv_store.py new file mode 100644 index 0000000000..495b427b19 --- /dev/null +++ b/bindings/python/src/ldk_node/kv_store.py @@ -0,0 +1,110 @@ +import threading + +from abc import ABC, abstractmethod +from typing import List + +from ldk_node import IoError + +class AbstractKvStore(ABC): + @abstractmethod + async def read_async(self, primary_namespace: "str",secondary_namespace: "str",key: "str") -> "List[int]": + pass + + @abstractmethod + async def write_async(self, primary_namespace: "str",secondary_namespace: "str",key: "str",buf: "List[int]") -> None: + pass + + @abstractmethod + async def remove_async(self, primary_namespace: "str",secondary_namespace: "str",key: "str",lazy: "bool") -> None: + pass + + @abstractmethod + async def list_async(self, primary_namespace: "str",secondary_namespace: "str") -> "List[str]": + pass + + @abstractmethod + def read(self, primary_namespace: "str",secondary_namespace: "str",key: "str") -> "List[int]": + pass + + @abstractmethod + def write(self, primary_namespace: "str",secondary_namespace: "str",key: "str",buf: "List[int]") -> None: + pass + + @abstractmethod + def remove(self, primary_namespace: "str",secondary_namespace: "str",key: "str",lazy: "bool") -> None: + pass + + @abstractmethod + def list(self, primary_namespace: "str",secondary_namespace: "str") -> "List[str]": + pass + +class TestKvStore(AbstractKvStore): + def __init__(self, name: str): + self.name = name + # Storage structure: {(primary_ns, secondary_ns): {key: [bytes]}} + self.storage = {} + self._lock = threading.Lock() + + def dump(self): + print(f"\n[{self.name}] Store contents:") + for (primary_ns, secondary_ns), keys_dict in self.storage.items(): + print(f" Namespace: ({primary_ns!r}, {secondary_ns!r})") + for key, data in keys_dict.items(): + print(f" Key: {key!r} -> {len(data)} bytes") + # Optionally show first few bytes + preview = data[:20] if len(data) > 20 else data + print(f" Data preview: {preview}...") + + def read(self, primary_namespace: str, secondary_namespace: str, key: str) -> List[int]: + with self._lock: + namespace_key = (primary_namespace, secondary_namespace) + + if namespace_key not in self.storage: + raise IoError.NotFound() + + if key not in self.storage[namespace_key]: + raise IoError.NotFound() + + return list(self.storage[namespace_key][key]) + + def write(self, primary_namespace: str, secondary_namespace: str, key: str, buf: List[int]) -> None: + with self._lock: + namespace_key = (primary_namespace, secondary_namespace) + if namespace_key not in self.storage: + self.storage[namespace_key] = {} + + self.storage[namespace_key][key] = list(buf) + + def remove(self, primary_namespace: str, secondary_namespace: str, key: str, lazy: bool) -> None: + with self._lock: + namespace_key = (primary_namespace, secondary_namespace) + if namespace_key not in self.storage: + raise IoError.NotFound() + + if key not in self.storage[namespace_key]: + raise IoError.NotFound() + + del self.storage[namespace_key][key] + + if not self.storage[namespace_key]: + del self.storage[namespace_key] + + def list(self, primary_namespace: str, secondary_namespace: str) -> List[str]: + with self._lock: + namespace_key = (primary_namespace, secondary_namespace) + if namespace_key in self.storage: + return sorted(self.storage[namespace_key].keys()) + return [] + + async def read_async(self, primary_namespace: str, secondary_namespace: str, key: str) -> List[int]: + return self.read(primary_namespace, secondary_namespace, key) + + async def write_async(self, primary_namespace: str, secondary_namespace: str, key: str, buf: List[int]) -> None: + self.write(primary_namespace, secondary_namespace, key, buf) + + async def remove_async(self, primary_namespace: str, secondary_namespace: str, key: str, lazy: bool) -> None: + self.remove(primary_namespace, secondary_namespace, key, lazy) + + async def list_async(self, primary_namespace: str, secondary_namespace: str) -> List[str]: + return self.list(primary_namespace, secondary_namespace) + \ No newline at end of file diff --git a/bindings/python/src/ldk_node/test_ldk_node.py b/bindings/python/src/ldk_node/test_ldk_node.py index 4f53dbabfc..edc82b1212 100644 --- a/bindings/python/src/ldk_node/test_ldk_node.py +++ b/bindings/python/src/ldk_node/test_ldk_node.py @@ -5,13 +5,33 @@ import os import re import requests +import asyncio +import threading +import ldk_node from ldk_node import * +from kv_store import TestKvStore DEFAULT_ESPLORA_SERVER_URL = "http://127.0.0.1:3002" DEFAULT_TEST_NETWORK = Network.REGTEST DEFAULT_BITCOIN_CLI_BIN = "bitcoin-cli" +class NodeSetup: + def __init__(self, node, node_id, tmp_dir, listening_addresses, stores=None, backup_tmp_dir=None): + self.node = node + self.node_id = node_id + self.tmp_dir = tmp_dir + self.listening_addresses = listening_addresses + self.stores = stores # (primary, ephemeral) or None + self.backup_tmp_dir = backup_tmp_dir + + def cleanup(self): + self.node.stop() + time.sleep(1) + self.tmp_dir.cleanup() + if self.backup_tmp_dir is not None: + self.backup_tmp_dir.cleanup() + def bitcoin_cli(cmd): args = [] @@ -95,7 +115,6 @@ def send_to_address(address, amount_sats): print("SEND TX:", res) return res - def setup_node(tmp_dir, esplora_endpoint, listening_addresses): mnemonic = generate_entropy_mnemonic(None) node_entropy = NodeEntropy.from_bip39_mnemonic(mnemonic, None) @@ -107,134 +126,256 @@ def setup_node(tmp_dir, esplora_endpoint, listening_addresses): builder.set_listening_addresses(listening_addresses) return builder.build(node_entropy) -def get_esplora_endpoint(): - if os.environ.get('ESPLORA_ENDPOINT'): - return str(os.environ['ESPLORA_ENDPOINT']) - return DEFAULT_ESPLORA_SERVER_URL +def setup_two_nodes(esplora_endpoint, port_1=2323, port_2=2324, use_tier_store=False) -> tuple[NodeSetup, NodeSetup]: + # Setup Node 1 + tmp_dir_1 = tempfile.TemporaryDirectory("_ldk_node_1") + print("TMP DIR 1:", tmp_dir_1.name) + + listening_addresses_1 = [f"127.0.0.1:{port_1}"] + if use_tier_store: + node_1, stores_1, backup_tmp_dir_1 = setup_node_with_tier_store( + tmp_dir_1.name, + esplora_endpoint, + listening_addresses_1, + ) + else: + node_1 = setup_node(tmp_dir_1.name, esplora_endpoint, listening_addresses_1) + stores_1 = None + backup_tmp_dir_1 = None + + node_1.start() + node_id_1 = node_1.node_id() + print("Node ID 1:", node_id_1) + + setup_1 = NodeSetup( + node_1, + node_id_1, + tmp_dir_1, + listening_addresses_1, + stores_1, + backup_tmp_dir_1, + ) + + # Setup Node 2 + tmp_dir_2 = tempfile.TemporaryDirectory("_ldk_node_2") + print("TMP DIR 2:", tmp_dir_2.name) + + listening_addresses_2 = [f"127.0.0.1:{port_2}"] + if use_tier_store: + node_2, stores_2, backup_tmp_dir_2 = setup_node_with_tier_store( + tmp_dir_2.name, + esplora_endpoint, + listening_addresses_2, + ) + else: + node_2 = setup_node(tmp_dir_2.name, esplora_endpoint, listening_addresses_2) + stores_2 = None + backup_tmp_dir_2 = None + + node_2.start() + node_id_2 = node_2.node_id() + print("Node ID 2:", node_id_2) + + setup_2 = NodeSetup( + node_2, + node_id_2, + tmp_dir_2, + listening_addresses_2, + stores_2, + backup_tmp_dir_2, + ) + + return setup_1, setup_2 +def setup_node_with_tier_store(tmp_dir, esplora_endpoint, listening_addresses) -> tuple[Node, tuple[TestKvStore, TestKvStore], tempfile.TemporaryDirectory]: + mnemonic = generate_entropy_mnemonic(None) + node_entropy = NodeEntropy.from_bip39_mnemonic(mnemonic, None) + config = default_config() -def expect_event(node, expected_event_type): - event = node.wait_next_event() - assert isinstance(event, expected_event_type) - print("EVENT:", event) - node.event_handled() - return event + primary = TestKvStore("primary") + ephemeral = TestKvStore("ephemeral") + backup_tmp_dir = tempfile.TemporaryDirectory("_ldk_node_backup") + builder = Builder.from_config(config) + builder.set_storage_dir_path(tmp_dir) + builder.set_chain_source_esplora(esplora_endpoint, None) + builder.set_network(DEFAULT_TEST_NETWORK) + builder.set_listening_addresses(listening_addresses) + builder.set_backup_storage_dir_path(backup_tmp_dir.name) + builder.set_ephemeral_store(ephemeral) + + return builder.build_with_store(node_entropy, primary), (primary, ephemeral), backup_tmp_dir +def do_channel_full_cycle(setup_1: NodeSetup, setup_2: NodeSetup, esplora_endpoint): + # Fund both nodes + (node_1, node_2) = (setup_1.node, setup_2.node) + address_1 = node_1.onchain_payment().new_address() + txid_1 = send_to_address(address_1, 100000) + address_2 = node_2.onchain_payment().new_address() + txid_2 = send_to_address(address_2, 100000) -class TestLdkNode(unittest.TestCase): - def setUp(self): - bitcoin_cli("createwallet ldk_node_test") - mine(101) - time.sleep(3) - esplora_endpoint = get_esplora_endpoint() - mine_and_wait(esplora_endpoint, 1) + wait_for_tx(esplora_endpoint, txid_1) + wait_for_tx(esplora_endpoint, txid_2) - def test_channel_full_cycle(self): - esplora_endpoint = get_esplora_endpoint() + mine_and_wait(esplora_endpoint, 6) - ## Setup Node 1 - tmp_dir_1 = tempfile.TemporaryDirectory("_ldk_node_1") - print("TMP DIR 1:", tmp_dir_1.name) + node_1.sync_wallets() + node_2.sync_wallets() - listening_addresses_1 = ["127.0.0.1:2323"] - node_1 = setup_node(tmp_dir_1.name, esplora_endpoint, listening_addresses_1) - node_1.start() - node_id_1 = node_1.node_id() - print("Node ID 1:", node_id_1) + spendable_balance_1 = node_1.list_balances().spendable_onchain_balance_sats + spendable_balance_2 = node_2.list_balances().spendable_onchain_balance_sats + total_balance_1 = node_1.list_balances().total_onchain_balance_sats + total_balance_2 = node_2.list_balances().total_onchain_balance_sats - # Setup Node 2 - tmp_dir_2 = tempfile.TemporaryDirectory("_ldk_node_2") - print("TMP DIR 2:", tmp_dir_2.name) + print("SPENDABLE 1:", spendable_balance_1) + assert spendable_balance_1 == 100000 - listening_addresses_2 = ["127.0.0.1:2324"] - node_2 = setup_node(tmp_dir_2.name, esplora_endpoint, listening_addresses_2) - node_2.start() - node_id_2 = node_2.node_id() - print("Node ID 2:", node_id_2) + print("SPENDABLE 2:", spendable_balance_2) + assert spendable_balance_2 == 100000 - address_1 = node_1.onchain_payment().new_address() - txid_1 = send_to_address(address_1, 100000) - address_2 = node_2.onchain_payment().new_address() - txid_2 = send_to_address(address_2, 100000) + print("TOTAL 1:", total_balance_1) + assert total_balance_1 == 100000 - wait_for_tx(esplora_endpoint, txid_1) - wait_for_tx(esplora_endpoint, txid_2) + print("TOTAL 2:", total_balance_2) + assert total_balance_2 == 100000 - mine_and_wait(esplora_endpoint, 6) + (node_id_2, listening_addresses_2) = (setup_2.node_id, setup_2.listening_addresses) + node_1.open_channel(node_id_2, listening_addresses_2[0], 50000, None, None) - node_1.sync_wallets() - node_2.sync_wallets() + channel_pending_event_1 = expect_event(node_1, Event.CHANNEL_PENDING) + channel_pending_event_2 = expect_event(node_2, Event.CHANNEL_PENDING) + funding_txid = channel_pending_event_1.funding_txo.txid + wait_for_tx(esplora_endpoint, funding_txid) + mine_and_wait(esplora_endpoint, 6) - spendable_balance_1 = node_1.list_balances().spendable_onchain_balance_sats - spendable_balance_2 = node_2.list_balances().spendable_onchain_balance_sats - total_balance_1 = node_1.list_balances().total_onchain_balance_sats - total_balance_2 = node_2.list_balances().total_onchain_balance_sats + node_1.sync_wallets() + node_2.sync_wallets() - print("SPENDABLE 1:", spendable_balance_1) - self.assertEqual(spendable_balance_1, 100000) + channel_ready_event_1 = expect_event(node_1, Event.CHANNEL_READY) + print("funding_txo:", funding_txid) - print("SPENDABLE 2:", spendable_balance_2) - self.assertEqual(spendable_balance_2, 100000) + channel_ready_event_2 = expect_event(node_2, Event.CHANNEL_READY) - print("TOTAL 1:", total_balance_1) - self.assertEqual(total_balance_1, 100000) + description = Bolt11InvoiceDescription.DIRECT("asdf") + invoice = node_2.bolt11_payment().receive(2500000, description, 9217) + node_1.bolt11_payment().send(invoice, None) - print("TOTAL 2:", total_balance_2) - self.assertEqual(total_balance_2, 100000) + expect_event(node_1, Event.PAYMENT_SUCCESSFUL) - node_1.open_channel(node_id_2, listening_addresses_2[0], 50000, None, None) + expect_event(node_2, Event.PAYMENT_RECEIVED) + node_id_1 = setup_1.node_id + time.sleep(1) + node_2.close_channel(channel_ready_event_2.user_channel_id, node_id_1) - channel_pending_event_1 = expect_event(node_1, Event.CHANNEL_PENDING) - channel_pending_event_2 = expect_event(node_2, Event.CHANNEL_PENDING) - funding_txid = channel_pending_event_1.funding_txo.txid - wait_for_tx(esplora_endpoint, funding_txid) - mine_and_wait(esplora_endpoint, 6) + # expect channel closed event on both nodes + expect_event(node_1, Event.CHANNEL_CLOSED) - node_1.sync_wallets() - node_2.sync_wallets() + expect_event(node_2, Event.CHANNEL_CLOSED) - channel_ready_event_1 = expect_event(node_1, Event.CHANNEL_READY) - print("funding_txo:", funding_txid) + mine_and_wait(esplora_endpoint, 1) - channel_ready_event_2 = expect_event(node_2, Event.CHANNEL_READY) + node_1.sync_wallets() + node_2.sync_wallets() - description = Bolt11InvoiceDescription.DIRECT("asdf") - invoice = node_2.bolt11_payment().receive(2500000, description, 9217) - node_1.bolt11_payment().send(invoice, None) - - expect_event(node_1, Event.PAYMENT_SUCCESSFUL) + spendable_balance_after_close_1 = node_1.list_balances().spendable_onchain_balance_sats + assert spendable_balance_after_close_1 > 95000 + assert spendable_balance_after_close_1 < 100000 + spendable_balance_after_close_2 = node_2.list_balances().spendable_onchain_balance_sats + assert spendable_balance_after_close_2 == 102500 + +def get_esplora_endpoint(): + if os.environ.get('ESPLORA_ENDPOINT'): + return str(os.environ['ESPLORA_ENDPOINT']) + return DEFAULT_ESPLORA_SERVER_URL - expect_event(node_2, Event.PAYMENT_RECEIVED) - - node_2.close_channel(channel_ready_event_2.user_channel_id, node_id_1) +def expect_event(node, expected_event_type): + event = node.wait_next_event() + assert isinstance(event, expected_event_type) + print("EVENT:", event) + node.event_handled() + return event - # expect channel closed event on both nodes - expect_event(node_1, Event.CHANNEL_CLOSED) +def wait_until(predicate, timeout=10.0, interval=0.1, message="condition not met before timeout"): + deadline = time.time() + timeout + while time.time() < deadline: + if predicate(): + return + time.sleep(interval) + raise AssertionError(message) - expect_event(node_2, Event.CHANNEL_CLOSED) +class TestLdkNode(unittest.TestCase): + def setUp(self): + bitcoin_cli("createwallet ldk_node_test") + mine(101) + time.sleep(3) + esplora_endpoint = get_esplora_endpoint() mine_and_wait(esplora_endpoint, 1) - node_1.sync_wallets() - node_2.sync_wallets() + def test_channel_full_cycle(self): + esplora_endpoint = get_esplora_endpoint() + setup_1, setup_2 = setup_two_nodes(esplora_endpoint) + + try: + do_channel_full_cycle(setup_1, setup_2, esplora_endpoint) + finally: + setup_1.cleanup() + setup_2.cleanup() + - spendable_balance_after_close_1 = node_1.list_balances().spendable_onchain_balance_sats - assert spendable_balance_after_close_1 > 95000 - assert spendable_balance_after_close_1 < 100000 - spendable_balance_after_close_2 = node_2.list_balances().spendable_onchain_balance_sats - self.assertEqual(spendable_balance_after_close_2, 102500) + def test_tier_store(self): + # Set an event loop for async Python callbacks invoked from Rust. + # (https://mozilla.github.io/uniffi-rs/0.27/futures.html#python-uniffi_set_event_loop) + loop = asyncio.new_event_loop() - # Stop nodes - node_1.stop() - node_2.stop() + def run_loop(): + asyncio.set_event_loop(loop) + loop.run_forever() - # Cleanup - time.sleep(1) # Wait a sec so our logs can finish writing - tmp_dir_1.cleanup() - tmp_dir_2.cleanup() + loop_thread = threading.Thread(target=run_loop, daemon=True) + loop_thread.start() + ldk_node.uniffi_set_event_loop(loop) + + esplora_endpoint = get_esplora_endpoint() + setup_1, setup_2 = setup_two_nodes( + esplora_endpoint, + port_1=2325, + port_2=2326, + use_tier_store=True, + ) + + try: + do_channel_full_cycle(setup_1, setup_2, esplora_endpoint) + + primary, ephemeral = setup_1.stores + + wait_until( + lambda: len(primary.storage) > 0, + message="primary store did not receive any data", + ) + + self.assertGreater(len(ephemeral.storage), 0, "ephemeral should have data") + + ephemeral_keys = [ + key + for namespace in ephemeral.storage.values() + for key in namespace.keys() + ] + has_scorer_or_graph = any( + key in ["scorer", "network_graph"] for key in ephemeral_keys + ) + self.assertTrue( + has_scorer_or_graph, + "ephemeral should contain scorer or network_graph data", + ) + finally: + setup_1.cleanup() + setup_2.cleanup() + loop.call_soon_threadsafe(loop.stop) + loop_thread.join(timeout=5) + loop.close() if __name__ == '__main__': unittest.main()