Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 75 additions & 90 deletions crates/rproxy/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use std::{error::Error, sync::Arc};

use tokio::{
signal::unix::{SignalKind, signal},
sync::broadcast,
task::JoinHandle,
};
use tokio_util::sync::CancellationToken;
Expand All @@ -34,16 +33,14 @@ pub struct Server {}

impl Server {
pub async fn run(config: Config) -> Result<(), Box<dyn std::error::Error + Send>> {
let canceller = Server::wait_for_shutdown_signal();
let resetter = Server::wait_for_reset_signal(canceller.clone());
let shutdown_signal = Server::wait_for_shutdown_signal();

Self::_run(config, canceller, resetter).await
Self::_run(config, shutdown_signal).await
}

async fn _run(
config: Config,
canceller: CancellationToken,
resetter: broadcast::Sender<()>,
shutdown_signal: CancellationToken,
) -> Result<(), Box<dyn std::error::Error + Send>> {
// try to set system limits
match rlimit::getrlimit(rlimit::Resource::NOFILE) {
Expand All @@ -70,11 +67,11 @@ impl Server {
// spawn metrics service
let metrics = Arc::new(Metrics::new(config.metrics.clone()));
{
let canceller = canceller.clone();
let shutdown_signal = shutdown_signal.clone();
let metrics = metrics.clone();

tokio::spawn(async move {
metrics.run(canceller).await.inspect_err(|err| {
metrics.run(shutdown_signal).await.inspect_err(|err| {
error!(
service = Metrics::name(),
error = ?err,
Expand All @@ -85,28 +82,36 @@ impl Server {
});
}

// spawn circuit-breaker
if !config.circuit_breaker.url.is_empty() {
let canceller = canceller.clone();
let resetter = resetter.clone();

let _ = std::thread::spawn(move || {
let rt = match tokio::runtime::Builder::new_current_thread().enable_all().build() {
Ok(rt) => rt,
Err(err) => {
error!(error = ?err, "Failed to initialise a single-threaded runtime for circuit-breaker");
std::process::exit(-1);
}
};
while !shutdown_signal.is_cancelled() {
let reset_signal = Server::wait_for_reset_signal(shutdown_signal.clone());

let circuit_breaker = CircuitBreaker::new(config.circuit_breaker.clone());
// spawn circuit-breaker
if !config.circuit_breaker.url.is_empty() {
let shutdown_signal = shutdown_signal.clone();
let reset_signal = reset_signal.clone();

tokio::task::LocalSet::new()
.block_on(&rt, async move { circuit_breaker.run(canceller, resetter).await })
});
}
let circuit_breaker_config = config.circuit_breaker.clone();

let _ = std::thread::spawn(move || {
let rt = match tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
{
Ok(rt) => rt,
Err(err) => {
error!(error = ?err, "Failed to initialise a single-threaded runtime for circuit-breaker");
std::process::exit(-1);
}
};

let circuit_breaker = CircuitBreaker::new(circuit_breaker_config);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue:

this change causes the circuit-breaker to be dropped each time it detects the backend as unhealthy, which breaks its logic around threshold_healthy.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which part of the logic seems broken to you? The circuit breaker restarts along with the proxies so it seemed fine to me

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's not fine


tokio::task::LocalSet::new().block_on(&rt, async move {
circuit_breaker.run(shutdown_signal, reset_signal).await
})
});
}

while !canceller.is_cancelled() {
if config.tls.enabled() {
let metrics = metrics.clone();
let (not_before, not_after) =
Expand All @@ -122,16 +127,16 @@ impl Server {
let tls = config.tls.clone();
let config = config.authrpc.clone();
let metrics = metrics.clone();
let canceller = canceller.clone();
let resetter = resetter.clone();
let shutdown_signal = shutdown_signal.clone();
let reset_signal = reset_signal.clone();

services.push(tokio::spawn(async move {
ProxyHttp::<ConfigAuthrpc, ProxyHttpInnerAuthrpc>::run(
config,
tls,
metrics,
canceller.clone(),
resetter,
shutdown_signal.clone(),
reset_signal,
)
.await
.inspect_err(|err| {
Expand All @@ -140,7 +145,7 @@ impl Server {
error = ?err,
"Failed to start http-proxy, terminating...",
);
canceller.cancel();
shutdown_signal.cancel();
})
}));
}
Expand All @@ -150,16 +155,16 @@ impl Server {
let tls = config.tls.clone();
let config = config.rpc.clone();
let metrics = metrics.clone();
let canceller = canceller.clone();
let resetter = resetter.clone();
let shutdown_signal = shutdown_signal.clone();
let reset_signal = reset_signal.clone();

services.push(tokio::spawn(async move {
ProxyHttp::<ConfigRpc, ProxyHttpInnerRpc>::run(
config,
tls,
metrics,
canceller.clone(),
resetter,
shutdown_signal.clone(),
reset_signal,
)
.await
.inspect_err(|err| {
Expand All @@ -168,7 +173,7 @@ impl Server {
error = ?err,
"Failed to start http-proxy, terminating...",
);
canceller.cancel();
shutdown_signal.cancel();
})
}));
}
Expand All @@ -178,16 +183,16 @@ impl Server {
let tls = config.tls.clone();
let config = config.flashblocks.clone();
let metrics = metrics.clone();
let canceller = canceller.clone();
let resetter = resetter.clone();
let shutdown_signal = shutdown_signal.clone();
let reset_signal = reset_signal.clone();

services.push(tokio::spawn(async move {
ProxyWs::<ConfigFlashblocks, ProxyWsInnerFlashblocks>::run(
config,
tls,
metrics,
canceller.clone(),
resetter,
shutdown_signal.clone(),
reset_signal,
)
.await
.inspect_err(|err| {
Expand All @@ -196,7 +201,7 @@ impl Server {
error = ?err,
"Failed to start websocket-proxy, terminating...",
);
canceller.cancel();
shutdown_signal.cancel();
})
}));
}
Expand All @@ -212,10 +217,10 @@ impl Server {
}

fn wait_for_shutdown_signal() -> CancellationToken {
let canceller = tokio_util::sync::CancellationToken::new();
let shutdown_signal = CancellationToken::new();

{
let canceller = canceller.clone();
let shutdown_signal = shutdown_signal.clone();

tokio::spawn(async move {
let sigint = async {
Expand All @@ -239,43 +244,36 @@ impl Server {

info!("Shutdown signal received, stopping...");

canceller.cancel();
shutdown_signal.cancel();
});
}

canceller
shutdown_signal
}

fn wait_for_reset_signal(canceller: CancellationToken) -> broadcast::Sender<()> {
let (resetter, _) = broadcast::channel::<()>(1);

{
let resetter = resetter.clone();
fn wait_for_reset_signal(shutdown_signal: CancellationToken) -> CancellationToken {
let reset_signal = shutdown_signal.child_token();

tokio::spawn(async move {
tokio::spawn({
let reset_signal = reset_signal.clone();
let shutdown_signal = shutdown_signal.clone();
async move {
let mut hangup =
signal(SignalKind::hangup()).expect("failed to install sighup handler");

loop {
tokio::select! {
_ = shutdown_signal.cancelled() => break,
_ = hangup.recv() => {
info!("Hangup signal received, resetting...");

if let Err(err) = resetter.send(()) {
error!(from = "sighup", error = ?err, "Failed to broadcast reset signal, shutting down whole proxy...");
canceller.cancel();
}
reset_signal.cancel();
}

_ = canceller.cancelled() => {
return
},
}
}
});
}
}
});

resetter
reset_signal
}
}

Expand All @@ -291,7 +289,7 @@ mod tests {
RpcModule,
server::{ServerBuilder, ServerHandle},
};
use tracing::{debug, info};
use tracing::debug;

use super::*;
use crate::config::Config;
Expand Down Expand Up @@ -340,19 +338,18 @@ mod tests {
let proxy_addr_authrpc = cfg.clone().authrpc.listen_address;
let proxy_addr_rpc = cfg.clone().rpc.listen_address;

let canceller = tokio_util::sync::CancellationToken::new();
let resetter = Server::wait_for_reset_signal(canceller.clone());
let shutdown_signal = CancellationToken::new();
let reset_signal = Server::wait_for_reset_signal(shutdown_signal.clone());

let server = {
let canceller = canceller.clone();
let resetter = resetter.clone();
let shutdown_signal = shutdown_signal.clone();

actix_rt::spawn(async move { Server::_run(cfg, canceller, resetter).await })
actix_rt::spawn(async move { Server::_run(cfg, shutdown_signal).await })
};
actix_rt::time::sleep(std::time::Duration::from_millis(100)).await;

{
let canceller = canceller.clone();
let shutdown_signal = shutdown_signal.clone();
let client = Client::builder().timeout(Duration::from_millis(10)).finish();
let proxy_addr_authrpc = proxy_addr_authrpc.clone();

Expand Down Expand Up @@ -380,7 +377,7 @@ mod tests {
}
}

_ = canceller.cancelled() => {
_ = shutdown_signal.cancelled() => {
break
}
}
Expand All @@ -389,7 +386,7 @@ mod tests {
}

{
let canceller = canceller.clone();
let shutdown_signal = shutdown_signal.clone();
let client = Client::builder().timeout(Duration::from_millis(10)).finish();

actix_rt::spawn(async move {
Expand All @@ -416,7 +413,7 @@ mod tests {
}
}

_ = canceller.cancelled() => {
_ = shutdown_signal.cancelled() => {
break
}
}
Expand All @@ -426,20 +423,8 @@ mod tests {

let client = Client::builder().timeout(Duration::from_millis(10)).finish();

for i in 0..10 {
match resetter.send(()) {
Err(err) => {
debug!(iteration = i, error = ?err, "Failed to send a reset");
}

Ok(proxies_count) => {
info!(iteration = i, proxies_count = proxies_count, "Sent a reset");
assert_eq!(
proxies_count, 2,
"sent reset wrong count of proxies: {proxies_count} != 2"
);
}
}
for _ in 0..10 {
reset_signal.cancel();

actix_rt::time::sleep(std::time::Duration::from_millis(1200)).await;

Expand Down Expand Up @@ -469,13 +454,13 @@ mod tests {
}
}

_ = canceller.cancelled() => {
_ = shutdown_signal.cancelled() => {
break
}
}
}

canceller.cancel();
shutdown_signal.cancel();

tokio::time::timeout(tokio::time::Duration::from_secs(5), server).await.ok();
}
Expand Down
5 changes: 3 additions & 2 deletions crates/rproxy/src/server/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use prometheus_client::{
registry::{Registry, Unit},
};
use socket2::{SockAddr, Socket, TcpKeepalive};
use tokio_util::sync::CancellationToken;
use tracing::{error, info};

use crate::server::{config::ConfigMetrics, metrics::candlestick::Candlestick};
Expand Down Expand Up @@ -281,7 +282,7 @@ impl Metrics {

pub(crate) async fn run(
self: Arc<Self>,
canceller: tokio_util::sync::CancellationToken,
shutdown_signal: CancellationToken,
) -> Result<(), Box<dyn std::error::Error + Send>> {
let listen_address = self.config.listen_address();

Expand All @@ -305,7 +306,7 @@ impl Metrics {
.default_service(web::route().to(Self::receive))
})
.workers(1)
.shutdown_signal(canceller.cancelled_owned())
.shutdown_signal(shutdown_signal.cancelled_owned())
.listen(listener)
{
Ok(metrics) => metrics,
Expand Down
Loading