Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lightning-background-processor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ bitcoin_hashes = { version = "0.14.0", default-features = false }
bitcoin-io = { version = "0.1.2", default-features = false }
lightning = { version = "0.3.0", path = "../lightning", default-features = false }
lightning-rapid-gossip-sync = { version = "0.2.0", path = "../lightning-rapid-gossip-sync", default-features = false }
lightning-liquidity = { version = "0.2.0", path = "../lightning-liquidity", default-features = false }
lightning-liquidity = { version = "0.3.0", path = "../lightning-liquidity", default-features = false }
possiblyrandom = { version = "0.2", path = "../possiblyrandom", default-features = false }

[dev-dependencies]
tokio = { version = "1.35", features = [ "macros", "rt", "rt-multi-thread", "sync", "time" ] }
lightning = { version = "0.3.0", path = "../lightning", features = ["_test_utils"] }
lightning-invoice = { version = "0.34.0", path = "../lightning-invoice" }
lightning-liquidity = { version = "0.2.0", path = "../lightning-liquidity", default-features = false, features = ["_test_utils"] }
lightning-liquidity = { version = "0.3.0", path = "../lightning-liquidity", default-features = false, features = ["_test_utils"] }
lightning-persister = { version = "0.2.0", path = "../lightning-persister" }

[lints]
Expand Down
155 changes: 51 additions & 104 deletions lightning-background-processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,12 +200,9 @@ pub enum GossipSync<
P: Deref<Target = P2PGossipSync<G, U, L>>,
R: Deref<Target = RapidGossipSync<G, L>>,
G: Deref<Target = NetworkGraph<L>>,
U: Deref,
L: Deref,
> where
U::Target: UtxoLookup,
L::Target: Logger,
{
U: UtxoLookup,
L: Logger,
> {
/// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
P2P(P),
/// Rapid gossip sync from a trusted server.
Expand All @@ -218,12 +215,9 @@ impl<
P: Deref<Target = P2PGossipSync<G, U, L>>,
R: Deref<Target = RapidGossipSync<G, L>>,
G: Deref<Target = NetworkGraph<L>>,
U: Deref,
L: Deref,
U: UtxoLookup,
L: Logger,
> GossipSync<P, R, G, U, L>
where
U::Target: UtxoLookup,
L::Target: Logger,
{
fn network_graph(&self) -> Option<&G> {
match self {
Expand Down Expand Up @@ -260,12 +254,9 @@ where
impl<
P: Deref<Target = P2PGossipSync<G, U, L>>,
G: Deref<Target = NetworkGraph<L>>,
U: Deref,
L: Deref,
U: UtxoLookup,
L: Logger,
> GossipSync<P, &RapidGossipSync<G, L>, G, U, L>
where
U::Target: UtxoLookup,
L::Target: Logger,
{
/// Initializes a new [`GossipSync::P2P`] variant.
pub fn p2p(gossip_sync: P) -> Self {
Expand All @@ -278,16 +269,15 @@ impl<
'a,
R: Deref<Target = RapidGossipSync<G, L>>,
G: Deref<Target = NetworkGraph<L>>,
L: Deref,
L: Logger,
>
GossipSync<
&P2PGossipSync<G, &'a (dyn UtxoLookup + Send + Sync), L>,
R,
G,
&'a (dyn UtxoLookup + Send + Sync),
L,
> where
L::Target: Logger,
>
{
/// Initializes a new [`GossipSync::Rapid`] variant.
pub fn rapid(gossip_sync: R) -> Self {
Expand All @@ -296,26 +286,22 @@ impl<
}

/// This is not exported to bindings users as the bindings concretize everything and have constructors for us
impl<'a, L: Deref>
impl<'a, L: Logger>
GossipSync<
&P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn UtxoLookup + Send + Sync), L>,
&RapidGossipSync<&'a NetworkGraph<L>, L>,
&'a NetworkGraph<L>,
&'a (dyn UtxoLookup + Send + Sync),
L,
> where
L::Target: Logger,
>
{
/// Initializes a new [`GossipSync::None`] variant.
pub fn none() -> Self {
GossipSync::None
}
}

fn handle_network_graph_update<L: Deref>(network_graph: &NetworkGraph<L>, event: &Event)
where
L::Target: Logger,
{
fn handle_network_graph_update<L: Logger>(network_graph: &NetworkGraph<L>, event: &Event) {
if let Event::PaymentPathFailed {
failure: PathFailure::OnPath { network_update: Some(ref upd) },
..
Expand Down Expand Up @@ -420,24 +406,15 @@ type DynChannelManager = lightning::ln::channelmanager::ChannelManager<
pub const NO_ONION_MESSENGER: Option<
Arc<
dyn AOnionMessenger<
EntropySource = dyn EntropySource + Send + Sync,
ES = &(dyn EntropySource + Send + Sync),
NodeSigner = dyn lightning::sign::NodeSigner + Send + Sync,
NS = &(dyn lightning::sign::NodeSigner + Send + Sync),
Logger = dyn Logger + Send + Sync,
L = &'static (dyn Logger + Send + Sync),
NodeIdLookUp = DynChannelManager,
EntropySource = &(dyn EntropySource + Send + Sync),
NodeSigner = &(dyn lightning::sign::NodeSigner + Send + Sync),
Logger = &'static (dyn Logger + Send + Sync),
NL = &'static DynChannelManager,
MessageRouter = DynMessageRouter,
MR = &'static DynMessageRouter,
OffersMessageHandler = lightning::ln::peer_handler::IgnoringMessageHandler,
OMH = &'static lightning::ln::peer_handler::IgnoringMessageHandler,
AsyncPaymentsMessageHandler = lightning::ln::peer_handler::IgnoringMessageHandler,
APH = &'static lightning::ln::peer_handler::IgnoringMessageHandler,
DNSResolverMessageHandler = lightning::ln::peer_handler::IgnoringMessageHandler,
DRH = &'static lightning::ln::peer_handler::IgnoringMessageHandler,
CustomOnionMessageHandler = lightning::ln::peer_handler::IgnoringMessageHandler,
CMH = &'static lightning::ln::peer_handler::IgnoringMessageHandler,
MessageRouter = &'static DynMessageRouter,
OMH = lightning::ln::peer_handler::IgnoringMessageHandler,
APH = lightning::ln::peer_handler::IgnoringMessageHandler,
DRH = lightning::ln::peer_handler::IgnoringMessageHandler,
CMH = lightning::ln::peer_handler::IgnoringMessageHandler,
> + Send
+ Sync,
>,
Expand Down Expand Up @@ -480,22 +457,17 @@ impl KVStore for DummyKVStore {
pub const NO_LIQUIDITY_MANAGER: Option<
Arc<
dyn ALiquidityManager<
EntropySource = dyn EntropySource + Send + Sync,
ES = &(dyn EntropySource + Send + Sync),
NodeSigner = dyn lightning::sign::NodeSigner + Send + Sync,
NS = &(dyn lightning::sign::NodeSigner + Send + Sync),
EntropySource = &(dyn EntropySource + Send + Sync),
NodeSigner = &(dyn lightning::sign::NodeSigner + Send + Sync),
AChannelManager = DynChannelManager,
CM = &DynChannelManager,
Filter = dyn chain::Filter + Send + Sync,
C = &(dyn chain::Filter + Send + Sync),
KVStore = DummyKVStore,
K = &DummyKVStore,
TimeProvider = dyn lightning_liquidity::utils::time::TimeProvider + Send + Sync,
TP = &(dyn lightning_liquidity::utils::time::TimeProvider + Send + Sync),
BroadcasterInterface = dyn lightning::chain::chaininterface::BroadcasterInterface
+ Send
+ Sync,
T = &(dyn BroadcasterInterface + Send + Sync),
BroadcasterInterface = &(dyn lightning::chain::chaininterface::BroadcasterInterface
+ Send
+ Sync),
> + Send
+ Sync,
>,
Expand All @@ -507,22 +479,18 @@ pub const NO_LIQUIDITY_MANAGER: Option<
pub const NO_LIQUIDITY_MANAGER_SYNC: Option<
Arc<
dyn ALiquidityManagerSync<
EntropySource = dyn EntropySource + Send + Sync,
ES = &(dyn EntropySource + Send + Sync),
NodeSigner = dyn lightning::sign::NodeSigner + Send + Sync,
NS = &(dyn lightning::sign::NodeSigner + Send + Sync),
EntropySource = &(dyn EntropySource + Send + Sync),
NodeSigner = &(dyn lightning::sign::NodeSigner + Send + Sync),
AChannelManager = DynChannelManager,
CM = &DynChannelManager,
Filter = dyn chain::Filter + Send + Sync,
C = &(dyn chain::Filter + Send + Sync),
KVStoreSync = dyn lightning::util::persist::KVStoreSync + Send + Sync,
KS = &(dyn lightning::util::persist::KVStoreSync + Send + Sync),
TimeProvider = dyn lightning_liquidity::utils::time::TimeProvider + Send + Sync,
TP = &(dyn lightning_liquidity::utils::time::TimeProvider + Send + Sync),
BroadcasterInterface = dyn lightning::chain::chaininterface::BroadcasterInterface
+ Send
+ Sync,
T = &(dyn BroadcasterInterface + Send + Sync),
BroadcasterInterface = &(dyn lightning::chain::chaininterface::BroadcasterInterface
+ Send
+ Sync),
> + Send
+ Sync,
>,
Expand Down Expand Up @@ -954,16 +922,16 @@ use futures_util::{dummy_waker, Joiner, OptionalSelector, Selector, SelectorOutp
///```
pub async fn process_events_async<
'a,
UL: Deref,
CF: Deref,
T: Deref,
F: Deref,
UL: UtxoLookup,
CF: chain::Filter,
T: BroadcasterInterface,
F: FeeEstimator,
G: Deref<Target = NetworkGraph<L>>,
L: Deref,
L: Logger,
P: Deref,
EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
EventHandler: Fn(Event) -> EventHandlerFuture,
ES: Deref,
ES: EntropySource,
M: Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>,
CM: Deref,
OM: Deref,
Expand All @@ -972,8 +940,8 @@ pub async fn process_events_async<
PM: Deref,
LM: Deref,
D: Deref,
O: Deref,
K: Deref,
O: OutputSpender,
K: KVStore,
OS: Deref<Target = OutputSweeper<T, D, F, CF, K, L, O>>,
S: Deref<Target = SC>,
SC: for<'b> WriteableScore<'b>,
Expand All @@ -987,20 +955,12 @@ pub async fn process_events_async<
sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
) -> Result<(), lightning::io::Error>
where
UL::Target: UtxoLookup,
CF::Target: chain::Filter,
T::Target: BroadcasterInterface,
F::Target: FeeEstimator,
L::Target: Logger,
P::Target: Persist<<CM::Target as AChannelManager>::Signer>,
ES::Target: EntropySource,
CM::Target: AChannelManager,
OM::Target: AOnionMessenger,
PM::Target: APeerManager,
LM::Target: ALiquidityManager,
O::Target: OutputSpender,
D::Target: ChangeDestinationSource,
K::Target: KVStore,
{
let async_event_handler = |event| {
let network_graph = gossip_sync.network_graph();
Expand Down Expand Up @@ -1455,16 +1415,16 @@ fn check_and_reset_sleeper<
/// Async events processor that is based on [`process_events_async`] but allows for [`KVStoreSync`] to be used for
/// synchronous background persistence.
pub async fn process_events_async_with_kv_store_sync<
UL: Deref,
CF: Deref,
T: Deref,
F: Deref,
UL: UtxoLookup,
CF: chain::Filter,
T: BroadcasterInterface,
F: FeeEstimator,
G: Deref<Target = NetworkGraph<L>>,
L: Deref,
L: Logger,
P: Deref,
EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
EventHandler: Fn(Event) -> EventHandlerFuture,
ES: Deref,
ES: EntropySource,
M: Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>,
CM: Deref,
OM: Deref,
Expand All @@ -1473,7 +1433,7 @@ pub async fn process_events_async_with_kv_store_sync<
PM: Deref,
LM: Deref,
D: Deref,
O: Deref,
O: OutputSpender,
K: Deref,
OS: Deref<Target = OutputSweeperSync<T, D, F, CF, K, L, O>>,
S: Deref<Target = SC>,
Expand All @@ -1488,18 +1448,11 @@ pub async fn process_events_async_with_kv_store_sync<
sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
) -> Result<(), lightning::io::Error>
where
UL::Target: UtxoLookup,
CF::Target: chain::Filter,
T::Target: BroadcasterInterface,
F::Target: FeeEstimator,
L::Target: Logger,
P::Target: Persist<<CM::Target as AChannelManager>::Signer>,
ES::Target: EntropySource,
CM::Target: AChannelManager,
OM::Target: AOnionMessenger,
PM::Target: APeerManager,
LM::Target: ALiquidityManager,
O::Target: OutputSpender,
D::Target: ChangeDestinationSourceSync,
K::Target: KVStoreSync,
{
Expand Down Expand Up @@ -1569,15 +1522,15 @@ impl BackgroundProcessor {
/// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
pub fn start<
'a,
UL: 'static + Deref,
CF: 'static + Deref,
T: 'static + Deref,
F: 'static + Deref + Send,
UL: 'static + UtxoLookup,
CF: 'static + chain::Filter,
T: 'static + BroadcasterInterface,
F: 'static + FeeEstimator + Send,
G: 'static + Deref<Target = NetworkGraph<L>>,
L: 'static + Deref + Send,
P: 'static + Deref,
EH: 'static + EventHandler + Send,
ES: 'static + Deref + Send,
ES: 'static + EntropySource + Send,
M: 'static
+ Deref<
Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>,
Expand All @@ -1593,7 +1546,7 @@ impl BackgroundProcessor {
S: 'static + Deref<Target = SC> + Send + Sync,
SC: for<'b> WriteableScore<'b>,
D: 'static + Deref,
O: 'static + Deref,
O: 'static + OutputSpender,
K: 'static + Deref + Send,
OS: 'static + Deref<Target = OutputSweeperSync<T, D, F, CF, K, L, O>> + Send,
>(
Expand All @@ -1602,19 +1555,13 @@ impl BackgroundProcessor {
liquidity_manager: Option<LM>, sweeper: Option<OS>, logger: L, scorer: Option<S>,
) -> Self
where
UL::Target: 'static + UtxoLookup,
CF::Target: 'static + chain::Filter,
T::Target: 'static + BroadcasterInterface,
F::Target: 'static + FeeEstimator,
L::Target: 'static + Logger,
P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
ES::Target: 'static + EntropySource,
CM::Target: AChannelManager,
OM::Target: AOnionMessenger,
PM::Target: APeerManager,
LM::Target: ALiquidityManagerSync,
D::Target: ChangeDestinationSourceSync,
O::Target: 'static + OutputSpender,
K::Target: 'static + KVStoreSync,
{
let stop_thread = Arc::new(AtomicBool::new(false));
Expand Down
Loading
Loading