Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,078 changes: 663 additions & 415 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 6 additions & 1 deletion crates/rproxy/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "rproxy"
version = "0.0.10"
version = "0.0.11"
edition = "2024"
default-run = "rproxy"

Expand Down Expand Up @@ -70,3 +70,8 @@ uuid = { version = "1.18.1", features = ["v7" ]}
valuable = { version = "0.1.1", features = ["derive"] }
x509-parser = "0.18.0"
zstd = "0.13.3"

[dev-dependencies]
actix-rt = "2.11.0"
jsonrpsee = { version = "0.26.0", features = ["server"] }
mime = "0.3.17"
54 changes: 53 additions & 1 deletion crates/rproxy/src/jrpc/jrpc_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const JRPC_METHOD_FCUV3_WITH_PAYLOAD: Cow<'static, str> =

const EMPTY_PARAMS: &Vec<serde_json::Value> = &Vec::new();

#[derive(Debug)]
pub(crate) struct JrpcRequestMeta {
id: Id,

Expand Down Expand Up @@ -54,6 +55,7 @@ impl<'a> Deserialize<'a> for JrpcRequestMeta {
struct JrpcRequestMetaWire {
id: Id,
method: Cow<'static, str>,
#[serde(default)]
params: serde_json::Value,
}

Expand Down Expand Up @@ -93,7 +95,7 @@ impl<'a> Deserialize<'a> for JrpcRequestMeta {

const JRPC_METHOD_BATCH: Cow<'static, str> = Cow::Borrowed("batch");

#[derive(Deserialize)]
#[derive(Debug, Deserialize)]
#[serde(untagged)]
pub(crate) enum JrpcRequestMetaMaybeBatch {
Single(JrpcRequestMeta),
Expand All @@ -108,3 +110,53 @@ impl JrpcRequestMetaMaybeBatch {
}
}
}

// tests ---------------------------------------------------------------

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_jrpc_request_meta_maybe_batch_deserialize() {
let json = r#"[
{
"jsonrpc": "2.0",
"id": 1108954,
"method": "net_version"
},
{
"jsonrpc": "2.0",
"id": "1108955",
"method": "eth_getBlockByNumber",
"params": [
"0x73f151",
true
]
}
]"#;

let result: Result<JrpcRequestMetaMaybeBatch, _> = serde_json::from_str(json);
assert!(result.is_ok(), "{result:?}");

let batch = result.unwrap();
match batch {
JrpcRequestMetaMaybeBatch::Batch(requests) => {
assert_eq!(requests.len(), 2);

// First request
assert_eq!(*requests[0].id(), Id::Number(1108954));
assert_eq!(requests[0].method(), Cow::Borrowed("net_version"));
assert!(requests[0].params().is_empty());

// Second request
assert_eq!(*requests[1].id(), Id::Number(1108955));
assert_eq!(requests[1].method(), Cow::Borrowed("eth_getBlockByNumber"));
assert_eq!(requests[1].params().len(), 2);
}
JrpcRequestMetaMaybeBatch::Single(_) => {
panic!("Expected Batch variant, got Single");
}
}
}
}
223 changes: 218 additions & 5 deletions crates/rproxy/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ use crate::{
utils::tls_certificate_validity_timestamps,
};

const MAX_OPEN_FILES: u64 = 10240;

// Proxy ---------------------------------------------------------------

pub struct Server {}
Expand All @@ -39,6 +37,14 @@ impl Server {
let canceller = Server::wait_for_shutdown_signal();
let resetter = Server::wait_for_reset_signal(canceller.clone());

Self::_run(config, canceller, resetter).await
}

async fn _run(
config: Config,
canceller: CancellationToken,
resetter: broadcast::Sender<()>,
) -> Result<(), Box<dyn std::error::Error + Send>> {
// try to set system limits
match rlimit::getrlimit(rlimit::Resource::NOFILE) {
Ok((_, hard)) => {
Expand Down Expand Up @@ -195,7 +201,11 @@ impl Server {
}));
}

futures::future::join_all(services).await;
for res in futures::future::join_all(services).await.iter() {
if let Err(err) = res {
warn!(error = ?err, "One of the services had failed")
}
}
}

Ok(())
Expand Down Expand Up @@ -237,7 +247,7 @@ impl Server {
}

