Skip to content

Commit 2b1c55c

Browse files
authored
feat: harden subscription routing (#2064)
1 parent a413514 commit 2b1c55c

File tree

11 files changed

+815
-135
lines changed

11 files changed

+815
-135
lines changed

apps/freenet-ping/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ members = ["contracts/ping", "app", "types"]
44

55
[workspace.dependencies]
66
# freenet-stdlib = { path = "./../../stdlib/rust", features = ["contract"] }
7-
freenet-stdlib = { version = "0.1.24" }
7+
freenet-stdlib = { version = "0.1.14" }
88
freenet-ping-types = { path = "types", default-features = false }
99
chrono = { version = "0.4", default-features = false }
1010
testresult = "0.4"

apps/freenet-ping/app/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ testing = ["freenet-stdlib/testing", "freenet/testing"]
1010
anyhow = "1.0"
1111
chrono = { workspace = true, features = ["default"] }
1212
clap = { version = "4.5", features = ["derive"] }
13-
freenet-stdlib = { version = "0.1.24", features = ["net"] }
13+
freenet-stdlib = { version = "0.1.22", features = ["net"] }
1414
freenet-ping-types = { path = "../types", features = ["std", "clap"] }
1515
futures = "0.3.31"
1616
rand = "0.9.2"

crates/core/src/node/network_bridge/p2p_protoc.rs

Lines changed: 107 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use std::{
1414
sync::Arc,
1515
};
1616
use tokio::net::UdpSocket;
17-
use tokio::sync::mpsc::{self, Receiver, Sender};
17+
use tokio::sync::mpsc::{self, error::TryRecvError, Receiver, Sender};
1818
use tokio::sync::oneshot::{self};
1919
use tokio::time::timeout;
2020
use tracing::Instrument;
@@ -928,6 +928,12 @@ impl P2pConnManager {
928928
op_manager: &Arc<OpManager>,
929929
state: &mut EventListenerState,
930930
) -> anyhow::Result<()> {
931+
let tx = *msg.id();
932+
tracing::debug!(
933+
%tx,
934+
tx_type = ?tx.transaction_type(),
935+
"Handling inbound NetMessage at event loop"
936+
);
931937
match msg {
932938
NetMessage::V1(NetMessageV1::Aborted(tx)) => {
933939
handle_aborted_op(tx, op_manager, &self.gateways).await?;
@@ -1277,6 +1283,7 @@ impl P2pConnManager {
12771283
Some(Ok(peer_conn)) => {
12781284
// Get the remote address from the connection
12791285
let remote_addr = peer_conn.conn.remote_addr();
1286+
let tx = *peer_conn.msg.id();
12801287

12811288
// Check if we need to establish a connection back to the sender
12821289
let should_connect = !self.connections.keys().any(|peer| peer.addr == remote_addr)
@@ -1307,6 +1314,12 @@ impl P2pConnManager {
13071314
}
13081315
}
13091316

1317+
tracing::debug!(
1318+
peer_addr = %remote_addr,
1319+
%tx,
1320+
tx_type = ?tx.transaction_type(),
1321+
"Queueing inbound NetMessage from peer connection"
1322+
);
13101323
let task = peer_connection_listener(peer_conn.rx, peer_conn.conn).boxed();
13111324
select_stream.push_peer_connection(task);
13121325
Ok(EventResult::Event(
@@ -1587,45 +1600,116 @@ pub(super) struct PeerConnectionInbound {
15871600
pub msg: NetMessage,
15881601
}
15891602

1603+
async fn handle_peer_channel_message(
1604+
conn: &mut PeerConnection,
1605+
msg: Either<NetMessage, ConnEvent>,
1606+
) -> Result<(), TransportError> {
1607+
match msg {
1608+
Left(msg) => {
1609+
tracing::debug!(to=%conn.remote_addr() ,"Sending message to peer. Msg: {msg}");
1610+
if let Err(error) = conn.send(msg).await {
1611+
tracing::error!(
1612+
to = %conn.remote_addr(),
1613+
?error,
1614+
"[CONN_LIFECYCLE] Failed to send message to peer"
1615+
);
1616+
return Err(error);
1617+
}
1618+
tracing::debug!(
1619+
to = %conn.remote_addr(),
1620+
"[CONN_LIFECYCLE] Message enqueued on transport socket"
1621+
);
1622+
}
1623+
Right(action) => {
1624+
tracing::debug!(to=%conn.remote_addr(), "Received action from channel");
1625+
match action {
1626+
ConnEvent::NodeAction(NodeEvent::DropConnection(peer)) => {
1627+
tracing::info!(
1628+
to = %conn.remote_addr(),
1629+
peer = %peer,
1630+
"[CONN_LIFECYCLE] Closing connection per DropConnection action"
1631+
);
1632+
return Err(TransportError::ConnectionClosed(conn.remote_addr()));
1633+
}
1634+
ConnEvent::ClosedChannel(reason) => {
1635+
tracing::info!(
1636+
to = %conn.remote_addr(),
1637+
reason = ?reason,
1638+
"[CONN_LIFECYCLE] Closing connection due to ClosedChannel action"
1639+
);
1640+
return Err(TransportError::ConnectionClosed(conn.remote_addr()));
1641+
}
1642+
other => {
1643+
unreachable!(
1644+
"Unexpected action from peer_connection_listener channel: {:?}",
1645+
other
1646+
);
1647+
}
1648+
}
1649+
}
1650+
}
1651+
Ok(())
1652+
}
1653+
15901654
async fn peer_connection_listener(
15911655
mut rx: PeerConnChannelRecv,
15921656
mut conn: PeerConnection,
15931657
) -> Result<PeerConnectionInbound, TransportError> {
1658+
const MAX_IMMEDIATE_SENDS: usize = 32;
15941659
loop {
1595-
tokio::select! {
1596-
msg = rx.recv() => {
1597-
let Some(msg) = msg else { break Err(TransportError::ConnectionClosed(conn.remote_addr())); };
1598-
match msg {
1599-
Left(msg) => {
1600-
tracing::debug!(to=%conn.remote_addr() ,"Sending message to peer. Msg: {msg}");
1601-
conn
1602-
.send(msg)
1603-
.await?;
1604-
}
1605-
Right(action) => {
1606-
tracing::debug!(to=%conn.remote_addr(), "Received action from channel");
1607-
match action {
1608-
ConnEvent::NodeAction(NodeEvent::DropConnection(_))
1609-
| ConnEvent::ClosedChannel(_) => {
1610-
break Err(TransportError::ConnectionClosed(conn.remote_addr()));
1611-
}
1612-
other => {
1613-
unreachable!("Unexpected action from peer_connection_listener channel: {:?}", other);
1614-
}
1615-
}
1660+
let mut drained = 0;
1661+
loop {
1662+
match rx.try_recv() {
1663+
Ok(msg) => {
1664+
handle_peer_channel_message(&mut conn, msg).await?;
1665+
drained += 1;
1666+
if drained >= MAX_IMMEDIATE_SENDS {
1667+
break;
16161668
}
16171669
}
1670+
Err(TryRecvError::Empty) => break,
1671+
Err(TryRecvError::Disconnected) => {
1672+
tracing::warn!(
1673+
to = %conn.remote_addr(),
1674+
"[CONN_LIFECYCLE] peer_connection_listener channel closed without explicit DropConnection"
1675+
);
1676+
return Err(TransportError::ConnectionClosed(conn.remote_addr()));
1677+
}
1678+
}
1679+
}
1680+
1681+
tokio::select! {
1682+
msg = rx.recv() => {
1683+
let Some(msg) = msg else {
1684+
tracing::warn!(
1685+
to = %conn.remote_addr(),
1686+
"[CONN_LIFECYCLE] peer_connection_listener channel closed without explicit DropConnection"
1687+
);
1688+
break Err(TransportError::ConnectionClosed(conn.remote_addr()));
1689+
};
1690+
handle_peer_channel_message(&mut conn, msg).await?;
16181691
}
16191692
msg = conn.recv() => {
16201693
let Ok(msg) = msg
16211694
.inspect_err(|error| {
16221695
tracing::error!(from=%conn.remote_addr(), "Error while receiving message: {error}");
16231696
})
16241697
else {
1698+
tracing::debug!(
1699+
from = %conn.remote_addr(),
1700+
"[CONN_LIFECYCLE] peer_connection_listener terminating after recv error"
1701+
);
16251702
break Err(TransportError::ConnectionClosed(conn.remote_addr()));
16261703
};
16271704
let net_message = decode_msg(&msg).unwrap();
1628-
tracing::debug!(from=%conn.remote_addr() ,"Received message from peer. Msg: {net_message}");
1705+
let tx = *net_message.id();
1706+
tracing::debug!(
1707+
from = %conn.remote_addr(),
1708+
%tx,
1709+
tx_type = ?tx.transaction_type(),
1710+
msg_type = %net_message,
1711+
"[CONN_LIFECYCLE] Received inbound NetMessage from peer"
1712+
);
16291713
break Ok(PeerConnectionInbound { conn, rx, msg: net_message });
16301714
}
16311715
}

0 commit comments

Comments
 (0)