Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ bip21 = { version = "0.5", features = ["std"], default-features = false }
base64 = { version = "0.22.1", default-features = false, features = ["std"] }
getrandom = { version = "0.3", default-features = false }
chrono = { version = "0.4", default-features = false, features = ["clock"] }
tokio = { version = "1.37", default-features = false, features = [ "rt-multi-thread", "time", "sync", "macros" ] }
tokio = { version = "1.37", default-features = false, features = [ "rt-multi-thread", "time", "sync", "macros", "net" ] }
esplora-client = { version = "0.12", default-features = false, features = ["tokio", "async-https-rustls"] }
electrum-client = { version = "0.24.0", default-features = false, features = ["proxy", "use-rustls-ring"] }
libc = "0.2"
Expand Down
182 changes: 107 additions & 75 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
// accordance with one or both of these licenses.

use std::collections::hash_map::{self, HashMap};
use std::net::ToSocketAddrs;
use std::ops::Deref;
use std::sync::{Arc, Mutex};
use std::time::Duration;
Expand All @@ -15,7 +14,7 @@ use bitcoin::secp256k1::PublicKey;
use lightning::ln::msgs::SocketAddress;

use crate::config::TorConfig;
use crate::logger::{log_error, log_info, LdkLogger};
use crate::logger::{log_debug, log_error, log_info, LdkLogger};
use crate::types::{KeysManager, PeerManager};
use crate::Error;

Expand Down Expand Up @@ -56,6 +55,14 @@ where

pub(crate) async fn do_connect_peer(
&self, node_id: PublicKey, addr: SocketAddress,
) -> Result<(), Error> {
let res = self.do_connect_peer_internal(node_id, addr).await;
self.propagate_result_to_subscribers(&node_id, res);
res
}

async fn do_connect_peer_internal(
&self, node_id: PublicKey, addr: SocketAddress,
) -> Result<(), Error> {
// First, we check if there is already an outbound connection in flight, if so, we just
// await on the corresponding watch channel. The task driving the connection future will
Expand All @@ -71,15 +78,14 @@ where

log_info!(self.logger, "Connecting to peer: {}@{}", node_id, addr);

let res = match addr {
match addr {
SocketAddress::OnionV2(old_onion_addr) => {
log_error!(
self.logger,
"Failed to resolve network address {:?}: Resolution of OnionV2 addresses is currently unsupported.",
old_onion_addr
);
self.propagate_result_to_subscribers(&node_id, Err(Error::InvalidSocketAddress));
return Err(Error::InvalidSocketAddress);
self.logger,
"Failed to resolve network address {:?}: Resolution of OnionV2 addresses is currently unsupported.",
old_onion_addr
);
Err(Error::InvalidSocketAddress)
},
SocketAddress::OnionV3 { .. } => {
let proxy_config = self.tor_proxy_config.as_ref().ok_or_else(|| {
Expand All @@ -88,87 +94,113 @@ where
"Failed to resolve network address {:?}: Tor usage is not configured.",
addr
);
self.propagate_result_to_subscribers(
&node_id,
Err(Error::InvalidSocketAddress),
);
Error::InvalidSocketAddress
})?;
let proxy_addr = proxy_config
.proxy_address
.to_socket_addrs()
.map_err(|e| {
log_error!(
self.logger,
"Failed to resolve Tor proxy network address {}: {}",
proxy_config.proxy_address,
e
);
self.propagate_result_to_subscribers(
&node_id,
Err(Error::InvalidSocketAddress),
);
Error::InvalidSocketAddress
})?
.next()
.ok_or_else(|| {
log_error!(
self.logger,
"Failed to resolve Tor proxy network address {}",
proxy_config.proxy_address
);
self.propagate_result_to_subscribers(
&node_id,
Err(Error::InvalidSocketAddress),
);
Error::InvalidSocketAddress
})?;
let connection_future = lightning_net_tokio::tor_connect_outbound(
Arc::clone(&self.peer_manager),
node_id,
addr.clone(),
proxy_addr,
Arc::clone(&self.keys_manager),
);
self.await_connection(connection_future, node_id, addr).await
let resolved_addrs: Vec<_> =
tokio::net::lookup_host(proxy_config.proxy_address.to_string())
.await
.map_err(|e| {
log_error!(
self.logger,
"Failed to resolve Tor proxy network address {}: {}",
proxy_config.proxy_address,
e
);
Error::InvalidSocketAddress
})?
.collect();

if resolved_addrs.is_empty() {
log_error!(
self.logger,
"Failed to resolve Tor proxy network address {}",
proxy_config.proxy_address
);
return Err(Error::InvalidSocketAddress);
}

let mut res = Err(Error::ConnectionFailed);
let mut had_failures = false;
for proxy_addr in resolved_addrs {
let connection_future = lightning_net_tokio::tor_connect_outbound(
Arc::clone(&self.peer_manager),
node_id,
addr.clone(),
proxy_addr,
Arc::clone(&self.keys_manager),
);
res = self.await_connection(connection_future, node_id, addr.clone()).await;
if res.is_ok() {
if had_failures {
log_info!(
self.logger,
"Successfully connected to peer {}@{} via resolved proxy address {} after previous attempts failed.",
node_id, addr, proxy_addr
);
}
break;
}
had_failures = true;
log_debug!(
self.logger,
"Failed to connect to peer {}@{} via resolved proxy address {}.",
node_id,
addr,
proxy_addr
);
}
res
},
_ => {
let socket_addr = addr
.to_socket_addrs()
let resolved_addrs: Vec<_> = tokio::net::lookup_host(addr.to_string())
.await
.map_err(|e| {
log_error!(
self.logger,
"Failed to resolve network address {}: {}",
addr,
e
);
self.propagate_result_to_subscribers(
&node_id,
Err(Error::InvalidSocketAddress),
);
Error::InvalidSocketAddress
})?
.next()
.ok_or_else(|| {
log_error!(self.logger, "Failed to resolve network address {}", addr);
self.propagate_result_to_subscribers(
&node_id,
Err(Error::InvalidSocketAddress),
);
Error::InvalidSocketAddress
})?;
let connection_future = lightning_net_tokio::connect_outbound(
Arc::clone(&self.peer_manager),
node_id,
socket_addr,
);
self.await_connection(connection_future, node_id, addr).await
},
};
.collect();

