@@ -45,6 +45,9 @@ use crate::{
4545mod guarded_channel;
4646mod path_state;
4747
48+ #[ cfg( feature = "metrics" ) ]
49+ mod metrics;
50+
4851// TODO: use this
4952// /// Number of addresses that are not active that we keep around per endpoint.
5053// ///
@@ -83,6 +86,9 @@ const APPLICATION_ABANDON_PATH: u8 = 30;
8386/// in a high frequency, and to keep data about previous path around for subsequent connections.
8487const ACTOR_MAX_IDLE_TIMEOUT : Duration = Duration :: from_secs ( 60 ) ;
8588
89+ /// Interval in which connection and path metrics are emitted.
90+ const METRICS_INTERVAL : Duration = Duration :: from_secs ( 10 ) ;
91+
8692/// A stream of events from all paths for all connections.
8793///
8894/// The connection is identified using [`ConnId`]. The event `Err` variant happens when the
@@ -240,8 +246,12 @@ impl RemoteStateActor {
240246 /// discipline is needed to not turn pending for a long time.
241247 async fn run ( mut self , mut inbox : GuardedReceiver < RemoteStateMessage > ) {
242248 trace ! ( "actor started" ) ;
249+
243250 let idle_timeout = time:: sleep ( ACTOR_MAX_IDLE_TIMEOUT ) ;
244251 n0_future:: pin!( idle_timeout) ;
252+
253+ let mut metrics_interval = time:: interval ( METRICS_INTERVAL ) ;
254+
245255 loop {
246256 let scheduled_path_open = match self . scheduled_open_path {
247257 Some ( when) => MaybeFuture :: Some ( time:: sleep_until ( when) ) ,
@@ -296,6 +306,9 @@ impl RemoteStateActor {
296306 item = self . discovery_stream. next( ) => {
297307 self . handle_discovery_item( item) ;
298308 }
309+ _ = metrics_interval. tick( ) => {
310+ self . record_metrics( ) ;
311+ }
299312 _ = & mut idle_timeout => {
300313 if self . connections. is_empty( ) && inbox. close_if_idle( ) {
301314 trace!( "idle timeout expired and still idle: terminate actor" ) ;
@@ -430,7 +443,8 @@ impl RemoteStateActor {
430443 paths : Default :: default ( ) ,
431444 open_paths : Default :: default ( ) ,
432445 path_ids : Default :: default ( ) ,
433- transport_summary : TransportSummary :: default ( ) ,
446+ #[ cfg( feature = "metrics" ) ]
447+ metrics : Default :: default ( ) ,
434448 } )
435449 . into_mut ( ) ;
436450
@@ -589,8 +603,10 @@ impl RemoteStateActor {
589603
590604 fn handle_connection_close ( & mut self , conn_id : ConnId ) {
591605 if let Some ( state) = self . connections . remove ( & conn_id) {
592- self . metrics . num_conns_closed . inc ( ) ;
593- state. transport_summary . record ( & self . metrics ) ;
606+ #[ cfg( feature = "metrics" ) ]
607+ state. metrics . record_closed ( & self . metrics ) ;
608+ #[ cfg( not( feature = "metrics" ) ) ]
609+ let _ = state;
594610 }
595611 if self . connections . is_empty ( ) {
596612 trace ! ( "last connection closed - clearing selected_path" ) ;
@@ -1066,6 +1082,13 @@ impl RemoteStateActor {
10661082 }
10671083 }
10681084 }
1085+
1086+ fn record_metrics ( & mut self ) {
1087+ #[ cfg( feature = "metrics" ) ]
1088+ for state in self . connections . values_mut ( ) {
1089+ state. record_metrics_periodic ( & self . metrics , self . selected_path . get ( ) ) ;
1090+ }
1091+ }
10691092}
10701093
10711094/// Messages to send to the [`RemoteStateActor`].
@@ -1174,8 +1197,11 @@ struct ConnectionState {
11741197 open_paths : FxHashMap < PathId , transports:: Addr > ,
11751198 /// Reverse map of [`Self::paths].
11761199 path_ids : FxHashMap < transports:: Addr , PathId > ,
1177- /// Summary over transports used in this connection, for metrics tracking.
1178- transport_summary : TransportSummary ,
1200+ /// Tracker for stateful metrics for this connection and its paths
1201+ ///
1202+ /// Feature-gated on the `metrics` feature because it increases memory use.
1203+ #[ cfg( feature = "metrics" ) ]
1204+ metrics : self :: metrics:: MetricsTracker ,
11791205}
11801206
11811207impl ConnectionState {
@@ -1187,7 +1213,8 @@ impl ConnectionState {
11871213
11881214 /// Tracks an open path for the connection.
11891215 fn add_open_path ( & mut self , remote : transports:: Addr , path_id : PathId ) {
1190- self . transport_summary . add_path ( & remote) ;
1216+ #[ cfg( feature = "metrics" ) ]
1217+ self . metrics . add_path ( path_id, & remote) ;
11911218 self . paths . insert ( path_id, remote. clone ( ) ) ;
11921219 self . open_paths . insert ( path_id, remote. clone ( ) ) ;
11931220 self . path_ids . insert ( remote, path_id) ;
@@ -1200,11 +1227,15 @@ impl ConnectionState {
12001227 self . path_ids . remove ( & addr) ;
12011228 }
12021229 self . open_paths . remove ( path_id) ;
1230+ #[ cfg( feature = "metrics" ) ]
1231+ self . metrics . remove_path ( path_id) ;
12031232 }
12041233
12051234 /// Removes the path from the open paths.
12061235 fn remove_open_path ( & mut self , path_id : & PathId ) {
12071236 self . open_paths . remove ( path_id) ;
1237+ #[ cfg( feature = "metrics" ) ]
1238+ self . metrics . remove_path ( path_id) ;
12081239
12091240 self . update_pub_path_info ( ) ;
12101241 }
@@ -1224,6 +1255,19 @@ impl ConnectionState {
12241255
12251256 self . pub_open_paths . set ( new) . ok ( ) ;
12261257 }
1258+
1259+ #[ cfg( feature = "metrics" ) ]
1260+ fn record_metrics_periodic (
1261+ & mut self ,
1262+ metrics : & MagicsockMetrics ,
1263+ selected_path : Option < transports:: Addr > ,
1264+ ) {
1265+ let Some ( conn) = self . handle . upgrade ( ) else {
1266+ return ;
1267+ } ;
1268+ self . metrics
1269+ . record_periodic ( metrics, & conn, & self . open_paths , selected_path) ;
1270+ }
12271271}
12281272
12291273/// Watcher for the open paths and selected transmission path in a connection.
@@ -1435,44 +1479,6 @@ impl Future for OnClosed {
14351479 }
14361480}
14371481
1438- /// Used for metrics tracking.
1439- #[ derive( Debug , Clone , Copy , Default ) ]
1440- enum TransportSummary {
1441- #[ default]
1442- None ,
1443- IpOnly ,
1444- RelayOnly ,
1445- IpAndRelay ,
1446- }
1447-
1448- impl TransportSummary {
1449- fn add_path ( & mut self , addr : & transports:: Addr ) {
1450- use transports:: Addr ;
1451- * self = match ( * self , addr) {
1452- ( TransportSummary :: None | TransportSummary :: IpOnly , Addr :: Ip ( _) ) => Self :: IpOnly ,
1453- ( TransportSummary :: None | TransportSummary :: RelayOnly , Addr :: Relay ( _, _) ) => {
1454- Self :: RelayOnly
1455- }
1456- _ => Self :: IpAndRelay ,
1457- }
1458- }
1459-
1460- fn record ( & self , metrics : & MagicsockMetrics ) {
1461- match self {
1462- TransportSummary :: IpOnly => {
1463- metrics. num_conns_transport_ip_only . inc ( ) ;
1464- }
1465- TransportSummary :: RelayOnly => {
1466- metrics. num_conns_transport_relay_only . inc ( ) ;
1467- }
1468- TransportSummary :: IpAndRelay => {
1469- metrics. num_conns_transport_ip_and_relay . inc ( ) ;
1470- }
1471- TransportSummary :: None => { }
1472- }
1473- }
1474- }
1475-
14761482/// Converts an iterator of [`TransportAddr'] into an iterator of [`transports::Addr`].
14771483fn to_transports_addr (
14781484 endpoint_id : EndpointId ,
0 commit comments