Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/backend/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl HttpBackend {
response
.json()
.await
.map_err(|e| RoxyError::Internal(format!("failed to parse response: {}", e)))
.map_err(|e| RoxyError::Internal(format!("failed to parse response: {e}")))
}
}

Expand Down
6 changes: 4 additions & 2 deletions crates/cache/src/compressed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub struct CompressedCache<C> {

impl<C: Cache> CompressedCache<C> {
/// Create a new compressed cache with default compression quality.
#[must_use]
pub fn new(inner: C) -> Self {
Self::with_quality(inner, DEFAULT_QUALITY)
}
Expand All @@ -36,6 +37,7 @@ impl<C: Cache> CompressedCache<C> {
/// # Arguments
/// * `inner` - The inner cache to wrap
/// * `quality` - Brotli compression quality (0-11, where 11 is best compression)
#[must_use]
pub fn with_quality(inner: C, quality: u32) -> Self {
Self { inner, quality: quality.min(11) }
}
Expand All @@ -50,7 +52,7 @@ impl<C: Cache> CompressedCache<C> {
self.quality,
DEFAULT_LG_WINDOW_SIZE,
);
writer.write_all(data).map_err(|e| CacheError(format!("compression failed: {}", e)))?;
writer.write_all(data).map_err(|e| CacheError(format!("compression failed: {e}")))?;
}
Ok(compressed)
}
Expand All @@ -61,7 +63,7 @@ impl<C: Cache> CompressedCache<C> {
let mut decompressor = brotli::Decompressor::new(data, 4096);
decompressor
.read_to_end(&mut decompressed)
.map_err(|e| CacheError(format!("decompression failed: {}", e)))?;
.map_err(|e| CacheError(format!("decompression failed: {e}")))?;
Ok(decompressed)
}
}
Expand Down
6 changes: 3 additions & 3 deletions crates/cache/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl std::fmt::Debug for MemoryCache {
impl Cache for MemoryCache {
async fn get(&self, key: &str) -> Result<Option<Bytes>, CacheError> {
let mut cache =
self.cache.lock().map_err(|e| CacheError(format!("lock poisoned: {}", e)))?;
self.cache.lock().map_err(|e| CacheError(format!("lock poisoned: {e}")))?;

if let Some(entry) = cache.get(key) {
if entry.expires_at > Instant::now() {
Expand All @@ -54,7 +54,7 @@ impl Cache for MemoryCache {

async fn put(&self, key: &str, value: Bytes, ttl: Duration) -> Result<(), CacheError> {
let mut cache =
self.cache.lock().map_err(|e| CacheError(format!("lock poisoned: {}", e)))?;
self.cache.lock().map_err(|e| CacheError(format!("lock poisoned: {e}")))?;

let entry = CacheEntry { value, expires_at: Instant::now() + ttl };

Expand All @@ -64,7 +64,7 @@ impl Cache for MemoryCache {

async fn delete(&self, key: &str) -> Result<(), CacheError> {
let mut cache =
self.cache.lock().map_err(|e| CacheError(format!("lock poisoned: {}", e)))?;
self.cache.lock().map_err(|e| CacheError(format!("lock poisoned: {e}")))?;

cache.pop(key);
Ok(())
Expand Down
10 changes: 5 additions & 5 deletions crates/cache/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ impl RedisCache {
/// Returns a `CacheError` if the URL is invalid or connection cannot be established.
pub fn new(url: &str) -> Result<Self, CacheError> {
let client =
Client::open(url).map_err(|e| CacheError(format!("failed to create client: {}", e)))?;
Client::open(url).map_err(|e| CacheError(format!("failed to create client: {e}")))?;
Ok(Self { client })
}

Expand All @@ -35,7 +35,7 @@ impl RedisCache {
self.client
.get_multiplexed_async_connection()
.await
.map_err(|e| CacheError(format!("connection error: {}", e)))
.map_err(|e| CacheError(format!("connection error: {e}")))
}
}

Expand All @@ -44,7 +44,7 @@ impl Cache for RedisCache {
let mut conn = self.get_connection().await?;

let result: Option<Vec<u8>> =
conn.get(key).await.map_err(|e| CacheError(format!("get error: {}", e)))?;
conn.get(key).await.map_err(|e| CacheError(format!("get error: {e}")))?;

Ok(result.map(Bytes::from))
}
Expand All @@ -58,15 +58,15 @@ impl Cache for RedisCache {
// Use SETEX for atomic set with expiration
conn.set_ex::<_, _, ()>(key, value.as_ref(), ttl_secs)
.await
.map_err(|e| CacheError(format!("put error: {}", e)))?;
.map_err(|e| CacheError(format!("put error: {e}")))?;

Ok(())
}

async fn delete(&self, key: &str) -> Result<(), CacheError> {
let mut conn = self.get_connection().await?;

conn.del::<_, ()>(key).await.map_err(|e| CacheError(format!("delete error: {}", e)))?;
conn.del::<_, ()>(key).await.map_err(|e| CacheError(format!("delete error: {e}")))?;

Ok(())
}
Expand Down
1 change: 1 addition & 0 deletions crates/cache/src/rpc_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ impl<C: Cache> RpcCache<C> {
///
/// This initializes the cache with sensible default policies for common
/// Ethereum JSON-RPC methods.
#[must_use]
pub fn new(inner: C) -> Self {
let mut policies = HashMap::new();

Expand Down
4 changes: 2 additions & 2 deletions crates/cli/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub async fn run_server(app: roxy_server::Router, config: &RoxyConfig) -> Result
let addr = format!("{}:{}", config.server.host, config.server.port);
let listener = tokio::net::TcpListener::bind(&addr)
.await
.wrap_err_with(|| format!("failed to bind to {}", addr))?;
.wrap_err_with(|| format!("failed to bind to {addr}"))?;

info!(address = %addr, "Roxy RPC proxy listening");

Expand All @@ -63,7 +63,7 @@ pub async fn run_server(app: roxy_server::Router, config: &RoxyConfig) -> Result
let metrics_addr = format!("{}:{}", config.metrics.host, config.metrics.port);
let metrics_listener = tokio::net::TcpListener::bind(&metrics_addr)
.await
.wrap_err_with(|| format!("failed to bind metrics server to {}", metrics_addr))?;
.wrap_err_with(|| format!("failed to bind metrics server to {metrics_addr}"))?;

info!(address = %metrics_addr, "Metrics server listening");

Expand Down
169 changes: 169 additions & 0 deletions crates/runtime/src/tokio_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,172 @@ impl Clock for TokioContext {
tokio::time::sleep(duration).await;
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_new_creates_context() {
let ctx = TokioContext::new();
assert!(ctx.label.is_empty());
}

#[test]
fn test_default_creates_context() {
let ctx = TokioContext::default();
assert!(ctx.label.is_empty());
}

#[tokio::test]
async fn test_spawn_returns_result() {
let ctx = TokioContext::new();
let handle = ctx.spawn(|_| async { 42 });
let result = handle.join().await.unwrap();
assert_eq!(result, 42);
}

#[tokio::test]
async fn test_spawn_receives_context() {
let ctx = TokioContext::new().with_label("parent");
let handle = ctx.spawn(|c| async move { c.label.clone() });
let result = handle.join().await.unwrap();
assert_eq!(result, "parent");
}

#[tokio::test]
async fn test_spawn_multiple_tasks() {
let ctx = TokioContext::new();

let h1 = ctx.spawn(|_| async { 1 });
let h2 = ctx.spawn(|_| async { 2 });
let h3 = ctx.spawn(|_| async { 3 });

let r1 = h1.join().await.unwrap();
let r2 = h2.join().await.unwrap();
let r3 = h3.join().await.unwrap();

assert_eq!(r1 + r2 + r3, 6);
}

#[test]
fn test_with_label_single() {
let ctx = TokioContext::new();
let labeled = ctx.with_label("test");
assert_eq!(labeled.label, "test");
}

#[test]
fn test_with_label_nested() {
let ctx = TokioContext::new();
let first = ctx.with_label("parent");
let second = first.with_label("child");
assert_eq!(second.label, "parent:child");
}

#[test]
fn test_with_label_deeply_nested() {
let ctx = TokioContext::new();
let labeled = ctx.with_label("a").with_label("b").with_label("c");
assert_eq!(labeled.label, "a:b:c");
}

#[test]
fn test_with_label_preserves_stop_channel() {
let ctx = TokioContext::new();
let labeled = ctx.with_label("test");
assert!(Arc::ptr_eq(&ctx.stop_tx, &labeled.stop_tx));
}

#[tokio::test]
async fn test_stop_signals_stopped() {
let ctx = TokioContext::new();
let ctx_clone = ctx.clone();

let handle = ctx.spawn(|c| async move {
match c.stopped().await {
Signal::Closed(code) => code,
Signal::Open => -999,
}
});

ctx_clone.stop(42, None).await;

let result = handle.join().await.unwrap();
assert_eq!(result, 42);
}

#[tokio::test]
async fn test_stop_with_different_codes() {
for code in [0, 1, -1, 100, i32::MAX, i32::MIN] {
let ctx = TokioContext::new();
let ctx_clone = ctx.clone();

let handle = ctx.spawn(|c| async move {
match c.stopped().await {
Signal::Closed(c) => c,
Signal::Open => -999,
}
});

ctx_clone.stop(code, None).await;

let result = handle.join().await.unwrap();
assert_eq!(result, code);
}
}

#[tokio::test]
async fn test_stop_propagates_to_labeled_context() {
let ctx = TokioContext::new();
let labeled = ctx.with_label("child");

let handle = labeled.spawn(|c| async move {
match c.stopped().await {
Signal::Closed(code) => code,
Signal::Open => -999,
}
});

ctx.stop(123, None).await;

let result = handle.join().await.unwrap();
assert_eq!(result, 123);
}

#[tokio::test]
async fn test_now_returns_increasing_time() {
let ctx = TokioContext::new();
let t1 = ctx.now();
tokio::time::sleep(Duration::from_millis(10)).await;
let t2 = ctx.now();
assert!(t2 > t1);
}

#[tokio::test]
async fn test_sleep_waits_minimum_duration() {
let ctx = TokioContext::new();
let sleep_duration = Duration::from_millis(50);

let start = Instant::now();
ctx.sleep(sleep_duration).await;
let elapsed = start.elapsed();

assert!(elapsed >= sleep_duration);
}

#[tokio::test]
async fn test_clone_shares_state() {
let ctx1 = TokioContext::new();
let ctx2 = ctx1.clone();
assert!(Arc::ptr_eq(&ctx1.stop_tx, &ctx2.stop_tx));
}

#[tokio::test]
async fn test_debug_impl() {
let ctx = TokioContext::new().with_label("test");
let debug_str = format!("{:?}", ctx);
assert!(debug_str.contains("TokioContext"));
assert!(debug_str.contains("test"));
}
}
4 changes: 2 additions & 2 deletions crates/server/src/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ async fn handle_message(
Ok(req) => req,
Err(e) => {
let response =
WsResponse::error(serde_json::Value::Null, -32700, format!("Parse error: {}", e));
WsResponse::error(serde_json::Value::Null, -32700, format!("Parse error: {e}"));
return serde_json::to_string(&response).ok();
}
};
Expand Down Expand Up @@ -579,7 +579,7 @@ async fn handle_subscribe(
return WsResponse::error(
request.id.clone(),
-32602,
format!("Unknown subscription type: {}", subscription_type),
format!("Unknown subscription type: {subscription_type}"),
);
}
}
Expand Down
18 changes: 18 additions & 0 deletions crates/traits/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,19 @@ impl CodecConfig for DefaultCodecConfig {
/// Trait for decoding bytes into a type with configuration.
pub trait Decode: Sized {
/// Decode from bytes using the given configuration.
///
/// # Errors
///
/// Returns [`CodecError`] if decoding fails due to invalid data,
/// size limits exceeded, or other configuration violations.
fn decode<C: CodecConfig>(bytes: &[u8], config: &C) -> Result<Self, CodecError>;

/// Decode from Bytes using the given configuration.
///
/// # Errors
///
/// Returns [`CodecError`] if decoding fails due to invalid data,
/// size limits exceeded, or other configuration violations.
fn decode_bytes<C: CodecConfig>(bytes: &Bytes, config: &C) -> Result<Self, CodecError> {
Self::decode(bytes.as_ref(), config)
}
Expand All @@ -121,9 +131,17 @@ pub trait Decode: Sized {
/// Trait for encoding a type into bytes.
pub trait Encode {
/// Encode into bytes.
///
/// # Errors
///
/// Returns [`CodecError`] if encoding fails.
fn encode(&self) -> Result<Bytes, CodecError>;

/// Encode into a `Vec<u8>`.
///
/// # Errors
///
/// Returns [`CodecError`] if encoding fails.
fn encode_vec(&self) -> Result<Vec<u8>, CodecError> {
self.encode().map(|b| b.to_vec())
}
Expand Down
4 changes: 4 additions & 0 deletions crates/traits/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ impl<T> Handle<T> {
}

/// Wait for the task to complete.
///
/// # Errors
///
/// Returns [`JoinError`] if the task was cancelled or panicked.
pub async fn join(self) -> Result<T, JoinError> {
self.inner.await.map_err(JoinError)
}
Expand Down
2 changes: 1 addition & 1 deletion crates/types/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub enum RoxyError {
}

impl RoxyError {
/// Convert to an alloy ErrorPayload for JSON-RPC responses.
/// Convert to an alloy [`ErrorPayload`] for JSON-RPC responses.
#[must_use]
pub fn to_error_payload(&self) -> ErrorPayload {
match self {
Expand Down
Loading