diff --git a/crates/rproxy/src/server.rs b/crates/rproxy/src/server.rs index 54a6139..1945bef 100644 --- a/crates/rproxy/src/server.rs +++ b/crates/rproxy/src/server.rs @@ -8,7 +8,6 @@ use std::{error::Error, sync::Arc}; use tokio::{ signal::unix::{SignalKind, signal}, - sync::broadcast, task::JoinHandle, }; use tokio_util::sync::CancellationToken; @@ -34,16 +33,14 @@ pub struct Server {} impl Server { pub async fn run(config: Config) -> Result<(), Box> { - let canceller = Server::wait_for_shutdown_signal(); - let resetter = Server::wait_for_reset_signal(canceller.clone()); + let shutdown_signal = Server::wait_for_shutdown_signal(); - Self::_run(config, canceller, resetter).await + Self::_run(config, shutdown_signal).await } async fn _run( config: Config, - canceller: CancellationToken, - resetter: broadcast::Sender<()>, + shutdown_signal: CancellationToken, ) -> Result<(), Box> { // try to set system limits match rlimit::getrlimit(rlimit::Resource::NOFILE) { @@ -70,11 +67,11 @@ impl Server { // spawn metrics service let metrics = Arc::new(Metrics::new(config.metrics.clone())); { - let canceller = canceller.clone(); + let shutdown_signal = shutdown_signal.clone(); let metrics = metrics.clone(); tokio::spawn(async move { - metrics.run(canceller).await.inspect_err(|err| { + metrics.run(shutdown_signal).await.inspect_err(|err| { error!( service = Metrics::name(), error = ?err, @@ -85,28 +82,36 @@ impl Server { }); } - // spawn circuit-breaker - if !config.circuit_breaker.url.is_empty() { - let canceller = canceller.clone(); - let resetter = resetter.clone(); - - let _ = std::thread::spawn(move || { - let rt = match tokio::runtime::Builder::new_current_thread().enable_all().build() { - Ok(rt) => rt, - Err(err) => { - error!(error = ?err, "Failed to initialise a single-threaded runtime for circuit-breaker"); - std::process::exit(-1); - } - }; + while !shutdown_signal.is_cancelled() { + let reset_signal = Server::wait_for_reset_signal(shutdown_signal.clone()); - let circuit_breaker = CircuitBreaker::new(config.circuit_breaker.clone()); + // spawn circuit-breaker + if !config.circuit_breaker.url.is_empty() { + let shutdown_signal = shutdown_signal.clone(); + let reset_signal = reset_signal.clone(); - tokio::task::LocalSet::new() - .block_on(&rt, async move { circuit_breaker.run(canceller, resetter).await }) - }); - } + let circuit_breaker_config = config.circuit_breaker.clone(); + + let _ = std::thread::spawn(move || { + let rt = match tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + { + Ok(rt) => rt, + Err(err) => { + error!(error = ?err, "Failed to initialise a single-threaded runtime for circuit-breaker"); + std::process::exit(-1); + } + }; + + let circuit_breaker = CircuitBreaker::new(circuit_breaker_config); + + tokio::task::LocalSet::new().block_on(&rt, async move { + circuit_breaker.run(shutdown_signal, reset_signal).await + }) + }); + } - while !canceller.is_cancelled() { if config.tls.enabled() { let metrics = metrics.clone(); let (not_before, not_after) = @@ -122,16 +127,16 @@ impl Server { let tls = config.tls.clone(); let config = config.authrpc.clone(); let metrics = metrics.clone(); - let canceller = canceller.clone(); - let resetter = resetter.clone(); + let shutdown_signal = shutdown_signal.clone(); + let reset_signal = reset_signal.clone(); services.push(tokio::spawn(async move { ProxyHttp::::run( config, tls, metrics, - canceller.clone(), - resetter, + shutdown_signal.clone(), + reset_signal, ) .await .inspect_err(|err| { @@ -140,7 +145,7 @@ impl Server { error = ?err, "Failed to start http-proxy, terminating...", ); - canceller.cancel(); + shutdown_signal.cancel(); }) })); } @@ -150,16 +155,16 @@ impl Server { let tls = config.tls.clone(); let config = config.rpc.clone(); let metrics = metrics.clone(); - let canceller = canceller.clone(); - let resetter = resetter.clone(); + let shutdown_signal = shutdown_signal.clone(); + let reset_signal = reset_signal.clone(); services.push(tokio::spawn(async move { ProxyHttp::::run( config, tls, metrics, - canceller.clone(), - resetter, + shutdown_signal.clone(), + reset_signal, ) .await .inspect_err(|err| { @@ -168,7 +173,7 @@ impl Server { error = ?err, "Failed to start http-proxy, terminating...", ); - canceller.cancel(); + shutdown_signal.cancel(); }) })); } @@ -178,16 +183,16 @@ impl Server { let tls = config.tls.clone(); let config = config.flashblocks.clone(); let metrics = metrics.clone(); - let canceller = canceller.clone(); - let resetter = resetter.clone(); + let shutdown_signal = shutdown_signal.clone(); + let reset_signal = reset_signal.clone(); services.push(tokio::spawn(async move { ProxyWs::::run( config, tls, metrics, - canceller.clone(), - resetter, + shutdown_signal.clone(), + reset_signal, ) .await .inspect_err(|err| { @@ -196,7 +201,7 @@ impl Server { error = ?err, "Failed to start websocket-proxy, terminating...", ); - canceller.cancel(); + shutdown_signal.cancel(); }) })); } @@ -212,10 +217,10 @@ impl Server { } fn wait_for_shutdown_signal() -> CancellationToken { - let canceller = tokio_util::sync::CancellationToken::new(); + let shutdown_signal = CancellationToken::new(); { - let canceller = canceller.clone(); + let shutdown_signal = shutdown_signal.clone(); tokio::spawn(async move { let sigint = async { @@ -239,43 +244,36 @@ impl Server { info!("Shutdown signal received, stopping..."); - canceller.cancel(); + shutdown_signal.cancel(); }); } - canceller + shutdown_signal } - fn wait_for_reset_signal(canceller: CancellationToken) -> broadcast::Sender<()> { - let (resetter, _) = broadcast::channel::<()>(1); - - { - let resetter = resetter.clone(); + fn wait_for_reset_signal(shutdown_signal: CancellationToken) -> CancellationToken { + let reset_signal = shutdown_signal.child_token(); - tokio::spawn(async move { + tokio::spawn({ + let reset_signal = reset_signal.clone(); + let shutdown_signal = shutdown_signal.clone(); + async move { let mut hangup = signal(SignalKind::hangup()).expect("failed to install sighup handler"); loop { tokio::select! { + _ = shutdown_signal.cancelled() => break, _ = hangup.recv() => { info!("Hangup signal received, resetting..."); - - if let Err(err) = resetter.send(()) { - error!(from = "sighup", error = ?err, "Failed to broadcast reset signal, shutting down whole proxy..."); - canceller.cancel(); - } + reset_signal.cancel(); } - - _ = canceller.cancelled() => { - return - }, } } - }); - } + } + }); - resetter + reset_signal } } @@ -291,7 +289,7 @@ mod tests { RpcModule, server::{ServerBuilder, ServerHandle}, }; - use tracing::{debug, info}; + use tracing::debug; use super::*; use crate::config::Config; @@ -340,19 +338,18 @@ mod tests { let proxy_addr_authrpc = cfg.clone().authrpc.listen_address; let proxy_addr_rpc = cfg.clone().rpc.listen_address; - let canceller = tokio_util::sync::CancellationToken::new(); - let resetter = Server::wait_for_reset_signal(canceller.clone()); + let shutdown_signal = CancellationToken::new(); + let reset_signal = Server::wait_for_reset_signal(shutdown_signal.clone()); let server = { - let canceller = canceller.clone(); - let resetter = resetter.clone(); + let shutdown_signal = shutdown_signal.clone(); - actix_rt::spawn(async move { Server::_run(cfg, canceller, resetter).await }) + actix_rt::spawn(async move { Server::_run(cfg, shutdown_signal).await }) }; actix_rt::time::sleep(std::time::Duration::from_millis(100)).await; { - let canceller = canceller.clone(); + let shutdown_signal = shutdown_signal.clone(); let client = Client::builder().timeout(Duration::from_millis(10)).finish(); let proxy_addr_authrpc = proxy_addr_authrpc.clone(); @@ -380,7 +377,7 @@ mod tests { } } - _ = canceller.cancelled() => { + _ = shutdown_signal.cancelled() => { break } } @@ -389,7 +386,7 @@ mod tests { } { - let canceller = canceller.clone(); + let shutdown_signal = shutdown_signal.clone(); let client = Client::builder().timeout(Duration::from_millis(10)).finish(); actix_rt::spawn(async move { @@ -416,7 +413,7 @@ mod tests { } } - _ = canceller.cancelled() => { + _ = shutdown_signal.cancelled() => { break } } @@ -426,20 +423,8 @@ mod tests { let client = Client::builder().timeout(Duration::from_millis(10)).finish(); - for i in 0..10 { - match resetter.send(()) { - Err(err) => { - debug!(iteration = i, error = ?err, "Failed to send a reset"); - } - - Ok(proxies_count) => { - info!(iteration = i, proxies_count = proxies_count, "Sent a reset"); - assert_eq!( - proxies_count, 2, - "sent reset wrong count of proxies: {proxies_count} != 2" - ); - } - } + for _ in 0..10 { + reset_signal.cancel(); actix_rt::time::sleep(std::time::Duration::from_millis(1200)).await; @@ -469,13 +454,13 @@ mod tests { } } - _ = canceller.cancelled() => { + _ = shutdown_signal.cancelled() => { break } } } - canceller.cancel(); + shutdown_signal.cancel(); tokio::time::timeout(tokio::time::Duration::from_secs(5), server).await.ok(); } diff --git a/crates/rproxy/src/server/metrics.rs b/crates/rproxy/src/server/metrics.rs index 638846d..61cd78b 100644 --- a/crates/rproxy/src/server/metrics.rs +++ b/crates/rproxy/src/server/metrics.rs @@ -19,6 +19,7 @@ use prometheus_client::{ registry::{Registry, Unit}, }; use socket2::{SockAddr, Socket, TcpKeepalive}; +use tokio_util::sync::CancellationToken; use tracing::{error, info}; use crate::server::{config::ConfigMetrics, metrics::candlestick::Candlestick}; @@ -281,7 +282,7 @@ impl Metrics { pub(crate) async fn run( self: Arc, - canceller: tokio_util::sync::CancellationToken, + shutdown_signal: CancellationToken, ) -> Result<(), Box> { let listen_address = self.config.listen_address(); @@ -305,7 +306,7 @@ impl Metrics { .default_service(web::route().to(Self::receive)) }) .workers(1) - .shutdown_signal(canceller.cancelled_owned()) + .shutdown_signal(shutdown_signal.cancelled_owned()) .listen(listener) { Ok(metrics) => metrics, diff --git a/crates/rproxy/src/server/proxy/circuit_breaker.rs b/crates/rproxy/src/server/proxy/circuit_breaker.rs index d0959f7..1b377ed 100644 --- a/crates/rproxy/src/server/proxy/circuit_breaker.rs +++ b/crates/rproxy/src/server/proxy/circuit_breaker.rs @@ -6,8 +6,8 @@ use awc::{ http::{self, Method, header}, }; use parking_lot::Mutex; -use tokio::sync::broadcast; -use tracing::{debug, error, warn}; +use tokio_util::sync::CancellationToken; +use tracing::{debug, warn}; use crate::server::proxy::config::ConfigCircuitBreaker; @@ -77,26 +77,25 @@ impl CircuitBreaker { pub(crate) async fn run( self, - canceller: tokio_util::sync::CancellationToken, - resetter: broadcast::Sender<()>, + shutdown_signal: CancellationToken, + reset_signal: CancellationToken, ) { - let canceller = canceller.clone(); - let resetter = resetter.clone(); - let mut ticker = tokio::time::interval(self.config.poll_interval); ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); // spawning locally b/c actix client is thread-local by design if let Err(err) = tokio::task::spawn_local(async move { loop { - let resetter = resetter.clone(); - tokio::select! { _ = ticker.tick() => { - self.poll(resetter).await; + self.poll(reset_signal.clone()).await; + } + + _ = shutdown_signal.cancelled() => { + break } - _ = canceller.cancelled() => { + _ = reset_signal.cancelled() => { break } } @@ -112,7 +111,7 @@ impl CircuitBreaker { } } - async fn poll(&self, resetter: broadcast::Sender<()>) { + async fn poll(&self, reset_signal: CancellationToken) { let req = self .client .request(Method::GET, self.config.url.clone()) @@ -162,26 +161,12 @@ impl CircuitBreaker { this.curr_status = Status::Unhealthy; warn!(service = Self::name(), "Backend became unhealthy, resetting..."); - - if let Err(err) = resetter.send(()) { - error!( - from = Self::name(), - error = ?err, - "Failed to broadcast reset signal", - ); - } + reset_signal.cancel(); } (Status::Unhealthy, Status::Unhealthy) => { warn!(service = Self::name(), "Backend is still unhealthy, resetting..."); - - if let Err(err) = resetter.send(()) { - error!( - from = Self::name(), - error = ?err, - "Failed to broadcast reset signal", - ); - } + reset_signal.cancel(); } (Status::Unhealthy, Status::Healthy) => { diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index 131b928..28f1e93 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -42,7 +42,7 @@ use futures_core::Stream; use pin_project::pin_project; use scc::HashMap; use time::{UtcDateTime, format_description::well_known::Iso8601}; -use tokio::sync::broadcast; +use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; use url::Url; use uuid::Uuid; @@ -142,8 +142,8 @@ where config: C, tls: ConfigTls, metrics: Arc, - canceller: tokio_util::sync::CancellationToken, - resetter: broadcast::Sender<()>, + shutdown_signal: CancellationToken, + reset_signal: CancellationToken, ) -> Result<(), Box> { let listen_address = config.listen_address(); @@ -187,7 +187,7 @@ where ); let server = actix_server::Server::build() - .shutdown_signal(canceller.cancelled_owned()) + .shutdown_signal(shutdown_signal.cancelled_owned()) .shutdown_timeout(shared.config().shutdown_timeout_sec()) .workers(workers_count); @@ -272,43 +272,20 @@ where let server = server.run(); let handler = server.handle(); - let mut resetter = resetter.subscribe(); tokio::spawn(async move { - loop { - match resetter.recv().await { - Err(broadcast::error::RecvError::Lagged(lag)) => { - warn!( - proxy = P::name(), - lag = lag, - "Resetter channel is lagging behind, attempting to exhaust it..." - ); - continue; - } + reset_signal.cancelled().await; - Err(broadcast::error::RecvError::Closed) => { - info!( - proxy = P::name(), - "Resetter channel is closed, stopping http-proxy..." - ); - } + info!(proxy = P::name(), "Reset signal received, stopping http-proxy..."); - Ok(()) => { - info!(proxy = P::name(), "Reset signal received, stopping http-proxy..."); - } - } - - if let Err(err) = - tokio::time::timeout(Duration::from_millis(60_000), handler.stop(true)).await - { - error!( - proxy = P::name(), - error = ?err, - "Graceful shutdown of http-proxy failed after 1 minute, forcefully shutting down..." - ); - std::process::exit(1); - } - - break; + if let Err(err) = + tokio::time::timeout(Duration::from_millis(60_000), handler.stop(true)).await + { + error!( + proxy = P::name(), + error = ?err, + "Graceful shutdown of http-proxy failed after 1 minute, forcefully shutting down..." + ); + std::process::exit(1); } }); diff --git a/crates/rproxy/src/server/proxy/ws/proxy.rs b/crates/rproxy/src/server/proxy/ws/proxy.rs index 53be231..5d76358 100644 --- a/crates/rproxy/src/server/proxy/ws/proxy.rs +++ b/crates/rproxy/src/server/proxy/ws/proxy.rs @@ -29,8 +29,9 @@ use futures::{ use prometheus_client::metrics::gauge::Atomic; use scc::HashMap; use time::{UtcDateTime, format_description::well_known::Iso8601}; -use tokio::{net::TcpStream, sync::broadcast}; +use tokio::net::TcpStream; use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; +use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; use uuid::Uuid; use x509_parser::asn1_rs::ToStatic; @@ -72,8 +73,8 @@ where shared: ProxyWsSharedState, postprocessor: actix::Addr>, - canceller: tokio_util::sync::CancellationToken, - resetter: broadcast::Sender<()>, + shutdown_signal: CancellationToken, + reset_signal: CancellationToken, backend: ProxyWsBackendEndpoint, } @@ -85,8 +86,8 @@ where { fn new( shared: ProxyWsSharedState, - canceller: tokio_util::sync::CancellationToken, - resetter: broadcast::Sender<()>, + shutdown_signal: CancellationToken, + reset_signal: CancellationToken, ) -> Self { let id = Uuid::now_v7(); @@ -102,7 +103,7 @@ where } .start(); - Self { id, shared, postprocessor, canceller, resetter, backend } + Self { id, shared, postprocessor, shutdown_signal, reset_signal, backend } } fn config(&self) -> &C { @@ -113,8 +114,8 @@ where config: C, tls: ConfigTls, metrics: Arc, - canceller: tokio_util::sync::CancellationToken, - resetter: broadcast::Sender<()>, + shutdown_signal: CancellationToken, + reset_signal: CancellationToken, ) -> Result<(), Box> { let listen_address = config.listen_address(); @@ -135,8 +136,8 @@ where let shared = ProxyWsSharedState::::new(config.clone(), &metrics); let client_connections_count = shared.client_connections_count.clone(); - let worker_canceller = canceller.clone(); - let worker_resetter = resetter.clone(); + let worker_shutdown_signal = shutdown_signal.clone(); + let worker_reset_signal = reset_signal.clone(); let shutdown_timeout_sec = shared.config().shutdown_timeout_sec(); info!( @@ -149,8 +150,8 @@ where let server = HttpServer::new(move || { let this = web::Data::new(Self::new( shared.clone(), - worker_canceller.clone(), - worker_resetter.clone(), + worker_shutdown_signal.clone(), + worker_reset_signal.clone(), )); App::new() @@ -165,7 +166,7 @@ where client_connections_count, config.keepalive_interval(), )) - .shutdown_signal(canceller.cancelled_owned()) + .shutdown_signal(shutdown_signal.cancelled_owned()) .shutdown_timeout(shutdown_timeout_sec) .workers(workers_count); @@ -192,46 +193,20 @@ where .run(); let handler = proxy.handle(); - let mut resetter = resetter.subscribe(); tokio::spawn(async move { - loop { - match resetter.recv().await { - Err(broadcast::error::RecvError::Lagged(lag)) => { - warn!( - proxy = P::name(), - lag = lag, - "Resetter channel is lagging behind, attempting to exhaust it..." - ); - continue; - } + reset_signal.cancelled().await; - Err(broadcast::error::RecvError::Closed) => { - info!( - proxy = P::name(), - "Resetter channel is closed, stopping websocket-proxy..." - ); - } - - Ok(()) => { - info!( - proxy = P::name(), - "Reset signal received, stopping websocket-proxy..." - ); - } - } + info!(proxy = P::name(), "Reset signal received, stopping websocket-proxy..."); - if let Err(err) = - tokio::time::timeout(Duration::from_millis(60_000), handler.stop(true)).await - { - error!( - proxy = P::name(), - error = ?err, - "Graceful shutdown of websocket-proxy failed after 1 minute, forcefully shutting down..." - ); - std::process::exit(1); - } - - break; + if let Err(err) = + tokio::time::timeout(Duration::from_millis(60_000), handler.stop(true)).await + { + error!( + proxy = P::name(), + error = ?err, + "Graceful shutdown of websocket-proxy failed after 1 minute, forcefully shutting down..." + ); + std::process::exit(1); } }); @@ -405,8 +380,8 @@ where worker_id: this.id, shared: this.shared.clone(), postprocessor: this.postprocessor.clone(), - canceller: this.canceller.clone(), - resetter: this.resetter.clone(), + shutdown_signal: this.shutdown_signal.clone(), + reset_signal: this.reset_signal.clone(), clnt_tx, clnt_rx, bknd_tx: Some(bknd_tx), @@ -694,8 +669,8 @@ where shared: ProxyWsSharedState, postprocessor: actix::Addr>, - canceller: tokio_util::sync::CancellationToken, - resetter: broadcast::Sender<()>, + shutdown_signal: CancellationToken, + reset_signal: CancellationToken, clnt_tx: Session, clnt_rx: MessageStream, @@ -728,45 +703,21 @@ where let mut pumping: Result<(), &str> = Ok(()); - let mut resetter = self.resetter.subscribe(); - - while pumping.is_ok() && !self.canceller.is_cancelled() && !resetter.is_closed() { + while pumping.is_ok() && + !self.shutdown_signal.is_cancelled() && + !self.reset_signal.is_cancelled() + { #[cfg(feature = "chaos")] if self.chaos.stream_is_blocked.load(Ordering::Relaxed) { tokio::select! { - _ = self.canceller.cancelled() => { + _ = self.shutdown_signal.cancelled() => { break; } - reset = resetter.recv() => { - match reset { - Err(broadcast::error::RecvError::Lagged(lag)) => { - warn!( - proxy = P::name(), - connection_id = %self.info.conn_id(), - worker_id = %self.worker_id, - lag = lag, - "Resetter channel is lagging behind, attempting to exhaust it..." - ); - continue; - } - - Err(broadcast::error::RecvError::Closed) => { - info!( - proxy = P::name(), - connection_id = %self.info.conn_id(), - worker_id = %self.worker_id, - "Resetter channel is closed, stopping websocket-proxy..." - ); - break; - } - - Ok(()) => { - info!(proxy = P::name(), "Reset signal received, stopping websocket-proxy..."); - break; - } - } - } + _ = self.reset_signal.cancelled() => { + info!(proxy = P::name(), "Reset signal received, stopping websocket-proxy..."); + break; + }, // client => backend clnt_msg = self.clnt_rx.next() => { @@ -777,39 +728,14 @@ where } tokio::select! { - _ = self.canceller.cancelled() => { + _ = self.shutdown_signal.cancelled() => { break; } - reset = resetter.recv() => { - match reset { - Err(broadcast::error::RecvError::Lagged(lag)) => { - warn!( - proxy = P::name(), - connection_id = %self.info.conn_id(), - worker_id = %self.worker_id, - lag = lag, - "Resetter channel is lagging behind, attempting to exhaust it..." - ); - continue; - } - - Err(broadcast::error::RecvError::Closed) => { - info!( - proxy = P::name(), - connection_id = %self.info.conn_id(), - worker_id = %self.worker_id, - "Resetter channel is closed, stopping websocket-proxy..." - ); - break; - } - - Ok(()) => { - info!(proxy = P::name(), "Reset signal received, stopping websocket-proxy..."); - break; - } - } - } + _ = self.reset_signal.cancelled() => { + info!(proxy = P::name(), "Reset signal received, stopping websocket-proxy..."); + break; + }, // ping both sides _ = heartbeat.tick() => {