From 230f5440c89c2faf260c3024f57e059c117699b8 Mon Sep 17 00:00:00 2001 From: Lee Smet Date: Fri, 4 Oct 2024 16:53:24 +0200 Subject: [PATCH 01/17] Inject quin connection in quic transport Signed-off-by: Lee Smet --- mycelium/src/connection.rs | 10 +++++----- mycelium/src/peer_manager.rs | 9 +++++---- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/mycelium/src/connection.rs b/mycelium/src/connection.rs index 819cd7e2..242d9b12 100644 --- a/mycelium/src/connection.rs +++ b/mycelium/src/connection.rs @@ -43,13 +43,13 @@ pub trait Connection: AsyncRead + AsyncWrite { pub struct Quic { tx: quinn::SendStream, rx: quinn::RecvStream, - remote: SocketAddr, + con: quinn::Connection, } impl Quic { /// Create a new wrapper around Quic streams. - pub fn new(tx: quinn::SendStream, rx: quinn::RecvStream, remote: SocketAddr) -> Self { - Quic { tx, rx, remote } + pub fn new(tx: quinn::SendStream, rx: quinn::RecvStream, con: quinn::Connection) -> Self { + Quic { tx, rx, con } } } @@ -129,11 +129,11 @@ impl AsyncWrite for Quic { impl Connection for Quic { fn identifier(&self) -> Result { - Ok(format!("QUIC -> {}", self.remote)) + Ok(format!("QUIC -> {}", self.con.remote_address())) } fn static_link_cost(&self) -> Result { - Ok(match self.remote { + Ok(match self.con.remote_address() { SocketAddr::V4(_) => PACKET_PROCESSING_COST_IP4_QUIC, SocketAddr::V6(ip) if ip.ip().to_ipv4_mapped().is_some() => { PACKET_PROCESSING_COST_IP4_QUIC diff --git a/mycelium/src/peer_manager.rs b/mycelium/src/peer_manager.rs index 26ab00f6..0e61630e 100644 --- a/mycelium/src/peer_manager.rs +++ b/mycelium/src/peer_manager.rs @@ -638,7 +638,7 @@ where Ok(connecting) => match connecting.await { Ok(con) => match con.open_bi().await { Ok((tx, rx)) => { - let q_con = Quic::new(tx, rx, endpoint.address()); + let q_con = Quic::new(tx, rx, con); let res = { let router = self.router.lock().unwrap(); let router_data_tx = router.router_data_tx(); @@ -863,9 +863,10 @@ where return; } }; + let remote_address = con.remote_address(); let quic_peer = match con.accept_bi().await { - Ok((tx, rx)) => Quic::new(tx, rx, con.remote_address()), + Ok((tx, rx)) => Quic::new(tx, rx, con), Err(e) => { debug!(err=%e, "Failed to accept bidirectional quic stream"); return; @@ -888,9 +889,9 @@ where return; } }; - info!(remote=%con.remote_address(), "Accepted new inbound quic peer"); + info!(remote=%remote_address, "Accepted new inbound quic peer"); self.add_peer( - Endpoint::new(Protocol::Quic, con.remote_address()), + Endpoint::new(Protocol::Quic, remote_address), PeerType::Inbound, ConnectionTraffic { tx_bytes, rx_bytes }, Some(new_peer), From a1aedffea9a832d802ae60d5bfe616c0a66e8612 Mon Sep 17 00:00:00 2001 From: Lee Smet Date: Mon, 7 Oct 2024 16:34:09 +0200 Subject: [PATCH 02/17] Rework connection trait Signed-off-by: Lee Smet --- mycelium/src/connection.rs | 170 +++++++++++++++++++++++++---- mycelium/src/connection/tracked.rs | 19 +--- mycelium/src/peer.rs | 100 +++++++---------- mycelium/src/peer_manager.rs | 86 ++++++++------- mycelium/src/router.rs | 9 +- mycelium/src/source_table.rs | 19 +--- 6 files changed, 245 insertions(+), 158 deletions(-) diff --git a/mycelium/src/connection.rs b/mycelium/src/connection.rs index 242d9b12..fc467ff7 100644 --- a/mycelium/src/connection.rs +++ b/mycelium/src/connection.rs @@ -1,11 +1,18 @@ -use std::{io, net::SocketAddr, pin::Pin}; - -use tokio::{ - io::{AsyncRead, AsyncWrite}, - net::TcpStream, +use std::{ + future::Future, + io, + net::SocketAddr, + pin::Pin, + sync::{atomic::AtomicU64, Arc}, }; +use crate::packet::{self, ControlPacket, DataPacket, Packet}; + +use futures::{SinkExt, StreamExt}; +use tokio::io::{AsyncRead, AsyncWrite}; + mod tracked; +use tokio_util::codec::Framed; pub use tracked::Tracked; #[cfg(feature = "private-network")] @@ -31,7 +38,27 @@ const PACKET_PROCESSING_COST_IP6_QUIC: u16 = 7; // TODO const PACKET_PROCESSING_COST_IP4_QUIC: u16 = 12; -pub trait Connection: AsyncRead + AsyncWrite { +pub trait Connection { + /// Feeds a data packet on the connection. Depending on the connection you might need to call + /// [`Connection::flush`] before the packet is actually sent. + fn feed_data_packet( + &mut self, + packet: DataPacket, + ) -> impl Future> + Send; + + /// Feeds a control packet on the connection. Depending on the connection you might need to call + /// [`Connection::flush`] before the packet is actually sent. + fn feed_control_packet( + &mut self, + packet: ControlPacket, + ) -> impl Future> + Send; + + /// Flush the connection. This sends all buffered packets which haven't beend sent yet. + fn flush(&mut self) -> impl Future> + Send; + + /// Receive a packet from the remote end. + fn receive_packet(&mut self) -> impl Future>> + Send; + /// Get an identifier for this connection, which shows details about the remote fn identifier(&self) -> Result; @@ -39,31 +66,51 @@ pub trait Connection: AsyncRead + AsyncWrite { fn static_link_cost(&self) -> Result; } -/// A wrapper around a quic send and quic receive stream, implementing the [`Connection`] trait. -pub struct Quic { - tx: quinn::SendStream, - rx: quinn::RecvStream, - con: quinn::Connection, +/// A wrapper about an asynchronous (non blocking) tcp stream. +pub struct TcpStream { + framed: Framed, packet::Codec>, + local_addr: SocketAddr, + peer_addr: SocketAddr, } -impl Quic { - /// Create a new wrapper around Quic streams. - pub fn new(tx: quinn::SendStream, rx: quinn::RecvStream, con: quinn::Connection) -> Self { - Quic { tx, rx, con } +impl TcpStream { + /// Create a new wrapped [`TcpStream`] which implements the [`Connection`] trait. + pub fn new( + tcp_stream: tokio::net::TcpStream, + read: Arc, + write: Arc, + ) -> io::Result { + Ok(Self { + local_addr: tcp_stream.local_addr()?, + peer_addr: tcp_stream.peer_addr()?, + framed: Framed::new(Tracked::new(read, write, tcp_stream), packet::Codec::new()), + }) } } impl Connection for TcpStream { + async fn feed_data_packet(&mut self, packet: DataPacket) -> io::Result<()> { + self.framed.feed(Packet::DataPacket(packet)).await + } + + async fn feed_control_packet(&mut self, packet: ControlPacket) -> io::Result<()> { + self.framed.feed(Packet::ControlPacket(packet)).await + } + + async fn receive_packet(&mut self) -> Option> { + self.framed.next().await + } + + async fn flush(&mut self) -> io::Result<()> { + self.framed.flush().await + } + fn identifier(&self) -> Result { - Ok(format!( - "TCP {} <-> {}", - self.local_addr()?, - self.peer_addr()? - )) + Ok(format!("TCP {} <-> {}", self.local_addr, self.peer_addr)) } fn static_link_cost(&self) -> Result { - Ok(match self.peer_addr()? { + Ok(match self.peer_addr { SocketAddr::V4(_) => PACKET_PROCESSING_COST_IP4_TCP, SocketAddr::V6(ip) if ip.ip().to_ipv4_mapped().is_some() => { PACKET_PROCESSING_COST_IP4_TCP @@ -73,7 +120,37 @@ impl Connection for TcpStream { } } -impl AsyncRead for Quic { +/// A wrapper around a quic send and quic receive stream, implementing the [`Connection`] trait. +pub struct Quic { + framed: Framed, packet::Codec>, + con: quinn::Connection, +} + +struct QuicStream { + tx: quinn::SendStream, + rx: quinn::RecvStream, +} + +impl Quic { + /// Create a new wrapper around Quic streams. + pub fn new( + tx: quinn::SendStream, + rx: quinn::RecvStream, + con: quinn::Connection, + read: Arc, + write: Arc, + ) -> Self { + Quic { + framed: Framed::new( + Tracked::new(read, write, QuicStream { tx, rx }), + packet::Codec::new(), + ), + con, + } + } +} + +impl AsyncRead for QuicStream { #[inline] fn poll_read( mut self: std::pin::Pin<&mut Self>, @@ -84,7 +161,7 @@ impl AsyncRead for Quic { } } -impl AsyncWrite for Quic { +impl AsyncWrite for QuicStream { #[inline] fn poll_write( mut self: Pin<&mut Self>, @@ -128,6 +205,22 @@ impl AsyncWrite for Quic { } impl Connection for Quic { + async fn feed_data_packet(&mut self, packet: DataPacket) -> io::Result<()> { + self.framed.feed(Packet::DataPacket(packet)).await + } + + async fn feed_control_packet(&mut self, packet: ControlPacket) -> io::Result<()> { + self.framed.feed(Packet::ControlPacket(packet)).await + } + + async fn receive_packet(&mut self) -> Option> { + self.framed.next().await + } + + async fn flush(&mut self) -> io::Result<()> { + self.framed.flush().await + } + fn identifier(&self) -> Result { Ok(format!("QUIC -> {}", self.con.remote_address())) } @@ -144,10 +237,39 @@ impl Connection for Quic { } #[cfg(test)] -use tokio::io::DuplexStream; +/// Wrapper for an in-memory pipe implementing the [`Connection`] trait. +pub struct DuplexStream { + framed: Framed, +} + +#[cfg(test)] +impl DuplexStream { + /// Create a new in memory duplex stream. + pub fn new(duplex: tokio::io::DuplexStream) -> Self { + Self { + framed: Framed::new(duplex, packet::Codec::new()), + } + } +} #[cfg(test)] impl Connection for DuplexStream { + async fn feed_data_packet(&mut self, packet: DataPacket) -> io::Result<()> { + self.framed.feed(Packet::DataPacket(packet)).await + } + + async fn feed_control_packet(&mut self, packet: ControlPacket) -> io::Result<()> { + self.framed.feed(Packet::ControlPacket(packet)).await + } + + async fn receive_packet(&mut self) -> Option> { + self.framed.next().await + } + + async fn flush(&mut self) -> io::Result<()> { + self.framed.flush().await + } + fn identifier(&self) -> Result { Ok("Memory pipe".to_string()) } diff --git a/mycelium/src/connection/tracked.rs b/mycelium/src/connection/tracked.rs index 74f12f59..6c8ecac9 100644 --- a/mycelium/src/connection/tracked.rs +++ b/mycelium/src/connection/tracked.rs @@ -9,8 +9,6 @@ use std::{ use tokio::io::{AsyncRead, AsyncWrite}; -use super::Connection; - /// Wrapper which keeps track of how much bytes have been read and written from a connection. pub struct Tracked { /// Bytes read counter @@ -23,7 +21,7 @@ pub struct Tracked { impl Tracked where - C: Connection + Unpin, + C: AsyncRead + AsyncWrite + Unpin, { /// Create a new instance of a tracked connections. Counters are passed in so they can be /// reused accross connections. @@ -32,21 +30,6 @@ where } } -impl Connection for Tracked -where - C: Connection + Unpin, -{ - #[inline] - fn identifier(&self) -> Result { - self.con.identifier() - } - - #[inline] - fn static_link_cost(&self) -> Result { - self.con.static_link_cost() - } -} - impl AsyncRead for Tracked where C: AsyncRead + Unpin, diff --git a/mycelium/src/peer.rs b/mycelium/src/peer.rs index 5b769b4c..216b96c7 100644 --- a/mycelium/src/peer.rs +++ b/mycelium/src/peer.rs @@ -1,9 +1,8 @@ -use futures::{SinkExt, StreamExt}; use std::{ error::Error, io, sync::{ - atomic::{AtomicBool, AtomicU64, Ordering}, + atomic::{AtomicBool, Ordering}, Arc, RwLock, Weak, }, }; @@ -11,13 +10,9 @@ use tokio::{ select, sync::{mpsc, Notify}, }; -use tokio_util::codec::Framed; use tracing::{debug, error, info, trace}; -use crate::{ - connection::{self, Connection}, - packet::{self, Packet}, -}; +use crate::{connection::Connection, packet::Packet}; use crate::{ packet::{ControlPacket, DataPacket}, sequence_number::SeqNo, @@ -63,14 +58,9 @@ impl Peer { pub fn new( router_data_tx: mpsc::Sender, router_control_tx: mpsc::UnboundedSender<(ControlPacket, Peer)>, - connection: C, + mut connection: C, dead_peer_sink: mpsc::Sender, - bytes_written: Arc, - bytes_read: Arc, ) -> Result { - // Wrap connection so we can get access to the counters. - let connection = connection::Tracked::new(bytes_read, bytes_written, connection); - // Data channel for peer let (to_peer_data, mut from_routing_data) = mpsc::unbounded_channel::(); // Control channel for peer @@ -90,11 +80,6 @@ impl Peer { }), }; - // Framed for peer - // Used to send and receive packets from a TCP stream - let framed = Framed::with_capacity(connection, packet::Codec::new(), 128 << 10); - let (mut sink, mut stream) = framed.split(); - { let peer = peer.clone(); @@ -103,8 +88,8 @@ impl Peer { loop { select! { // Received over the TCP stream - frame = stream.next() => { - match frame { + packet = connection.receive_packet() => { + match packet { Some(Ok(packet)) => { match packet { Packet::DataPacket(packet) => { @@ -137,46 +122,45 @@ impl Peer { } } - rv = from_routing_data.recv(), if !needs_flush => { - match rv { - None => break, - Some(packet) => { + Some(packet) = from_routing_data.recv() => { + if let Err(e) = connection.feed_data_packet(packet).await { + error!("Failed to feed data packet to connection: {e}"); + break + } - needs_flush = true; - if let Err(e) = sink.feed(Packet::DataPacket(packet)).await { + for _ in 1..PACKET_COALESCE_WINDOW { + // There can be 2 cases of errors here, empty channel and no more + // senders. In both cases we don't really care at this point. + if let Ok(packet) = from_routing_data.try_recv() { + if let Err(e) = connection.feed_data_packet(packet).await { error!("Failed to feed data packet to connection: {e}"); break } + } + } - - for _ in 1..PACKET_COALESCE_WINDOW { - // There can be 2 cases of errors here, empty channel and no more - // senders. In both cases we don't really care at this point. - if let Ok(packet) = from_routing_data.try_recv() { - if let Err(e) = sink.feed(Packet::DataPacket(packet)).await { - error!("Failed to feed data packet to connection: {e}"); - break - } - trace!("Instantly queued ready packet to transfer to peer"); - } else { - // No packets ready, flush currently buffered ones - break - } - } + if needs_flush { + if let Err(e) = connection.flush().await { + error!("Failed to flush buffered peer connection data packets: {e}"); + break } + needs_flush = false; } } - rv = from_routing_control.recv(), if !needs_flush => { - match rv { - None => break, - Some(packet) => { - - needs_flush = true; + Some(packet) = from_routing_control.recv() => { + if let Err(e) = connection.feed_control_packet(packet).await { + error!("Failed to feed control packet to connection: {e}"); + break + } - if let Err(e) = sink.feed(Packet::ControlPacket(packet)).await { - error!("Failed to feed control packet to connection: {e}"); + for _ in 1..PACKET_COALESCE_WINDOW { + // There can be 2 cases of errors here, empty channel and no more + // senders. In both cases we don't really care at this point. + if let Ok(packet) = from_routing_control.try_recv() { + if let Err(e) = connection.feed_control_packet(packet).await { + error!("Failed to feed data packet to connection: {e}"); break } @@ -184,7 +168,7 @@ impl Peer { // There can be 2 cases of errors here, empty channel and no more // senders. In both cases we don't really care at this point. if let Ok(packet) = from_routing_control.try_recv() { - if let Err(e) = sink.feed(Packet::ControlPacket(packet)).await { + if let Err(e) = connection.feed_control_packet(packet).await { error!("Failed to feed data packet to connection: {e}"); break } @@ -195,20 +179,20 @@ impl Peer { } } } - } - r = sink.flush(), if needs_flush => { - if let Err(err) = r { - error!("Failed to flush peer connection: {err}"); - break + if needs_flush { + if let Err(e) = connection.flush().await { + error!("Failed to flush buffered peer connection control packets: {e}"); + break + } + needs_flush = false; } - needs_flush = false; } _ = death_watcher.notified() => { // Attempt gracefull shutdown - let mut framed = sink.reunite(stream).expect("SplitSink and SplitStream here can only be part of the same original Framned; Qed"); - let _ = framed.close().await; + // let mut framed = sink.reunite(stream).expect("SplitSink and SplitStream here can only be part of the same original Framned; Qed"); + // let _ = framed.close().await; break; } } diff --git a/mycelium/src/peer_manager.rs b/mycelium/src/peer_manager.rs index 0e61630e..ec8eacea 100644 --- a/mycelium/src/peer_manager.rs +++ b/mycelium/src/peer_manager.rs @@ -1,4 +1,4 @@ -use crate::connection::Quic; +use crate::connection::{Quic, TcpStream}; use crate::endpoint::{Endpoint, Protocol}; use crate::metrics::Metrics; use crate::peer::{Peer, PeerRef}; @@ -24,7 +24,6 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Duration; use std::{collections::hash_map::Entry, future::IntoFuture}; -use tokio::net::TcpStream; use tokio::net::{TcpListener, UdpSocket}; use tokio::task::AbortHandle; use tokio::time::{Instant, MissedTickBehavior}; @@ -496,7 +495,7 @@ where None }; - match TcpStream::connect(endpoint.address()) + match tokio::net::TcpStream::connect(endpoint.address()) .map(|result| result.and_then(|socket| set_fw_mark(socket, self.firewall_mark))) .await { @@ -566,14 +565,22 @@ where }; #[cfg(not(feature = "private-network"))] - let res = Peer::new( - router_data_tx, - router_control_tx, - peer_stream, - dead_peer_sink, - ct.tx_bytes, - ct.rx_bytes, - ); + let res = { + let peer_stream = match TcpStream::new(peer_stream, ct.rx_bytes, ct.tx_bytes) { + Ok(ps) => ps, + Err(err) => { + error!(%err, "Failed to create wrapped tcp stream"); + return (endpoint, None); + } + }; + + Peer::new( + router_data_tx, + router_control_tx, + peer_stream, + dead_peer_sink, + ) + }; match res { Ok(new_peer) => { @@ -638,21 +645,14 @@ where Ok(connecting) => match connecting.await { Ok(con) => match con.open_bi().await { Ok((tx, rx)) => { - let q_con = Quic::new(tx, rx, con); + let q_con = Quic::new(tx, rx, con, ct.tx_bytes, ct.rx_bytes); let res = { let router = self.router.lock().unwrap(); let router_data_tx = router.router_data_tx(); let router_control_tx = router.router_control_tx(); let dead_peer_sink = router.dead_peer_sink().clone(); - Peer::new( - router_data_tx, - router_control_tx, - q_con, - dead_peer_sink, - ct.tx_bytes, - ct.rx_bytes, - ) + Peer::new(router_data_tx, router_control_tx, q_con, dead_peer_sink) }; match res { Ok(new_peer) => { @@ -780,22 +780,32 @@ where }; #[cfg(not(feature = "private-network"))] - let new_peer = Peer::new( - router_data_tx.clone(), - router_control_tx.clone(), - stream, - dead_peer_sink.clone(), - tx_bytes.clone(), - rx_bytes.clone(), - ); + let new_peer = { + let new_stream = + match TcpStream::new(stream, tx_bytes.clone(), rx_bytes.clone()) { + Ok(ns) => ns, + Err(err) => { + error!(%err, "Failed to create wrapped tcp stream"); + continue; + } + }; + + let new_peer = Peer::new( + router_data_tx.clone(), + router_control_tx.clone(), + new_stream, + dead_peer_sink.clone(), + ); - let new_peer = match new_peer { - Ok(peer) => peer, - Err(e) => { - error!(err=%e, "Failed to spawn peer"); - continue; + match new_peer { + Ok(peer) => peer, + Err(e) => { + error!(err=%e, "Failed to spawn peer"); + continue; + } } }; + info!("Accepted new inbound peer"); self.add_peer( Endpoint::new( @@ -865,23 +875,23 @@ where }; let remote_address = con.remote_address(); + + let tx_bytes = Arc::new(AtomicU64::new(0)); + let rx_bytes = Arc::new(AtomicU64::new(0)); + let quic_peer = match con.accept_bi().await { - Ok((tx, rx)) => Quic::new(tx, rx, con), + Ok((tx, rx)) => Quic::new(tx, rx, con, rx_bytes.clone(), tx_bytes.clone()), Err(e) => { debug!(err=%e, "Failed to accept bidirectional quic stream"); return; } }; - let tx_bytes = Arc::new(AtomicU64::new(0)); - let rx_bytes = Arc::new(AtomicU64::new(0)); let new_peer = match Peer::new( router_data_tx.clone(), router_control_tx.clone(), quic_peer, dead_peer_sink.clone(), - tx_bytes.clone(), - rx_bytes.clone(), ) { Ok(peer) => peer, Err(e) => { diff --git a/mycelium/src/router.rs b/mycelium/src/router.rs index e3ebab96..c091969b 100644 --- a/mycelium/src/router.rs +++ b/mycelium/src/router.rs @@ -2091,15 +2091,14 @@ fn advertised_update_interval(sre: &RouteEntry) -> Duration { mod tests { use std::{ net::{IpAddr, Ipv6Addr}, - sync::{atomic::AtomicU64, Arc}, time::Duration, }; use tokio::sync::mpsc; use crate::{ - babel::Update, crypto::PublicKey, metric::Metric, peer::Peer, router_id::RouterId, - sequence_number::SeqNo, source_table::SourceKey, subnet::Subnet, + babel::Update, connection::DuplexStream, crypto::PublicKey, metric::Metric, peer::Peer, + router_id::RouterId, sequence_number::SeqNo, source_table::SourceKey, subnet::Subnet, }; #[test] @@ -2156,10 +2155,8 @@ mod tests { let neighbor = Peer::new( router_data_tx, router_control_tx, - con1, + DuplexStream::new(con1), dead_peer_sink, - Arc::new(AtomicU64::new(0)), - Arc::new(AtomicU64::new(0)), ) .expect("Can create a dummy peer"); let subnet = Subnet::new(IpAddr::V6(Ipv6Addr::new(0x400, 0, 0, 0, 0, 0, 0, 0)), 64) diff --git a/mycelium/src/source_table.rs b/mycelium/src/source_table.rs index 2839065f..562fcb71 100644 --- a/mycelium/src/source_table.rs +++ b/mycelium/src/source_table.rs @@ -169,6 +169,7 @@ mod tests { use crate::{ babel, + connection::DuplexStream, crypto::SecretKey, metric::Metric, peer::Peer, @@ -178,11 +179,7 @@ mod tests { source_table::{FeasibilityDistance, SourceKey, SourceTable}, subnet::Subnet, }; - use std::{ - net::Ipv6Addr, - sync::{atomic::AtomicU64, Arc}, - time::Duration, - }; + use std::{net::Ipv6Addr, time::Duration}; /// A retraction is always considered to be feasible. #[tokio::test] @@ -378,10 +375,8 @@ mod tests { let neighbor = Peer::new( router_data_tx, router_control_tx, - con1, + DuplexStream::new(con1), dead_peer_sink, - Arc::new(AtomicU64::new(0)), - Arc::new(AtomicU64::new(0)), ) .expect("Can create a dummy peer"); @@ -423,10 +418,8 @@ mod tests { let neighbor = Peer::new( router_data_tx, router_control_tx, - con1, + DuplexStream::new(con1), dead_peer_sink, - Arc::new(AtomicU64::new(0)), - Arc::new(AtomicU64::new(0)), ) .expect("Can create a dummy peer"); @@ -468,10 +461,8 @@ mod tests { let neighbor = Peer::new( router_data_tx, router_control_tx, - con1, + DuplexStream::new(con1), dead_peer_sink, - Arc::new(AtomicU64::new(0)), - Arc::new(AtomicU64::new(0)), ) .expect("Can create a dummy peer"); From 4ae1a8263d5f77c864e87eff02b70dad94eb353c Mon Sep 17 00:00:00 2001 From: Lee Smet Date: Thu, 10 Oct 2024 10:17:02 +0200 Subject: [PATCH 03/17] Initial QUIC datagram frames Still WIP Signed-off-by: Lee Smet --- mycelium/src/connection.rs | 33 ++++++++++++++++++++++++++++++--- mycelium/src/crypto.rs | 2 +- mycelium/src/peer_manager.rs | 10 ++++++---- mycelium/src/tun/linux.rs | 2 +- 4 files changed, 38 insertions(+), 9 deletions(-) diff --git a/mycelium/src/connection.rs b/mycelium/src/connection.rs index fc467ff7..e04dc577 100644 --- a/mycelium/src/connection.rs +++ b/mycelium/src/connection.rs @@ -8,11 +8,13 @@ use std::{ use crate::packet::{self, ControlPacket, DataPacket, Packet}; +use bytes::BytesMut; use futures::{SinkExt, StreamExt}; use tokio::io::{AsyncRead, AsyncWrite}; mod tracked; -use tokio_util::codec::Framed; +use tokio_util::codec::{Decoder, Encoder, Framed}; +use tracing::info; pub use tracked::Tracked; #[cfg(feature = "private-network")] @@ -206,7 +208,13 @@ impl AsyncWrite for QuicStream { impl Connection for Quic { async fn feed_data_packet(&mut self, packet: DataPacket) -> io::Result<()> { - self.framed.feed(Packet::DataPacket(packet)).await + let mut codec = packet::Codec::new(); + let mut buffer = BytesMut::with_capacity(1500); + codec.encode(Packet::DataPacket(packet), &mut buffer)?; + + self.con + .send_datagram(buffer.into()) + .map_err(|sde| io::Error::new(io::ErrorKind::Other, sde)) } async fn feed_control_packet(&mut self, packet: ControlPacket) -> io::Result<()> { @@ -214,7 +222,26 @@ impl Connection for Quic { } async fn receive_packet(&mut self) -> Option> { - self.framed.next().await + tokio::select! { + datagram = self.con.read_datagram() => { + let datagram_bytes = match datagram { + Ok(buffer) => buffer, + Err(e) => return Some(Err(e.into())), + }; + let mut codec = packet::Codec::new(); + match codec.decode(&mut datagram_bytes.into()) { + Ok(Some(packet)) => Some(Ok(packet)), + // Partial? packet read. We consider this to be a stream hangup + // TODO: verify + Ok(None) => None, + Err(e) => Some(Err(e)), + } + }, + packet = self.framed.next() => { + packet + } + + } } async fn flush(&mut self) -> io::Result<()> { diff --git a/mycelium/src/crypto.rs b/mycelium/src/crypto.rs index affd07d6..36febcf9 100644 --- a/mycelium/src/crypto.rs +++ b/mycelium/src/crypto.rs @@ -15,7 +15,7 @@ use serde::{de::Visitor, Deserialize, Serialize}; /// const generic argument which is then expanded with the needed extra space for the buffer, /// however as it stands const generics can only be used standalone and not in a constant /// expression. This _is_ possible on nightly rust, with a feature gate (generic_const_exprs). -const PACKET_SIZE: usize = 1400; +const PACKET_SIZE: usize = 1300; /// Size of an AES_GCM tag in bytes. const AES_TAG_SIZE: usize = 16; diff --git a/mycelium/src/peer_manager.rs b/mycelium/src/peer_manager.rs index ec8eacea..0a340052 100644 --- a/mycelium/src/peer_manager.rs +++ b/mycelium/src/peer_manager.rs @@ -637,8 +637,9 @@ where transport_config.mtu_discovery_config(Some(MtuDiscoveryConfig::default())); transport_config.keep_alive_interval(Some(Duration::from_secs(20))); // we don't use datagrams. - transport_config.datagram_receive_buffer_size(None); - transport_config.datagram_send_buffer_size(0); + transport_config.datagram_receive_buffer_size(Some(16 << 20)); + transport_config.datagram_send_buffer_size(16 << 20); + transport_config.initial_mtu(1500); config.transport_config(Arc::new(transport_config)); match quic_socket.connect_with(config, endpoint.address(), "dummy.mycelium") { @@ -1218,8 +1219,9 @@ fn make_quic_endpoint( transport_config.mtu_discovery_config(Some(MtuDiscoveryConfig::default())); transport_config.keep_alive_interval(Some(Duration::from_secs(20))); // we don't use datagrams. - transport_config.datagram_receive_buffer_size(None); - transport_config.datagram_send_buffer_size(0); + transport_config.datagram_receive_buffer_size(Some(16 << 20)); + transport_config.datagram_send_buffer_size(16 << 20); + transport_config.initial_mtu(1500); // TODO: further tweak this. let socket = std::net::UdpSocket::bind(("::", quic_listen_port)) diff --git a/mycelium/src/tun/linux.rs b/mycelium/src/tun/linux.rs index 4958a218..fecafe89 100644 --- a/mycelium/src/tun/linux.rs +++ b/mycelium/src/tun/linux.rs @@ -13,7 +13,7 @@ use crate::subnet::Subnet; use crate::tun::TunConfig; // TODO -const LINK_MTU: i32 = 1400; +const LINK_MTU: i32 = 1300; /// Create a new tun interface and set required routes /// From a8176b0fd71cd0c9a726b7d9532d581d4afe0b5a Mon Sep 17 00:00:00 2001 From: Lee Smet Date: Thu, 20 Nov 2025 16:37:25 +0100 Subject: [PATCH 04/17] Always flush connection to a peer after queuing a packet Signed-off-by: Lee Smet --- mycelium/src/peer.rs | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/mycelium/src/peer.rs b/mycelium/src/peer.rs index 216b96c7..e8b032e2 100644 --- a/mycelium/src/peer.rs +++ b/mycelium/src/peer.rs @@ -84,7 +84,6 @@ impl Peer { let peer = peer.clone(); tokio::spawn(async move { - let mut needs_flush = false; loop { select! { // Received over the TCP stream @@ -140,12 +139,9 @@ impl Peer { } } - if needs_flush { - if let Err(e) = connection.flush().await { - error!("Failed to flush buffered peer connection data packets: {e}"); - break - } - needs_flush = false; + if let Err(e) = connection.flush().await { + error!("Failed to flush buffered peer connection data packets: {e}"); + break } } @@ -180,12 +176,9 @@ impl Peer { } } - if needs_flush { - if let Err(e) = connection.flush().await { - error!("Failed to flush buffered peer connection control packets: {e}"); - break - } - needs_flush = false; + if let Err(e) = connection.flush().await { + error!("Failed to flush buffered peer connection control packets: {e}"); + break } } From 0ffa212cff351ec1c5cc6d8a624e74478a3fe007 Mon Sep 17 00:00:00 2001 From: Lee Smet Date: Thu, 20 Nov 2025 16:37:47 +0100 Subject: [PATCH 05/17] Make sure GSO is enabled if possible Signed-off-by: Lee Smet --- mycelium/src/peer_manager.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mycelium/src/peer_manager.rs b/mycelium/src/peer_manager.rs index 0a340052..9cefdec4 100644 --- a/mycelium/src/peer_manager.rs +++ b/mycelium/src/peer_manager.rs @@ -1218,17 +1218,17 @@ fn make_quic_endpoint( transport_config.max_idle_timeout(Some(Duration::from_secs(60).try_into()?)); transport_config.mtu_discovery_config(Some(MtuDiscoveryConfig::default())); transport_config.keep_alive_interval(Some(Duration::from_secs(20))); - // we don't use datagrams. transport_config.datagram_receive_buffer_size(Some(16 << 20)); transport_config.datagram_send_buffer_size(16 << 20); transport_config.initial_mtu(1500); + transport_config.enable_segmentation_offload(true); // TODO: further tweak this. let socket = std::net::UdpSocket::bind(("::", quic_listen_port)) .and_then(|socket| set_fw_mark(socket, firewall_mark))?; debug!("Bound UDP socket for Quic"); - //TODO tweak or confirm + // TODO: tweak or confirm let endpoint = quinn::Endpoint::new( quinn::EndpointConfig::default(), Some(server_config), From 9f448a0944df7fb8b34c080989457a23a4582065 Mon Sep 17 00:00:00 2001 From: Lee Smet Date: Thu, 20 Nov 2025 16:41:52 +0100 Subject: [PATCH 06/17] Remove unused imports Signed-off-by: Lee Smet --- mycelium/src/connection.rs | 1 - mycelium/src/peer.rs | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/mycelium/src/connection.rs b/mycelium/src/connection.rs index e04dc577..86af6b5a 100644 --- a/mycelium/src/connection.rs +++ b/mycelium/src/connection.rs @@ -14,7 +14,6 @@ use tokio::io::{AsyncRead, AsyncWrite}; mod tracked; use tokio_util::codec::{Decoder, Encoder, Framed}; -use tracing::info; pub use tracked::Tracked; #[cfg(feature = "private-network")] diff --git a/mycelium/src/peer.rs b/mycelium/src/peer.rs index e8b032e2..187d2b41 100644 --- a/mycelium/src/peer.rs +++ b/mycelium/src/peer.rs @@ -10,7 +10,7 @@ use tokio::{ select, sync::{mpsc, Notify}, }; -use tracing::{debug, error, info, trace}; +use tracing::{debug, error, info}; use crate::{connection::Connection, packet::Packet}; use crate::{ From bcedbe2744756fc902297b4040bc90a6228ab36b Mon Sep 17 00:00:00 2001 From: Lee Smet Date: Thu, 20 Nov 2025 16:42:03 +0100 Subject: [PATCH 07/17] Avoid needless error map closure Signed-off-by: Lee Smet --- mycelium/src/connection.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mycelium/src/connection.rs b/mycelium/src/connection.rs index 86af6b5a..f4461890 100644 --- a/mycelium/src/connection.rs +++ b/mycelium/src/connection.rs @@ -213,7 +213,7 @@ impl Connection for Quic { self.con .send_datagram(buffer.into()) - .map_err(|sde| io::Error::new(io::ErrorKind::Other, sde)) + .map_err(io::Error::other) } async fn feed_control_packet(&mut self, packet: ControlPacket) -> io::Result<()> { From f68b8823bbaab41040f5f0ded6f7d67af0b66b91 Mon Sep 17 00:00:00 2001 From: Lee Smet Date: Fri, 21 Nov 2025 14:53:38 +0100 Subject: [PATCH 08/17] Use BBR congestion controller for quic Signed-off-by: Lee Smet --- mycelium/src/peer_manager.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mycelium/src/peer_manager.rs b/mycelium/src/peer_manager.rs index 9cefdec4..a1353d96 100644 --- a/mycelium/src/peer_manager.rs +++ b/mycelium/src/peer_manager.rs @@ -9,7 +9,7 @@ use futures::{FutureExt, StreamExt}; #[cfg(feature = "private-network")] use openssl::ssl::{Ssl, SslAcceptor, SslConnector, SslMethod}; use quinn::crypto::rustls::QuicClientConfig; -use quinn::{MtuDiscoveryConfig, ServerConfig, TransportConfig}; +use quinn::{congestion, MtuDiscoveryConfig, ServerConfig, TransportConfig}; use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer, ServerName, UnixTime}; use serde::{Deserialize, Serialize}; use std::collections::{HashMap, HashSet}; @@ -1222,6 +1222,7 @@ fn make_quic_endpoint( transport_config.datagram_send_buffer_size(16 << 20); transport_config.initial_mtu(1500); transport_config.enable_segmentation_offload(true); + transport_config.congestion_controller_factory(Arc::new(congestion::BbrConfig::default())); // TODO: further tweak this. let socket = std::net::UdpSocket::bind(("::", quic_listen_port)) From 4b0e392b9dfbdecde255b3d323e1fa0436876b21 Mon Sep 17 00:00:00 2001 From: Lee Smet Date: Wed, 26 Nov 2025 10:32:25 +0100 Subject: [PATCH 09/17] Revert "Use BBR congestion controller for quic" This reverts commit f68b8823bbaab41040f5f0ded6f7d67af0b66b91. BBR congestion controller implementation in quinn is currently not maintained --- mycelium/src/peer_manager.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/mycelium/src/peer_manager.rs b/mycelium/src/peer_manager.rs index a1353d96..9cefdec4 100644 --- a/mycelium/src/peer_manager.rs +++ b/mycelium/src/peer_manager.rs @@ -9,7 +9,7 @@ use futures::{FutureExt, StreamExt}; #[cfg(feature = "private-network")] use openssl::ssl::{Ssl, SslAcceptor, SslConnector, SslMethod}; use quinn::crypto::rustls::QuicClientConfig; -use quinn::{congestion, MtuDiscoveryConfig, ServerConfig, TransportConfig}; +use quinn::{MtuDiscoveryConfig, ServerConfig, TransportConfig}; use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer, ServerName, UnixTime}; use serde::{Deserialize, Serialize}; use std::collections::{HashMap, HashSet}; @@ -1222,7 +1222,6 @@ fn make_quic_endpoint( transport_config.datagram_send_buffer_size(16 << 20); transport_config.initial_mtu(1500); transport_config.enable_segmentation_offload(true); - transport_config.congestion_controller_factory(Arc::new(congestion::BbrConfig::default())); // TODO: further tweak this. let socket = std::net::UdpSocket::bind(("::", quic_listen_port)) From 2d1356e8967c24ffbd2b90432a27de7d357030bc Mon Sep 17 00:00:00 2001 From: Lee Smet Date: Wed, 26 Nov 2025 15:11:22 +0100 Subject: [PATCH 10/17] Quinn transport config tuning Signed-off-by: Lee Smet --- mycelium/src/peer_manager.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/mycelium/src/peer_manager.rs b/mycelium/src/peer_manager.rs index 9cefdec4..3cca839d 100644 --- a/mycelium/src/peer_manager.rs +++ b/mycelium/src/peer_manager.rs @@ -9,7 +9,7 @@ use futures::{FutureExt, StreamExt}; #[cfg(feature = "private-network")] use openssl::ssl::{Ssl, SslAcceptor, SslConnector, SslMethod}; use quinn::crypto::rustls::QuicClientConfig; -use quinn::{MtuDiscoveryConfig, ServerConfig, TransportConfig}; +use quinn::{congestion, MtuDiscoveryConfig, ServerConfig, TransportConfig}; use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer, ServerName, UnixTime}; use serde::{Deserialize, Serialize}; use std::collections::{HashMap, HashSet}; @@ -1222,7 +1222,11 @@ fn make_quic_endpoint( transport_config.datagram_send_buffer_size(16 << 20); transport_config.initial_mtu(1500); transport_config.enable_segmentation_offload(true); - // TODO: further tweak this. + transport_config.send_window((8 * (10u32 << 20)).into()); + transport_config.stream_receive_window((10u32 << 20).into()); + let mut congestion_controller = congestion::CubicConfig::default(); + congestion_controller.initial_window(1 << 22); // 4MiB + // TODO: further tweak this. let socket = std::net::UdpSocket::bind(("::", quic_listen_port)) .and_then(|socket| set_fw_mark(socket, firewall_mark))?; From 0a71d5bae765797a477041be8cbc68d8081b195b Mon Sep 17 00:00:00 2001 From: Lee Smet Date: Wed, 3 Dec 2025 12:48:25 +0100 Subject: [PATCH 11/17] Fix private network peer construction Signed-off-by: Lee Smet --- mycelium/src/peer_manager.rs | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/mycelium/src/peer_manager.rs b/mycelium/src/peer_manager.rs index 3cca839d..e4709c67 100644 --- a/mycelium/src/peer_manager.rs +++ b/mycelium/src/peer_manager.rs @@ -549,8 +549,6 @@ where router_control_tx, ssl_stream, dead_peer_sink, - ct.tx_bytes, - ct.rx_bytes, ) } else { Peer::new( @@ -558,8 +556,6 @@ where router_control_tx, peer_stream, dead_peer_sink, - ct.tx_bytes, - ct.rx_bytes, ) } }; @@ -766,8 +762,6 @@ where router_control_tx.clone(), ssl_stream, dead_peer_sink.clone(), - tx_bytes.clone(), - rx_bytes.clone(), ) } else { Peer::new( @@ -775,8 +769,6 @@ where router_control_tx.clone(), stream, dead_peer_sink.clone(), - tx_bytes.clone(), - rx_bytes.clone(), ) }; @@ -791,13 +783,15 @@ where } }; - let new_peer = Peer::new( + Peer::new( router_data_tx.clone(), router_control_tx.clone(), new_stream, dead_peer_sink.clone(), - ); + ) + }; + let new_peer = { match new_peer { Ok(peer) => peer, Err(e) => { From 9c86072bb948ab3277de79d8d99b166590b29eda Mon Sep 17 00:00:00 2001 From: Lee Smet Date: Wed, 3 Dec 2025 15:12:39 +0100 Subject: [PATCH 12/17] Implement updated Connection trait for new TlsStream ssl wrapper Signed-off-by: Lee Smet --- mycelium/src/connection.rs | 2 +- mycelium/src/connection/tls.rs | 64 +++++++++++++++++++++++++++++----- mycelium/src/peer_manager.rs | 49 +++++++++++++++++++++++--- 3 files changed, 102 insertions(+), 13 deletions(-) diff --git a/mycelium/src/connection.rs b/mycelium/src/connection.rs index f4461890..84c6e873 100644 --- a/mycelium/src/connection.rs +++ b/mycelium/src/connection.rs @@ -17,7 +17,7 @@ use tokio_util::codec::{Decoder, Encoder, Framed}; pub use tracked::Tracked; #[cfg(feature = "private-network")] -mod tls; +pub mod tls; /// Cost to add to the peer_link_cost for "local processing", when peers are connected over IPv6. /// diff --git a/mycelium/src/connection/tls.rs b/mycelium/src/connection/tls.rs index a73571e8..557b1f58 100644 --- a/mycelium/src/connection/tls.rs +++ b/mycelium/src/connection/tls.rs @@ -1,18 +1,66 @@ -use std::{io, net::SocketAddr}; +use std::{ + io, + net::SocketAddr, + sync::{atomic::AtomicU64, Arc}, +}; +use futures::{SinkExt, StreamExt}; use tokio::net::TcpStream; +use tokio_util::codec::Framed; + +use crate::{ + connection::Tracked, + packet::{self, Packet}, +}; + +/// A wrapper around an asynchronous TLS stream. +pub struct TlsStream { + framed: Framed>, packet::Codec>, + local_addr: SocketAddr, + peer_addr: SocketAddr, +} + +impl TlsStream { + /// Create a new wrapped [`TlsStream`] which implements the [`Connection`](super::Connection) trait. + pub fn new( + tls_stream: tokio_openssl::SslStream, + read: Arc, + write: Arc, + ) -> io::Result { + Ok(Self { + local_addr: tls_stream.get_ref().local_addr()?, + peer_addr: tls_stream.get_ref().peer_addr()?, + framed: Framed::new(Tracked::new(read, write, tls_stream), packet::Codec::new()), + }) + } +} + +impl super::Connection for TlsStream { + async fn feed_data_packet(&mut self, packet: crate::packet::DataPacket) -> io::Result<()> { + self.framed.feed(Packet::DataPacket(packet)).await + } + + async fn feed_control_packet( + &mut self, + packet: crate::packet::ControlPacket, + ) -> io::Result<()> { + self.framed.feed(Packet::ControlPacket(packet)).await + } + + async fn flush(&mut self) -> io::Result<()> { + self.framed.flush().await + } + + async fn receive_packet(&mut self) -> Option> { + self.framed.next().await + } -impl super::Connection for tokio_openssl::SslStream { fn identifier(&self) -> Result { - Ok(format!( - "TLS {} <-> {}", - self.get_ref().local_addr()?, - self.get_ref().peer_addr()? - )) + Ok(format!("TLS {} <-> {}", self.local_addr, self.peer_addr)) } fn static_link_cost(&self) -> Result { - Ok(match self.get_ref().peer_addr()? { + Ok(match self.peer_addr { SocketAddr::V4(_) => super::PACKET_PROCESSING_COST_IP4_TCP, SocketAddr::V6(ip) if ip.ip().to_ipv4_mapped().is_some() => { super::PACKET_PROCESSING_COST_IP4_TCP diff --git a/mycelium/src/peer_manager.rs b/mycelium/src/peer_manager.rs index e4709c67..5b28d96f 100644 --- a/mycelium/src/peer_manager.rs +++ b/mycelium/src/peer_manager.rs @@ -1,3 +1,5 @@ +#[cfg(feature = "private-network")] +use crate::connection::tls::TlsStream; use crate::connection::{Quic, TcpStream}; use crate::endpoint::{Endpoint, Protocol}; use crate::metrics::Metrics; @@ -544,13 +546,31 @@ where } debug!("Completed TLS handshake"); + let tls_stream = match TlsStream::new(ssl_stream, ct.rx_bytes, ct.tx_bytes) + { + Ok(tls_stream) => tls_stream, + Err(err) => { + error!(%err, "Failed to create wrapped Tls stream"); + return (endpoint, None); + } + }; + Peer::new( router_data_tx, router_control_tx, - ssl_stream, + tls_stream, dead_peer_sink, ) } else { + let peer_stream = + match TcpStream::new(peer_stream, ct.rx_bytes, ct.tx_bytes) { + Ok(ps) => ps, + Err(err) => { + error!(%err, "Failed to create wrapped tcp stream"); + return (endpoint, None); + } + }; + Peer::new( router_data_tx, router_control_tx, @@ -757,17 +777,38 @@ where } debug!(%remote, "Accepted TLS handshake"); + let tls_stream = match TlsStream::new( + ssl_stream, + rx_bytes.clone(), + tx_bytes.clone(), + ) { + Ok(tls_stream) => tls_stream, + Err(err) => { + error!(%err, "Failed to create wrapped Tls stream"); + continue; + } + }; + Peer::new( router_data_tx.clone(), router_control_tx.clone(), - ssl_stream, + tls_stream, dead_peer_sink.clone(), ) } else { + let new_stream = + match TcpStream::new(stream, rx_bytes.clone(), tx_bytes.clone()) { + Ok(ns) => ns, + Err(err) => { + error!(%err, "Failed to create wrapped tcp stream"); + continue; + } + }; + Peer::new( router_data_tx.clone(), router_control_tx.clone(), - stream, + new_stream, dead_peer_sink.clone(), ) }; @@ -775,7 +816,7 @@ where #[cfg(not(feature = "private-network"))] let new_peer = { let new_stream = - match TcpStream::new(stream, tx_bytes.clone(), rx_bytes.clone()) { + match TcpStream::new(stream, rx_bytes.clone(), tx_bytes.clone()) { Ok(ns) => ns, Err(err) => { error!(%err, "Failed to create wrapped tcp stream"); From 3c2262b70aaa7336fb51827e9aedbee913115af4 Mon Sep 17 00:00:00 2001 From: Lee Smet Date: Wed, 3 Dec 2025 15:40:10 +0100 Subject: [PATCH 13/17] Restore crypto buffer packet size to 1400 Signed-off-by: Lee Smet --- mycelium/src/crypto.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mycelium/src/crypto.rs b/mycelium/src/crypto.rs index 36febcf9..5bf123b9 100644 --- a/mycelium/src/crypto.rs +++ b/mycelium/src/crypto.rs @@ -15,7 +15,7 @@ use serde::{de::Visitor, Deserialize, Serialize}; /// const generic argument which is then expanded with the needed extra space for the buffer, /// however as it stands const generics can only be used standalone and not in a constant /// expression. This _is_ possible on nightly rust, with a feature gate (generic_const_exprs). -const PACKET_SIZE: usize = 1300; +const PACKET_SIZE: usize = 1_400; /// Size of an AES_GCM tag in bytes. const AES_TAG_SIZE: usize = 16; From 8f0b9595ddcd18bd81feaa4454c68b0b3ed5e2f1 Mon Sep 17 00:00:00 2001 From: Lee Smet Date: Thu, 4 Dec 2025 15:12:24 +0100 Subject: [PATCH 14/17] Track send and received bytes for Quic datagrams Signed-off-by: Lee Smet --- mycelium/src/connection.rs | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/mycelium/src/connection.rs b/mycelium/src/connection.rs index 84c6e873..5e1d5250 100644 --- a/mycelium/src/connection.rs +++ b/mycelium/src/connection.rs @@ -3,12 +3,15 @@ use std::{ io, net::SocketAddr, pin::Pin, - sync::{atomic::AtomicU64, Arc}, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, }; use crate::packet::{self, ControlPacket, DataPacket, Packet}; -use bytes::BytesMut; +use bytes::{Bytes, BytesMut}; use futures::{SinkExt, StreamExt}; use tokio::io::{AsyncRead, AsyncWrite}; @@ -125,6 +128,8 @@ impl Connection for TcpStream { pub struct Quic { framed: Framed, packet::Codec>, con: quinn::Connection, + read: Arc, + write: Arc, } struct QuicStream { @@ -143,10 +148,12 @@ impl Quic { ) -> Self { Quic { framed: Framed::new( - Tracked::new(read, write, QuicStream { tx, rx }), + Tracked::new(read.clone(), write.clone(), QuicStream { tx, rx }), packet::Codec::new(), ), con, + read, + write, } } } @@ -211,9 +218,11 @@ impl Connection for Quic { let mut buffer = BytesMut::with_capacity(1500); codec.encode(Packet::DataPacket(packet), &mut buffer)?; - self.con - .send_datagram(buffer.into()) - .map_err(io::Error::other) + let data: Bytes = buffer.into(); + let tx_len = data.len(); + self.write.fetch_add(tx_len as u64, Ordering::Relaxed); + + self.con.send_datagram(data).map_err(io::Error::other) } async fn feed_control_packet(&mut self, packet: ControlPacket) -> io::Result<()> { @@ -227,6 +236,8 @@ impl Connection for Quic { Ok(buffer) => buffer, Err(e) => return Some(Err(e.into())), }; + let recv_len = datagram_bytes.len(); + self.read.fetch_add(recv_len as u64, Ordering::Relaxed); let mut codec = packet::Codec::new(); match codec.decode(&mut datagram_bytes.into()) { Ok(Some(packet)) => Some(Ok(packet)), From 8357999f7703bd84a33701ba43e90e9e1c13f1c1 Mon Sep 17 00:00:00 2001 From: Lee Smet Date: Thu, 4 Dec 2025 17:03:56 +0100 Subject: [PATCH 15/17] Add flushing for peer connections as separate select branch This restores the old behavior which was introduced to fix an issue where a peer task could get stuck in a flush call (which was not a select branch), thus preventing the task from ever exitting, causing a buildup of tasks (which could keep open OS resources like file descriptors, causing an eventual resource exhaustion). To do this, introduce a Connection{Read,Write}Half, and add a `split` method to the Connection trait. This mimics the behavior of the old code which wrapped a connection in a framed and then split that entirely. Signed-off-by: Lee Smet --- mycelium/src/connection.rs | 208 ++++++++++++++++++++++++++++++++- mycelium/src/connection/tls.rs | 51 +++++++- mycelium/src/peer.rs | 81 +++++++------ 3 files changed, 303 insertions(+), 37 deletions(-) diff --git a/mycelium/src/connection.rs b/mycelium/src/connection.rs index 5e1d5250..1f19aef7 100644 --- a/mycelium/src/connection.rs +++ b/mycelium/src/connection.rs @@ -12,7 +12,10 @@ use std::{ use crate::packet::{self, ControlPacket, DataPacket, Packet}; use bytes::{Bytes, BytesMut}; -use futures::{SinkExt, StreamExt}; +use futures::{ + stream::{SplitSink, SplitStream}, + SinkExt, StreamExt, +}; use tokio::io::{AsyncRead, AsyncWrite}; mod tracked; @@ -42,7 +45,34 @@ const PACKET_PROCESSING_COST_IP6_QUIC: u16 = 7; // TODO const PACKET_PROCESSING_COST_IP4_QUIC: u16 = 12; +pub trait ConnectionReadHalf: Send { + /// Receive a packet from the remote end. + fn receive_packet(&mut self) -> impl Future>> + Send; +} + +pub trait ConnectionWriteHalf: Send { + /// Feeds a data packet on the connection. Depending on the connection you might need to call + /// [`Connection::flush`] before the packet is actually sent. + fn feed_data_packet( + &mut self, + packet: DataPacket, + ) -> impl Future> + Send; + + /// Feeds a control packet on the connection. Depending on the connection you might need to call + /// [`Connection::flush`] before the packet is actually sent. + fn feed_control_packet( + &mut self, + packet: ControlPacket, + ) -> impl Future> + Send; + + /// Flush the connection. This sends all buffered packets which haven't beend sent yet. + fn flush(&mut self) -> impl Future> + Send; +} + pub trait Connection { + type ReadHalf: ConnectionReadHalf; + type WriteHalf: ConnectionWriteHalf; + /// Feeds a data packet on the connection. Depending on the connection you might need to call /// [`Connection::flush`] before the packet is actually sent. fn feed_data_packet( @@ -68,6 +98,9 @@ pub trait Connection { /// The static cost of using this connection fn static_link_cost(&self) -> Result; + + /// Split the connection in a read and write half which can be used independently + fn split(self) -> (Self::ReadHalf, Self::WriteHalf); } /// A wrapper about an asynchronous (non blocking) tcp stream. @@ -93,6 +126,9 @@ impl TcpStream { } impl Connection for TcpStream { + type ReadHalf = TcpStreamReadHalf; + type WriteHalf = TcpStreamWriteHalf; + async fn feed_data_packet(&mut self, packet: DataPacket) -> io::Result<()> { self.framed.feed(Packet::DataPacket(packet)).await } @@ -122,6 +158,43 @@ impl Connection for TcpStream { SocketAddr::V6(_) => PACKET_PROCESSING_COST_IP6_TCP, }) } + + fn split(self) -> (Self::ReadHalf, Self::WriteHalf) { + let (tx, rx) = self.framed.split(); + + ( + TcpStreamReadHalf { framed: rx }, + TcpStreamWriteHalf { framed: tx }, + ) + } +} + +pub struct TcpStreamReadHalf { + framed: SplitStream, packet::Codec>>, +} + +impl ConnectionReadHalf for TcpStreamReadHalf { + async fn receive_packet(&mut self) -> Option> { + self.framed.next().await + } +} + +pub struct TcpStreamWriteHalf { + framed: SplitSink, packet::Codec>, packet::Packet>, +} + +impl ConnectionWriteHalf for TcpStreamWriteHalf { + async fn feed_data_packet(&mut self, packet: DataPacket) -> io::Result<()> { + self.framed.feed(Packet::DataPacket(packet)).await + } + + async fn feed_control_packet(&mut self, packet: ControlPacket) -> io::Result<()> { + self.framed.feed(Packet::ControlPacket(packet)).await + } + + async fn flush(&mut self) -> io::Result<()> { + self.framed.flush().await + } } /// A wrapper around a quic send and quic receive stream, implementing the [`Connection`] trait. @@ -213,6 +286,10 @@ impl AsyncWrite for QuicStream { } impl Connection for Quic { + type ReadHalf = QuicReadHalf; + + type WriteHalf = QuicWriteHalf; + async fn feed_data_packet(&mut self, packet: DataPacket) -> io::Result<()> { let mut codec = packet::Codec::new(); let mut buffer = BytesMut::with_capacity(1500); @@ -271,6 +348,91 @@ impl Connection for Quic { SocketAddr::V6(_) => PACKET_PROCESSING_COST_IP6_QUIC, }) } + + fn split(self) -> (Self::ReadHalf, Self::WriteHalf) { + let Self { + framed, + con, + read, + write, + } = self; + + let (tx, rx) = framed.split(); + + ( + QuicReadHalf { + framed: rx, + con: con.clone(), + read, + }, + QuicWriteHalf { + framed: tx, + con, + write, + }, + ) + } +} + +pub struct QuicReadHalf { + framed: SplitStream, packet::Codec>>, + con: quinn::Connection, + read: Arc, +} + +pub struct QuicWriteHalf { + framed: SplitSink, packet::Codec>, packet::Packet>, + con: quinn::Connection, + write: Arc, +} + +impl ConnectionReadHalf for QuicReadHalf { + async fn receive_packet(&mut self) -> Option> { + tokio::select! { + datagram = self.con.read_datagram() => { + let datagram_bytes = match datagram { + Ok(buffer) => buffer, + Err(e) => return Some(Err(e.into())), + }; + let recv_len = datagram_bytes.len(); + self.read.fetch_add(recv_len as u64, Ordering::Relaxed); + let mut codec = packet::Codec::new(); + match codec.decode(&mut datagram_bytes.into()) { + Ok(Some(packet)) => Some(Ok(packet)), + // Partial? packet read. We consider this to be a stream hangup + // TODO: verify + Ok(None) => None, + Err(e) => Some(Err(e)), + } + }, + packet = self.framed.next() => { + packet + } + + } + } +} + +impl ConnectionWriteHalf for QuicWriteHalf { + async fn feed_data_packet(&mut self, packet: DataPacket) -> io::Result<()> { + let mut codec = packet::Codec::new(); + let mut buffer = BytesMut::with_capacity(1500); + codec.encode(Packet::DataPacket(packet), &mut buffer)?; + + let data: Bytes = buffer.into(); + let tx_len = data.len(); + self.write.fetch_add(tx_len as u64, Ordering::Relaxed); + + self.con.send_datagram(data).map_err(io::Error::other) + } + + async fn feed_control_packet(&mut self, packet: ControlPacket) -> io::Result<()> { + self.framed.feed(Packet::ControlPacket(packet)).await + } + + async fn flush(&mut self) -> io::Result<()> { + self.framed.flush().await + } } #[cfg(test)] @@ -291,6 +453,9 @@ impl DuplexStream { #[cfg(test)] impl Connection for DuplexStream { + type ReadHalf = DuplexStreamReadHalf; + type WriteHalf = DuplexStreamWriteHalf; + async fn feed_data_packet(&mut self, packet: DataPacket) -> io::Result<()> { self.framed.feed(Packet::DataPacket(packet)).await } @@ -314,4 +479,45 @@ impl Connection for DuplexStream { fn static_link_cost(&self) -> Result { Ok(1) } + + fn split(self) -> (Self::ReadHalf, Self::WriteHalf) { + let (tx, rx) = self.framed.split(); + + ( + DuplexStreamReadHalf { framed: rx }, + DuplexStreamWriteHalf { framed: tx }, + ) + } +} + +#[cfg(test)] +pub struct DuplexStreamReadHalf { + framed: SplitStream>, +} + +#[cfg(test)] +pub struct DuplexStreamWriteHalf { + framed: SplitSink, packet::Packet>, +} + +#[cfg(test)] +impl ConnectionReadHalf for DuplexStreamReadHalf { + async fn receive_packet(&mut self) -> Option> { + self.framed.next().await + } +} + +#[cfg(test)] +impl ConnectionWriteHalf for DuplexStreamWriteHalf { + async fn feed_data_packet(&mut self, packet: DataPacket) -> io::Result<()> { + self.framed.feed(Packet::DataPacket(packet)).await + } + + async fn feed_control_packet(&mut self, packet: ControlPacket) -> io::Result<()> { + self.framed.feed(Packet::ControlPacket(packet)).await + } + + async fn flush(&mut self) -> io::Result<()> { + self.framed.flush().await + } } diff --git a/mycelium/src/connection/tls.rs b/mycelium/src/connection/tls.rs index 557b1f58..7eb73504 100644 --- a/mycelium/src/connection/tls.rs +++ b/mycelium/src/connection/tls.rs @@ -4,7 +4,10 @@ use std::{ sync::{atomic::AtomicU64, Arc}, }; -use futures::{SinkExt, StreamExt}; +use futures::{ + stream::{SplitSink, SplitStream}, + SinkExt, StreamExt, +}; use tokio::net::TcpStream; use tokio_util::codec::Framed; @@ -36,6 +39,9 @@ impl TlsStream { } impl super::Connection for TlsStream { + type ReadHalf = TlsStreamReadHalf; + type WriteHalf = TlsStreamWriteHalf; + async fn feed_data_packet(&mut self, packet: crate::packet::DataPacket) -> io::Result<()> { self.framed.feed(Packet::DataPacket(packet)).await } @@ -68,4 +74,47 @@ impl super::Connection for TlsStream { SocketAddr::V6(_) => super::PACKET_PROCESSING_COST_IP6_TCP, }) } + + fn split(self) -> (Self::ReadHalf, Self::WriteHalf) { + let (tx, rx) = self.framed.split(); + + ( + TlsStreamReadHalf { framed: rx }, + TlsStreamWriteHalf { framed: tx }, + ) + } +} + +pub struct TlsStreamReadHalf { + framed: SplitStream>, packet::Codec>>, +} + +pub struct TlsStreamWriteHalf { + framed: SplitSink< + Framed>, packet::Codec>, + packet::Packet, + >, +} + +impl super::ConnectionReadHalf for TlsStreamReadHalf { + async fn receive_packet(&mut self) -> Option> { + self.framed.next().await + } +} + +impl super::ConnectionWriteHalf for TlsStreamWriteHalf { + async fn feed_data_packet(&mut self, packet: crate::packet::DataPacket) -> io::Result<()> { + self.framed.feed(Packet::DataPacket(packet)).await + } + + async fn feed_control_packet( + &mut self, + packet: crate::packet::ControlPacket, + ) -> io::Result<()> { + self.framed.feed(Packet::ControlPacket(packet)).await + } + + async fn flush(&mut self) -> io::Result<()> { + self.framed.flush().await + } } diff --git a/mycelium/src/peer.rs b/mycelium/src/peer.rs index 187d2b41..fd6404da 100644 --- a/mycelium/src/peer.rs +++ b/mycelium/src/peer.rs @@ -10,9 +10,12 @@ use tokio::{ select, sync::{mpsc, Notify}, }; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, trace}; -use crate::{connection::Connection, packet::Packet}; +use crate::{ + connection::{Connection, ConnectionReadHalf, ConnectionWriteHalf}, + packet::Packet, +}; use crate::{ packet::{ControlPacket, DataPacket}, sequence_number::SeqNo, @@ -58,7 +61,7 @@ impl Peer { pub fn new( router_data_tx: mpsc::Sender, router_control_tx: mpsc::UnboundedSender<(ControlPacket, Peer)>, - mut connection: C, + connection: C, dead_peer_sink: mpsc::Sender, ) -> Result { // Data channel for peer @@ -83,11 +86,14 @@ impl Peer { { let peer = peer.clone(); + let (mut stream, mut sink) = connection.split(); + + let mut needs_flush = false; + tokio::spawn(async move { loop { select! { - // Received over the TCP stream - packet = connection.receive_packet() => { + packet = stream.receive_packet() => { match packet { Some(Ok(packet)) => { match packet { @@ -121,42 +127,44 @@ impl Peer { } } - Some(packet) = from_routing_data.recv() => { - if let Err(e) = connection.feed_data_packet(packet).await { - error!("Failed to feed data packet to connection: {e}"); - break - } - + rv = from_routing_data.recv(), if !needs_flush => { + match rv { + None => break, + Some(packet) => { + needs_flush = true; - for _ in 1..PACKET_COALESCE_WINDOW { - // There can be 2 cases of errors here, empty channel and no more - // senders. In both cases we don't really care at this point. - if let Ok(packet) = from_routing_data.try_recv() { - if let Err(e) = connection.feed_data_packet(packet).await { + if let Err(e) = sink.feed_data_packet(packet).await { error!("Failed to feed data packet to connection: {e}"); break } - } - } - if let Err(e) = connection.flush().await { - error!("Failed to flush buffered peer connection data packets: {e}"); - break + + for _ in 1..PACKET_COALESCE_WINDOW { + // There can be 2 cases of errors here, empty channel and no more + // senders. In both cases we don't really care at this point. + if let Ok(packet) = from_routing_data.try_recv() { + if let Err(e) = sink.feed_data_packet(packet).await { + error!("Failed to feed data packet to connection: {e}"); + break + } + trace!("Instantly queued ready packet to transfer to peer"); + } else { + // no packets ready, flush currently buffered ones + break + } + } + } } } - Some(packet) = from_routing_control.recv() => { - if let Err(e) = connection.feed_control_packet(packet).await { - error!("Failed to feed control packet to connection: {e}"); - break - } + rv = from_routing_control.recv(), if !needs_flush => { + match rv { + None => break, + Some(packet) => { + needs_flush = true; - for _ in 1..PACKET_COALESCE_WINDOW { - // There can be 2 cases of errors here, empty channel and no more - // senders. In both cases we don't really care at this point. - if let Ok(packet) = from_routing_control.try_recv() { - if let Err(e) = connection.feed_control_packet(packet).await { - error!("Failed to feed data packet to connection: {e}"); + if let Err(e) = sink.feed_control_packet(packet).await { + error!("Failed to feed control packet to connection: {e}"); break } @@ -164,7 +172,7 @@ impl Peer { // There can be 2 cases of errors here, empty channel and no more // senders. In both cases we don't really care at this point. if let Ok(packet) = from_routing_control.try_recv() { - if let Err(e) = connection.feed_control_packet(packet).await { + if let Err(e) = sink.feed_control_packet(packet).await { error!("Failed to feed data packet to connection: {e}"); break } @@ -175,11 +183,14 @@ impl Peer { } } } + } - if let Err(e) = connection.flush().await { - error!("Failed to flush buffered peer connection control packets: {e}"); + r = sink.flush(), if needs_flush => { + if let Err(e) = r { + error!("Failed to flush buffered peer connection packets: {e}"); break } + needs_flush = false; } _ = death_watcher.notified() => { From 3ebcab657b6be8dafe68bb49fdb183d2624541b6 Mon Sep 17 00:00:00 2001 From: Lee Smet Date: Thu, 4 Dec 2025 17:35:48 +0100 Subject: [PATCH 16/17] Update CHANGELOG.md Signed-off-by: Lee Smet --- CHANGELOG.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 85728d69..e3ab9963 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 future, this will be expanded to redirect queries for certain TLD's to alternative backend. +### Changed + +- The Quic connection type now uses quic datagrams to transport __data__ (packets + coming from the TUN device) to the peer. Protocol traffic is still sent over a + bidirectional Quic stream (which supports retransmits). + ### Fixed - Return actuall amount of bytes sent to peers instead of the amount of bytes received From 7aa5b89ca1704e0519501441ebeef9fb91b8687a Mon Sep 17 00:00:00 2001 From: Lee Smet Date: Fri, 5 Dec 2025 11:52:56 +0100 Subject: [PATCH 17/17] Restore linux tun interface MTU Signed-off-by: Lee Smet --- mycelium/src/tun/linux.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mycelium/src/tun/linux.rs b/mycelium/src/tun/linux.rs index fecafe89..4958a218 100644 --- a/mycelium/src/tun/linux.rs +++ b/mycelium/src/tun/linux.rs @@ -13,7 +13,7 @@ use crate::subnet::Subnet; use crate::tun::TunConfig; // TODO -const LINK_MTU: i32 = 1300; +const LINK_MTU: i32 = 1400; /// Create a new tun interface and set required routes ///