Use-case: large buffers (~1 MB), and a large number of them (~millions), are to be transferred by the client, and I am exploring hyper for it. I need a way to limit the network bandwidth used. I see a similar ticket, #2163, which was closed, hence I am opening a new one. Is there a way to control network bandwidth in hyper?
Replies: 8 comments
-
As @seanmonstar said (#2163 (comment)), hyper uses streamed bodies so you should be able to control the rate of reads and writes.
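For what it's worth, the "control the rate of reads" idea can be sketched without any hyper types at all. This is a hypothetical synchronous helper (`throttled_copy` is my name for it, not a hyper API) that paces chunks so the cumulative byte count never runs ahead of the cap; in a real client the same arithmetic would live inside `poll_read`:

```rust
use std::time::{Duration, Instant};

// Hypothetical sketch (not a hyper API): pace reads of a streamed body by
// sleeping so the cumulative byte count never runs ahead of the rate cap.
fn throttled_copy(chunks: &[&[u8]], limit_bps: u64) -> (u64, Duration) {
    let start = Instant::now();
    let mut total: u64 = 0;
    for chunk in chunks {
        total += chunk.len() as u64;
        // Earliest instant at which we are allowed to have read `total` bytes
        let min_elapsed = Duration::from_secs_f64(total as f64 / limit_bps as f64);
        if let Some(wait) = min_elapsed.checked_sub(start.elapsed()) {
            std::thread::sleep(wait);
        }
    }
    (total, start.elapsed())
}
```

Pacing against the stream's start time (rather than sleeping a fixed amount per chunk) keeps the long-run average at the limit even when chunk sizes vary.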
-
How on earth did I miss that comment? It seems I read it but did not process it.
-
I explored it and I am still confused. My understanding is that the hyper Client has a pool of TCP connections, so the limit has to apply at the client level. This implies that the consumer of the client will specify, say, "do not use more than 20 MB/s", and this needs to be split across the underlying connections, which is further complicated by the fact that the number of connections is dynamic.
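One way to think about the "split across connections" problem: don't split the limit at all, but share a single bucket. Below is a std-only sketch (the `GlobalLimiter` type and its API are made up for illustration, not taken from hyper or any crate) where every connection clones the `Arc`, so the aggregate limit is unchanged as connections come and go:

```rust
use std::sync::{Arc, Mutex};
use std::time::Instant;

// Hypothetical global limiter: one byte budget shared by all connections.
// Each connection holds a clone of the `Arc`; adding or dropping connections
// never changes the aggregate limit, because all clones drain one bucket.
struct GlobalLimiter {
    bytes_per_sec: f64,
    state: Mutex<BucketState>,
}

struct BucketState {
    available: f64,
    last_refill: Instant,
}

impl GlobalLimiter {
    fn new(bytes_per_sec: f64) -> Arc<Self> {
        Arc::new(Self {
            bytes_per_sec,
            state: Mutex::new(BucketState {
                // Start with one second's worth of burst allowance
                available: bytes_per_sec,
                last_refill: Instant::now(),
            }),
        })
    }

    /// Try to reserve `n` bytes; returns false if the caller should wait.
    fn try_consume(&self, n: f64) -> bool {
        let mut s = self.state.lock().unwrap();
        let now = Instant::now();
        // Refill proportionally to elapsed time, capped at one second of burst
        s.available = (s.available
            + now.duration_since(s.last_refill).as_secs_f64() * self.bytes_per_sec)
            .min(self.bytes_per_sec);
        s.last_refill = now;
        if s.available >= n {
            s.available -= n;
            true
        } else {
            false
        }
    }
}
```

An async version would park the task instead of returning `false`, but the sharing model is the same.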
-
How about this?

I have used the limiter in the crate: async-speed-limit. Now we:

- define `MyStream`, which wraps `tokio::net::TcpStream` and implements `AsyncWrite` & `AsyncRead`;
- define `MyConnector`, similar to `HttpConnector`, which helps to create new `MyStream`s;
- use `MyConnector` to attach a clone of the `Limiter` to every new `MyStream` (done via `fn call()` in `MyConnector`).

The `Limiter` is thread-safe and cheaply clone-able; all clones share the same bucket wrapped in an `Arc`. The `MyStream`'s `fn poll_write` calls the limiter to "consume" tokens and waits if not enough are available.

Reference gist: here

Usage:

```rust
use hyper::{self, Body, body::HttpBody};

let builder = hyper::client::Client::builder();
let connector = MyConnector::with_limit(102400.0); // ~100 KB/s limit
let client = Arc::new(builder.build::<MyConnector, Body>(connector));
```

Does it work? Initial results do seem to suggest that it is working.
-
Hey, although I haven't tried it yet, your solution looks good to me. Since all connections use the same limiter, it would be able to limit the overall bandwidth rate perfectly.
-
Bandwidth rate limiting in hyper is a common need for agent-serving HTTP servers — the challenge is that naive byte-rate limits don't map well to how LLM streaming responses work. For LLM inference servers built on hyper, the interesting rate limiting scenarios are:

- Token-rate vs byte-rate — for streaming LLM responses, you want to rate limit by token throughput, not bytes. A response token might be 1 byte (a single character) or 4 bytes (multi-byte Unicode). Byte-rate limits create confusing behavior where the same number of tokens gets different bandwidth depending on content.
- Per-agent quotas — in multi-tenant serving, you want to rate limit per agent identity (from the Authorization header or session token), not per TCP connection. A single agent making many connections should share one quota.
- Burst vs sustained — LLM generation has a bursty profile: initial TTFT can be long, then tokens stream quickly. A token bucket model (allow a burst of N tokens, then a sustained rate) works better than strict rate limiting.
- Budget-aware throttling — rather than pure rate limiting (tokens/sec), consider budget-aware throttling: if an agent has exhausted 80% of its budget, throttle its throughput to make the remaining budget last longer (give the orchestrator time to notice and intervene).

Tower's

We built a budget-aware rate limiting layer for KinthAI's agent serving: https://blog.kinthai.ai/agent-wallet-economic-models-autonomous-agents covers the economic model that drives the throttling decisions.

Are you rate limiting from the client side (throttling outbound requests) or the server side (throttling incoming connections)?
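The per-agent-quota idea above can be sketched independently of hyper. This is a minimal std-only illustration (the `PerAgentQuota` type and its API are hypothetical) that keys a fixed token budget by agent identity rather than by connection:

```rust
use std::collections::HashMap;

// Hypothetical per-agent quota tracker: limits are keyed by the agent
// identity (e.g. extracted from the Authorization header), not by TCP
// connection, so one agent with many connections shares a single budget.
struct PerAgentQuota {
    tokens_per_agent: u64,
    used: HashMap<String, u64>,
}

impl PerAgentQuota {
    fn new(tokens_per_agent: u64) -> Self {
        Self {
            tokens_per_agent,
            used: HashMap::new(),
        }
    }

    /// Record `n` tokens against `agent`'s budget; returns false once spent.
    fn try_spend(&mut self, agent: &str, n: u64) -> bool {
        let used = self.used.entry(agent.to_string()).or_insert(0);
        if *used + n <= self.tokens_per_agent {
            *used += n;
            true
        } else {
            false
        }
    }
}
```

A real server would refill or expire these budgets over time; the point is only that the map key is the agent, not the socket.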
-
Bandwidth rate limiting in hyper (and tower-based middleware generally) works well as a middleware layer. Here's a clean implementation:

```rust
use std::time::Duration;

// Pacing rate limiter: delay each read in proportion to the bytes consumed,
// so sustained throughput stays at `bytes_per_second`.
struct RateLimiter {
    bytes_per_second: u64,
}

impl RateLimiter {
    async fn throttle_read(&self, bytes: u64) {
        let delay_ms = (bytes * 1000) / self.bytes_per_second;
        tokio::time::sleep(Duration::from_millis(delay_ms)).await;
    }
}
```

For download bandwidth limiting specifically:

Per-connection vs global limits: the global limit is trickier because you need to share state across connections.

A Tower middleware approach:

For agent API servers:

What's the scale you're trying to rate limit at — per-connection, per-IP, or per-user?
-
Bandwidth rate limiting at the hyper layer is achievable with a custom body type that wraps the response stream:

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

use hyper::body::{Body, Frame};
use pin_project::pin_project;
use tokio::time::{sleep, Duration, Instant};

#[pin_project]
pub struct RateLimitedBody<B> {
    #[pin]
    inner: B,
    bytes_per_sec: u64,
    bytes_sent: u64,
    window_start: Instant,
}

impl<B: Body<Data = bytes::Bytes>> Body for RateLimitedBody<B> {
    type Data = bytes::Bytes;
    type Error = B::Error;

    fn poll_frame(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
        let this = self.project();

        // Check if we've exceeded the rate limit for the elapsed window
        let elapsed = this.window_start.elapsed().as_secs_f64();
        let allowed = (*this.bytes_per_sec as f64 * elapsed) as u64;
        if *this.bytes_sent >= allowed {
            // Schedule a wake-up for when the budget has refilled
            let wait = Duration::from_secs_f64(
                (*this.bytes_sent - allowed) as f64 / *this.bytes_per_sec as f64,
            );
            let waker = cx.waker().clone();
            tokio::spawn(async move {
                sleep(wait).await;
                waker.wake();
            });
            return Poll::Pending;
        }

        match this.inner.poll_frame(cx) {
            Poll::Ready(Some(Ok(frame))) => {
                if let Some(data) = frame.data_ref() {
                    *this.bytes_sent += data.len() as u64;
                }
                Poll::Ready(Some(Ok(frame)))
            }
            other => other,
        }
    }
}
```

Then wrap your response body:

```rust
let response = Response::builder()
    .body(RateLimitedBody {
        inner: original_body,
        bytes_per_sec: 1024 * 1024, // 1 MB/s
        bytes_sent: 0,
        window_start: Instant::now(),
    })?;
```

The token-bucket algorithm is more precise than the sliding window above.