self.propagate_result_to_subscribers(&node_id, res);
if resolved_addrs.is_empty() {
log_error!(self.logger, "Failed to resolve network address {}", addr);
return Err(Error::InvalidSocketAddress);
}

res
let mut res = Err(Error::ConnectionFailed);
let mut had_failures = false;
for socket_addr in resolved_addrs {
let connection_future = lightning_net_tokio::connect_outbound(
Arc::clone(&self.peer_manager),
node_id,
socket_addr,
);
res = self.await_connection(connection_future, node_id, addr.clone()).await;
if res.is_ok() {
if had_failures {
log_info!(
self.logger,
"Successfully connected to peer {}@{} via resolved address {} after previous attempts failed.",
node_id, addr, socket_addr
);
}
break;
}
had_failures = true;
log_debug!(
self.logger,
"Failed to connect to peer {}@{} via resolved address {}.",
node_id,
addr,
socket_addr
);
}
res
},
}
}

async fn await_connection<F, CF>(
Expand Down
36 changes: 18 additions & 18 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ mod types;
mod wallet;

use std::default::Default;
use std::net::ToSocketAddrs;
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
#[cfg(cycle_tests)]
Expand Down Expand Up @@ -361,28 +360,29 @@ impl Node {
let peer_manager_connection_handler = Arc::clone(&self.peer_manager);
let listening_logger = Arc::clone(&self.logger);

let mut bind_addrs = Vec::with_capacity(listening_addresses.len());

for listening_addr in listening_addresses {
let resolved_address = listening_addr.to_socket_addrs().map_err(|e| {
log_error!(
self.logger,
"Unable to resolve listening address: {:?}. Error details: {}",
listening_addr,
e,
);
Error::InvalidSocketAddress
})?;

bind_addrs.extend(resolved_address);
}

let logger = Arc::clone(&listening_logger);
let listening_addrs = listening_addresses.clone();
let listeners = self.runtime.block_on(async move {
let mut bind_addrs = Vec::with_capacity(listening_addrs.len());

for listening_addr in &listening_addrs {
let resolved =
tokio::net::lookup_host(listening_addr.to_string()).await.map_err(|e| {
log_error!(
logger,
"Unable to resolve listening address: {:?}. Error details: {}",
listening_addr,
e,
);
Error::InvalidSocketAddress
})?;
bind_addrs.extend(resolved);
}

let mut listeners = Vec::new();

// Try to bind to all addresses
for addr in &*bind_addrs {
for addr in &bind_addrs {
match tokio::net::TcpListener::bind(addr).await {
Ok(listener) => {
log_trace!(logger, "Listener bound to {}", addr);
Expand Down
Loading