118 changes: 64 additions & 54 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 1 addition & 2 deletions apps/freenet-ping/Cargo.toml
@@ -4,7 +4,7 @@ members = ["contracts/ping", "app", "types"]

[workspace.dependencies]
# freenet-stdlib = { path = "./../../stdlib/rust", features = ["contract"] }
freenet-stdlib = { version = "0.1.14" }
freenet-stdlib = { version = "0.1.24" }
freenet-ping-types = { path = "types", default-features = false }
chrono = { version = "0.4", default-features = false }
testresult = "0.4"
@@ -19,4 +19,3 @@ debug = false
codegen-units = 1
panic = 'abort'
strip = true

2 changes: 1 addition & 1 deletion apps/freenet-ping/app/Cargo.toml
@@ -10,7 +10,7 @@ testing = ["freenet-stdlib/testing", "freenet/testing"]
anyhow = "1.0"
chrono = { workspace = true, features = ["default"] }
clap = { version = "4.5", features = ["derive"] }
freenet-stdlib = { version = "0.1.22", features = ["net"] }
freenet-stdlib = { version = "0.1.24", features = ["net"] }
freenet-ping-types = { path = "../types", features = ["std", "clap"] }
futures = "0.3.31"
rand = "0.9.2"
40 changes: 0 additions & 40 deletions crates/core/src/client_events/session_actor.rs
@@ -48,17 +48,7 @@ use std::time::{Duration, Instant};
use tokio::sync::mpsc;
use tracing::debug;

/// Time-to-live for cached pending results. Entries older than this duration are
/// eligible for removal during pruning (triggered on message processing).
///
/// Note: Due to lazy evaluation, stale entries may persist beyond TTL during idle periods.
const PENDING_RESULT_TTL: Duration = Duration::from_secs(60);

/// Maximum number of cached pending results. When this limit is reached, LRU eviction
/// removes the oldest entry to make room for new ones.
///
/// Note: Cache may temporarily exceed this limit between messages since enforcement
/// is lazy (triggered only during message processing).
const MAX_PENDING_RESULTS: usize = 2048;

/// Simple session actor for client connection refactor
@@ -289,7 +279,6 @@ impl SessionActor {
.or_insert_with(|| PendingResult::new(result.clone()));
entry.result = result.clone();
entry.touch();

if let Some(waiting_clients) = self.client_transactions.remove(&tx) {
for client_id in waiting_clients {
if entry.delivered_clients.insert(client_id) {
Expand Down Expand Up @@ -400,23 +389,6 @@ impl SessionActor {
self.client_request_ids.retain(|(_, c), _| *c != client_id);
}

/// Prune stale pending results based on TTL and enforce capacity limits.
///
/// This is the **only** cache cleanup mechanism - there is no background task.
/// Called on every message in `process_message()`.
///
/// # Cleanup Strategy (Lazy Evaluation)
///
/// 1. **Skip if empty**: Early return if no cached results
/// 2. **Identify active transactions**: Collect all transactions that still have waiting clients
/// 3. **TTL-based removal**: Remove inactive entries older than `PENDING_RESULT_TTL`
/// 4. **Capacity enforcement**: If still at/over `MAX_PENDING_RESULTS`, trigger LRU eviction
///
/// # Lazy Evaluation Implications
///
/// - During idle periods (no messages), stale entries persist in memory
/// - Cache cleanup happens only when actor receives messages
/// - Stale entries may remain beyond TTL until next message arrives
fn prune_pending_results(&mut self) {
if self.pending_results.is_empty() {
return;
Expand Down Expand Up @@ -460,18 +432,6 @@ impl SessionActor {
}
}

/// Enforce capacity limits using LRU (Least Recently Used) eviction.
///
/// Removes the entry with the oldest `last_accessed` timestamp when the cache
/// reaches or exceeds `MAX_PENDING_RESULTS`.
///
/// # Lazy Evaluation Note
///
/// This is only called:
/// 1. At the end of `prune_pending_results()` if still at capacity
/// 2. Before inserting new entries when already at capacity
///
/// Between messages, cache size may temporarily exceed the limit.
fn enforce_pending_capacity(&mut self) {
if self.pending_results.len() < MAX_PENDING_RESULTS {
return;
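The documentation removed above described the session actor's pending-result cache: entries are kept for `PENDING_RESULT_TTL` (60 s), the cache is capped at `MAX_PENDING_RESULTS` (2048), and both rules are enforced lazily, only when a message is processed, with LRU eviction on `last_accessed` once the cap is reached. A minimal sketch of that strategy, using a hypothetical `PendingCache` type in place of the actual `SessionActor` fields:

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

const PENDING_RESULT_TTL: Duration = Duration::from_secs(60);
const MAX_PENDING_RESULTS: usize = 2048;

struct CacheEntry<T> {
    value: T,
    last_accessed: Instant, // refreshed by `touch()` whenever the entry is delivered
}

struct PendingCache<K, T> {
    entries: HashMap<K, CacheEntry<T>>,
}

impl<K: std::hash::Hash + Eq + Clone, T> PendingCache<K, T> {
    /// Called on every message: drop inactive entries older than the TTL,
    /// then fall back to LRU eviction if the cache is still at capacity.
    fn prune(&mut self, is_active: impl Fn(&K) -> bool) {
        if self.entries.is_empty() {
            return;
        }
        let now = Instant::now();
        self.entries.retain(|k, e| {
            is_active(k) || now.duration_since(e.last_accessed) < PENDING_RESULT_TTL
        });
        if self.entries.len() >= MAX_PENDING_RESULTS {
            self.evict_lru();
        }
    }

    /// Remove the entry with the oldest `last_accessed` timestamp.
    fn evict_lru(&mut self) {
        if let Some(oldest) = self
            .entries
            .iter()
            .min_by_key(|(_, e)| e.last_accessed)
            .map(|(k, _)| k.clone())
        {
            self.entries.remove(&oldest);
        }
    }
}
```

As the removed notes pointed out, because cleanup only runs inside message handling, stale entries can outlive the TTL during idle periods and the cache can briefly exceed the cap between messages.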
8 changes: 6 additions & 2 deletions crates/core/src/message.rs
@@ -367,6 +367,7 @@ pub(crate) enum NodeEvent {
/// Register expectation for an inbound connection from the given peer.
ExpectPeerConnection {
peer: PeerId,
courtesy: bool,
},
}

@@ -444,8 +445,11 @@ impl Display for NodeEvent {
"Local subscribe complete (tx: {tx}, key: {key}, subscribed: {subscribed})"
)
}
NodeEvent::ExpectPeerConnection { peer } => {
write!(f, "ExpectPeerConnection (from {peer})")
NodeEvent::ExpectPeerConnection { peer, courtesy } => {
write!(
f,
"ExpectPeerConnection (from {peer}, courtesy: {courtesy})"
)
}
}
}
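The `courtesy` flag added to this variant is threaded through the whole path: the `p2p_protoc.rs` changes below forward it into `HandshakeCommand::ExpectInbound` instead of hardcoding `false`, and the `Display` impl now reports it. A tiny hypothetical usage showing the resulting log form (`peer` assumed to be a `PeerId` already in scope):

```rust
// Hypothetical usage; not taken verbatim from the codebase.
let event = NodeEvent::ExpectPeerConnection {
    peer: peer.clone(),
    courtesy: true,
};
// With the updated Display impl this prints:
// "ExpectPeerConnection (from <peer>, courtesy: true)"
println!("{event}");
```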
137 changes: 120 additions & 17 deletions crates/core/src/node/network_bridge/p2p_protoc.rs
@@ -14,7 +14,8 @@ use std::{
sync::Arc,
};
use tokio::net::UdpSocket;
use tokio::sync::mpsc::{self, error::TryRecvError, Receiver, Sender};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::sync::mpsc::error::TryRecvError;
use tokio::time::timeout;
use tracing::Instrument;

@@ -629,14 +630,14 @@ impl P2pConnManager {
)
.await?;
}
NodeEvent::ExpectPeerConnection { peer } => {
tracing::debug!(%peer, "ExpectPeerConnection event received; registering inbound expectation via handshake driver");
NodeEvent::ExpectPeerConnection { peer, courtesy } => {
tracing::debug!(%peer, courtesy, "ExpectPeerConnection event received; registering inbound expectation via handshake driver");
state.outbound_handler.expect_incoming(peer.addr);
if let Err(error) = handshake_cmd_sender
.send(HandshakeCommand::ExpectInbound {
peer: peer.clone(),
transaction: None,
courtesy: false,
courtesy,
})
.await
{
@@ -750,8 +751,12 @@ impl P2pConnManager {

// Collect node information
if config.include_node_info {
// Calculate location and adress if is set
let (addr, location) = if let Some(peer_id) =
// Prefer the runtime's current ring location; fall back to derivation from the peer's
// advertised address if we don't have one yet.
let current_location =
op_manager.ring.connection_manager.own_location().location;

let (addr, fallback_location) = if let Some(peer_id) =
op_manager.ring.connection_manager.get_peer_key()
{
let location = Location::from_address(&peer_id.addr);
Expand All @@ -760,11 +765,15 @@ impl P2pConnManager {
(None, None)
};

let location_str = current_location
.or(fallback_location)
.map(|loc| format!("{:.6}", loc.as_f64()));

// Always include basic node info, but only include address/location if available
response.node_info = Some(NodeInfo {
peer_id: ctx.key_pair.public().to_string(),
is_gateway: self.is_gateway,
location: location.map(|loc| format!("{:.6}", loc.0)),
location: location_str,
listening_address: addr
.map(|peer_addr| peer_addr.to_string()),
uptime_seconds: 0, // TODO: implement actual uptime tracking
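The node-info change above is an `Option` precedence chain: prefer the ring's assigned location, fall back to the one derived from the advertised address, and only then format whichever is present. A standalone illustration of that precedence, with plain `f64`s standing in for `Location` and made-up values:

```rust
// Minimal illustration of the fallback order used above (assumed values).
let current_location: Option<f64> = None;        // ring has not assigned a location yet
let fallback_location: Option<f64> = Some(0.42); // derived from the advertised address
let location_str = current_location
    .or(fallback_location)
    .map(|loc| format!("{loc:.6}"));
assert_eq!(location_str.as_deref(), Some("0.420000"));
```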
@@ -1258,6 +1267,12 @@ impl P2pConnManager {
"connect_peer: registered new pending connection"
);
state.outbound_handler.expect_incoming(peer_addr);
let loc_hint = Location::from_address(&peer.addr);
self.bridge
.op_manager
.ring
.connection_manager
.register_outbound_pending(&peer, Some(loc_hint));
}
}

@@ -1350,6 +1365,7 @@ impl P2pConnManager {
}
}

let mut derived_courtesy = courtesy;
let peer_id = peer.unwrap_or_else(|| {
tracing::info!(
remote = %remote_addr,
@@ -1369,15 +1385,31 @@ impl P2pConnManager {
)
});

if !derived_courtesy {
derived_courtesy = self
.bridge
.op_manager
.ring
.connection_manager
.take_pending_courtesy_by_addr(&remote_addr);
}

tracing::info!(
remote = %peer_id.addr,
courtesy,
courtesy = derived_courtesy,
transaction = ?transaction,
"Inbound connection established"
);

self.handle_successful_connection(peer_id, connection, state, select_stream, None)
.await?;
self.handle_successful_connection(
peer_id,
connection,
state,
select_stream,
None,
derived_courtesy,
)
.await?;
}
HandshakeEvent::OutboundEstablished {
transaction,
@@ -1391,8 +1423,15 @@ impl P2pConnManager {
transaction = %transaction,
"Outbound connection established"
);
self.handle_successful_connection(peer, connection, state, select_stream, None)
.await?;
self.handle_successful_connection(
peer,
connection,
state,
select_stream,
None,
courtesy,
)
.await?;
}
HandshakeEvent::OutboundFailed {
transaction,
@@ -1507,6 +1546,7 @@ impl P2pConnManager {
state: &mut EventListenerState,
select_stream: &mut priority_select::ProductionPrioritySelectStream,
remaining_checks: Option<usize>,
courtesy: bool,
) -> anyhow::Result<()> {
let pending_txs = state
.awaiting_connection_txs
@@ -1582,18 +1622,41 @@ impl P2pConnManager {
}

if newly_inserted {
let pending_loc = self
let loc = self
.bridge
.op_manager
.ring
.connection_manager
.prune_in_transit_connection(&peer_id);
let loc = pending_loc.unwrap_or_else(|| Location::from_address(&peer_id.addr));
self.bridge
.pending_location_hint(&peer_id)
.unwrap_or_else(|| Location::from_address(&peer_id.addr));
let eviction_candidate = self
.bridge
.op_manager
.ring
.add_connection(loc, peer_id.clone(), false)
.add_connection(loc, peer_id.clone(), false, courtesy)
.await;
if let Some(victim) = eviction_candidate {
if victim == peer_id {
tracing::debug!(
%peer_id,
"Courtesy eviction candidate matched current connection; skipping drop"
);
} else {
tracing::info!(
%victim,
%peer_id,
courtesy_limit = true,
"Courtesy connection budget exceeded; dropping oldest courtesy peer"
);
if let Err(error) = self.bridge.drop_connection(&victim).await {
tracing::warn!(
%victim,
?error,
"Failed to drop courtesy connection after hitting budget"
);
}
}
}
}
Ok(())
}
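With this change `add_connection` takes the courtesy flag and can return an eviction candidate once the courtesy budget is exceeded; the caller drops that peer unless the candidate is the connection it just added. A rough sketch of the ring-side budget check, using hypothetical names, an assumed budget constant, and a plain FIFO of courtesy peers (the real bookkeeping lives in the connection manager):

```rust
use std::collections::VecDeque;

const MAX_COURTESY_CONNECTIONS: usize = 4; // assumed budget, not the real constant

struct CourtesyBudget<P> {
    courtesy_peers: VecDeque<P>, // oldest courtesy connection at the front
}

impl<P> CourtesyBudget<P> {
    /// Record a new connection; if it is a courtesy link and the budget is
    /// exceeded, return the oldest courtesy peer as an eviction candidate.
    fn add_connection(&mut self, peer: P, courtesy: bool) -> Option<P> {
        if !courtesy {
            return None;
        }
        self.courtesy_peers.push_back(peer);
        if self.courtesy_peers.len() > MAX_COURTESY_CONNECTIONS {
            self.courtesy_peers.pop_front()
        } else {
            None
        }
    }
}
```

The caller-side guard in the diff, skipping the drop when the victim equals the new peer, keeps a degenerate budget or a newest-entry candidate from tearing down the connection that was just established.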
@@ -1651,6 +1714,46 @@ impl P2pConnManager {
}
}

if let Some(sender_peer) = extract_sender_from_message(&peer_conn.msg) {
if sender_peer.peer.addr == remote_addr
|| sender_peer.peer.addr.ip().is_unspecified()
{
let mut new_peer_id = sender_peer.peer.clone();
if new_peer_id.addr.ip().is_unspecified() {
new_peer_id.addr = remote_addr;
if let Some(sender_mut) =
extract_sender_from_message_mut(&mut peer_conn.msg)
{
if sender_mut.peer.addr.ip().is_unspecified() {
sender_mut.peer.addr = remote_addr;
}
}
}
if let Some(existing_key) = self
.connections
.keys()
.find(|peer| {
peer.addr == remote_addr && peer.pub_key != new_peer_id.pub_key
})
.cloned()
{
if let Some(channel) = self.connections.remove(&existing_key) {
tracing::info!(
remote = %remote_addr,
old_peer = %existing_key,
new_peer = %new_peer_id,
"Updating provisional peer identity after inbound message"
);
self.bridge
.op_manager
.ring
.update_connection_identity(&existing_key, new_peer_id.clone());
self.connections.insert(new_peer_id, channel);
}
}
}
}

// Check if we need to establish a connection back to the sender
let should_connect = !self.connections.keys().any(|peer| peer.addr == remote_addr)
&& !state.awaiting_connection.contains_key(&remote_addr);
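A gateway may only know the remote socket address when an inbound connection is first admitted, so it stores a provisional `PeerId`; the block above rebinds the connection entry to the sender's real identity once the first message reveals it, and also patches an unspecified (0.0.0.0) advertised address with the observed one. A condensed sketch of that rebinding, assuming a `HashMap` connection table and simplified stand-in types:

```rust
use std::collections::HashMap;
use std::net::SocketAddr;

#[derive(Clone, PartialEq, Eq, Hash)]
struct PeerId {
    addr: SocketAddr,
    pub_key: String, // stand-in for the real key type
}

fn rebind_identity<C>(
    connections: &mut HashMap<PeerId, C>,
    remote_addr: SocketAddr,
    mut claimed: PeerId,
) -> Option<PeerId> {
    // Fill in an unspecified advertised address with the address the
    // message actually arrived from.
    if claimed.addr.ip().is_unspecified() {
        claimed.addr = remote_addr;
    }
    // Find a provisional entry for the same socket address but a different key.
    let existing = connections
        .keys()
        .find(|p| p.addr == remote_addr && p.pub_key != claimed.pub_key)
        .cloned()?;
    // Move the live channel over to the authoritative identity.
    let channel = connections.remove(&existing)?;
    connections.insert(claimed.clone(), channel);
    Some(claimed)
}
```

In the diff the same step also calls `ring.update_connection_identity(&existing_key, new_peer_id)` so the ring's routing state stays consistent with the connection table.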
4 changes: 2 additions & 2 deletions crates/core/src/node/testing_impl.rs
@@ -940,8 +940,8 @@ where
NodeEvent::QueryNodeDiagnostics { .. } => {
unimplemented!()
}
NodeEvent::ExpectPeerConnection { peer } => {
tracing::debug!(%peer, "ExpectPeerConnection ignored in testing impl");
NodeEvent::ExpectPeerConnection { peer, courtesy } => {
tracing::debug!(%peer, courtesy, "ExpectPeerConnection ignored in testing impl");
continue;
}
},