fn wait_for_reset_signal(canceller: CancellationToken) -> broadcast::Sender<()> {
let (resetter, _) = broadcast::channel::<()>(2);
let (resetter, _) = broadcast::channel::<()>(1);

{
let resetter = resetter.clone();
Expand All @@ -252,7 +262,8 @@ impl Server {
info!("Hangup signal received, resetting...");

if let Err(err) = resetter.send(()) {
error!(from = "sighup", error = ?err, "Failed to broadcast reset signal");
error!(from = "sighup", error = ?err, "Failed to broadcast reset signal, shutting down whole proxy...");
canceller.cancel();
}
}

Expand All @@ -267,3 +278,205 @@ impl Server {
resetter
}
}

// tests ===============================================================

#[cfg(test)]
mod tests {
use std::{net::SocketAddr, time::Duration};

use awc::{Client, http::header};
use clap::Parser;
use jsonrpsee::{
RpcModule,
server::{ServerBuilder, ServerHandle},
};
use tracing::{debug, info};

use super::*;
use crate::config::Config;

async fn spawn_rpc_backend() -> (SocketAddr, ServerHandle) {
let server = ServerBuilder::default().build("127.0.0.1:0").await.unwrap();

let addr: SocketAddr = server.local_addr().unwrap();

let mut module = RpcModule::new(());

module
.register_async_method("eth_chainId", |_params, _ctx, _ext| async move {
Ok::<_, jsonrpsee::types::ErrorObjectOwned>("0x1")
})
.unwrap();

let handle = server.start(module);

(addr, handle)
}

#[actix_web::test]
async fn test_circuit_breaker() {
let (backend, _handle) = spawn_rpc_backend().await;

let cfg = {
let mut cfg = Config::parse_from(["rproxy"]);

cfg.authrpc.enabled = true;
cfg.authrpc.backend_url = format!("http://{backend}");
cfg.authrpc.listen_address = "127.0.0.1:18645".into();
cfg.authrpc.shutdown_timeout_sec = 1;

cfg.rpc.enabled = true;
cfg.rpc.backend_url = format!("http://{backend}");
cfg.rpc.listen_address = "127.0.0.1:18651".into();
cfg.rpc.shutdown_timeout_sec = 1;

cfg.logging.level = "warn,rproxy::server::tests=info".into();
cfg.logging.setup_logging();

cfg
};

let proxy_addr_authrpc = cfg.clone().authrpc.listen_address;
let proxy_addr_rpc = cfg.clone().rpc.listen_address;

let canceller = tokio_util::sync::CancellationToken::new();
let resetter = Server::wait_for_reset_signal(canceller.clone());

let server = {
let canceller = canceller.clone();
let resetter = resetter.clone();

actix_rt::spawn(async move { Server::_run(cfg, canceller, resetter).await })
};
actix_rt::time::sleep(std::time::Duration::from_millis(100)).await;

{
let canceller = canceller.clone();
let client = Client::builder().timeout(Duration::from_millis(10)).finish();
let proxy_addr_authrpc = proxy_addr_authrpc.clone();

actix_rt::spawn(async move {
loop {
actix_rt::time::sleep(std::time::Duration::from_millis(10)).await;

let req = client
.post(format!("http://{proxy_addr_authrpc}"))
.insert_header((header::CONTENT_TYPE, mime::APPLICATION_JSON))
.send_body(
r#"{"jsonrpc":"2.0","method":"eth_chainId","params":[],"id":1}"#,
);

tokio::select! {
res = req => {
match res {
Ok(mut res) => {
let _ = res.body().await;
}

Err(err) => {
debug!(error = ?err, "Failed to send a request");
}
}
}

_ = canceller.cancelled() => {
break
}
}
}
});
}

{
let canceller = canceller.clone();
let client = Client::builder().timeout(Duration::from_millis(10)).finish();

actix_rt::spawn(async move {
loop {
actix_rt::time::sleep(std::time::Duration::from_millis(10)).await;

let req = client
.post(format!("http://{proxy_addr_rpc}"))
.insert_header((header::CONTENT_TYPE, mime::APPLICATION_JSON))
.send_body(
r#"{"jsonrpc":"2.0","method":"eth_chainId","params":[],"id":1}"#,
);

tokio::select! {
res = req => {
match res {
Ok(mut res) => {
let _ = res.body().await;
}

Err(err) => {
debug!(error = ?err, "Failed to send a request");
}
}
}

_ = canceller.cancelled() => {
break
}
}
}
});
}

let client = Client::builder().timeout(Duration::from_millis(10)).finish();

for i in 0..10 {
match resetter.send(()) {
Err(err) => {
debug!(iteration = i, error = ?err, "Failed to send a reset");
}

Ok(proxies_count) => {
info!(iteration = i, proxies_count = proxies_count, "Sent a reset");
assert_eq!(
proxies_count, 2,
"sent reset wrong count of proxies: {proxies_count} != 2"
);
}
}

actix_rt::time::sleep(std::time::Duration::from_millis(1200)).await;

let req = client
.post(format!("http://{proxy_addr_authrpc}"))
.insert_header((header::CONTENT_TYPE, mime::APPLICATION_JSON))
.send_body(r#"{"jsonrpc":"2.0","method":"eth_chainId","params":[],"id":1}"#);

tokio::select! {
res = req => {
match res {
Ok(mut res) => {
match res.body().await {
Err(err) => {
panic!("Failed to send a request: {err}");
}
Ok(body) => {
let body = String::from_utf8_lossy(&body).to_string();
info!("Sent a request and got a response: {body}");
}
}
}

Err(err) => {
panic!("Failed to send a request: {err}");
}
}
}

_ = canceller.cancelled() => {
break
}
}
}

canceller.cancel();

tokio::time::timeout(tokio::time::Duration::from_secs(5), server).await.ok();
}
}
16 changes: 16 additions & 0 deletions crates/rproxy/src/server/proxy/config/authrpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,17 @@ pub(crate) struct ConfigAuthrpc {
name("authrpc_remove_backend_from_mirroring_peers")
)]
pub(crate) remove_backend_from_mirroring_peers: bool,

/// timeout for graceful shutdown of authrpc workers
#[arg(
default_value = "5",
env = "RPROXY_AUTHRPC_SHUTDOWN_TIMEOUT_SEC",
help_heading = "authrpc",
long("authrpc-shutdown-timeout-sec"),
name("authrpc_shutdown_timeout_sec"),
value_name = "seconds"
)]
pub(crate) shutdown_timeout_sec: u64,
}

impl ConfigAuthrpc {
Expand Down Expand Up @@ -449,6 +460,11 @@ impl ConfigProxyHttp for ConfigAuthrpc {
fn prealloacated_response_buffer_size(&self) -> usize {
1024 * self.prealloacated_response_buffer_size_kb
}

#[inline]
fn shutdown_timeout_sec(&self) -> u64 {
self.shutdown_timeout_sec
}
}

// ConfigAuthrpcError --------------------------------------------------
Expand Down
Loading