From 62edd49dd1eb0ab54b43d315c6c6ebb17ef15b99 Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Mon, 11 Jan 2021 18:05:38 +0100 Subject: [PATCH 01/19] Add a temporary IP filter --- src/async_impl/client.rs | 34 ++++++++++++++++++++++++++++++++-- src/connect.rs | 4 ++-- src/dns.rs | 20 +++++++++++++++++--- src/error.rs | 2 ++ 4 files changed, 53 insertions(+), 7 deletions(-) diff --git a/src/async_impl/client.rs b/src/async_impl/client.rs index b6adc6485..1f163b389 100644 --- a/src/async_impl/client.rs +++ b/src/async_impl/client.rs @@ -104,6 +104,8 @@ struct Config { #[cfg(feature = "cookies")] cookie_store: Option, trust_dns: bool, + #[cfg(feature = "trust-dns")] + ip_filter: fn(std::net::IpAddr) -> bool, error: Option, https_only: bool, } @@ -156,6 +158,8 @@ impl ClientBuilder { local_address: None, nodelay: true, trust_dns: cfg!(feature = "trust-dns"), + #[cfg(feature = "trust-dns")] + ip_filter: |_| true, #[cfg(feature = "cookies")] cookie_store: None, https_only: false, @@ -191,7 +195,7 @@ impl ClientBuilder { let http = match config.trust_dns { false => HttpConnector::new_gai(), #[cfg(feature = "trust-dns")] - true => HttpConnector::new_trust_dns()?, + true => HttpConnector::new_trust_dns(config.ip_filter)?, #[cfg(not(feature = "trust-dns"))] true => unreachable!("trust-dns shouldn't be enabled unless the feature is"), }; @@ -348,6 +352,7 @@ impl ClientBuilder { proxies, proxies_maybe_http_auth, https_only: config.https_only, + ip_filter: config.ip_filter, }), }) } @@ -883,6 +888,17 @@ impl ClientBuilder { } } + /// Adds a filter for valid IP addresses during DNS lookup. + /// + /// # Optional + /// + /// This requires the optional `trust-dns` feature to be enabled. + #[cfg(feature = "trust-dns")] + pub fn ip_filter(mut self, filter: fn(std::net::IpAddr) -> bool) -> ClientBuilder { + self.config.ip_filter = filter; + self + } + /// Restrict the Client to be used with HTTPS only requests. /// /// Defaults to false. @@ -984,7 +1000,20 @@ impl Client { /// /// This method fails whenever supplied `Url` cannot be parsed. pub fn request(&self, method: Method, url: U) -> RequestBuilder { - let req = url.into_url().map(move |url| Request::new(method, url)); + let req = url.into_url().and_then(move |url| { + let is_valid_ip = match url.host() { + Some(url::Host::Ipv4(ip)) => (self.inner.ip_filter)(IpAddr::V4(ip)), + Some(url::Host::Ipv6(ip)) => (self.inner.ip_filter)(IpAddr::V6(ip)), + _ => true, + }; + + if !is_valid_ip { + let e = trust_dns_resolver::error::ResolveError::from("destination is restricted"); + return Err(crate::Error::new(crate::error::Kind::Request, Some(e))); + } + + Ok(Request::new(method, url)) + }); RequestBuilder::new(self.clone(), req) } @@ -1218,6 +1247,7 @@ struct ClientRef { proxies: Arc>, proxies_maybe_http_auth: bool, https_only: bool, + ip_filter: fn(IpAddr) -> bool, } impl ClientRef { diff --git a/src/connect.rs b/src/connect.rs index c58ae8407..f7d75a4d5 100644 --- a/src/connect.rs +++ b/src/connect.rs @@ -41,8 +41,8 @@ impl HttpConnector { } #[cfg(feature = "trust-dns")] - pub(crate) fn new_trust_dns() -> crate::Result { - TrustDnsResolver::new() + pub(crate) fn new_trust_dns(filter: fn(std::net::IpAddr) -> bool) -> crate::Result { + TrustDnsResolver::new(filter) .map(hyper::client::HttpConnector::new_with_resolver) .map(Self::TrustDns) .map_err(crate::error::builder) diff --git a/src/dns.rs b/src/dns.rs index adba67e65..ec49429bc 100644 --- a/src/dns.rs +++ b/src/dns.rs @@ -25,10 +25,12 @@ lazy_static! { #[derive(Clone)] pub(crate) struct TrustDnsResolver { state: Arc>, + filter: fn(std::net::IpAddr) -> bool, } pub(crate) struct SocketAddrs { iter: LookupIpIntoIter, + filter: fn(std::net::IpAddr) -> bool, } enum State { @@ -37,7 +39,7 @@ enum State { } impl TrustDnsResolver { - pub(crate) fn new() -> io::Result { + pub(crate) fn new(filter: fn(std::net::IpAddr) -> bool) -> io::Result { SYSTEM_CONF.as_ref().map_err(|e| { io::Error::new(e.kind(), format!("error reading DNS system conf: {}", e)) })?; @@ -47,6 +49,7 @@ impl TrustDnsResolver { // resolver. Ok(TrustDnsResolver { state: Arc::new(Mutex::new(State::Init)), + filter, }) } } @@ -63,6 +66,7 @@ impl Service for TrustDnsResolver { fn call(&mut self, name: hyper_dns::Name) -> Self::Future { let resolver = self.clone(); Box::pin(async move { + let filter = resolver.filter; let mut lock = resolver.state.lock().await; let resolver = match &*lock { @@ -79,7 +83,12 @@ impl Service for TrustDnsResolver { drop(lock); let lookup = resolver.lookup_ip(name.as_str()).await?; - Ok(SocketAddrs { iter: lookup.into_iter() }) + if !lookup.iter().any(filter) { + let e = trust_dns_resolver::error::ResolveError::from("destination is restricted"); + return Err(e.into()); + } + + Ok(SocketAddrs { iter: lookup.into_iter(), filter }) }) } } @@ -88,7 +97,12 @@ impl Iterator for SocketAddrs { type Item = SocketAddr; fn next(&mut self) -> Option { - self.iter.next().map(|ip_addr| SocketAddr::new(ip_addr, 0)) + loop { + let ip_addr = self.iter.next()?; + if (self.filter)(ip_addr) { + return Some(SocketAddr::new(ip_addr, 0)); + } + } } } diff --git a/src/error.rs b/src/error.rs index cb05eb774..2e4d6a9bd 100644 --- a/src/error.rs +++ b/src/error.rs @@ -112,6 +112,8 @@ impl Error { if hyper_err.is_connect() { return true; } + } else if err.downcast_ref::().is_some() { + return true; } source = err.source(); From a780c826d293b500eb2857b21d5eb49d3c0297b6 Mon Sep 17 00:00:00 2001 From: Arpad Borsos Date: Mon, 27 Nov 2023 12:21:04 +0100 Subject: [PATCH 02/19] properly restrict redirect URLs --- src/async_impl/client.rs | 41 ++++++++++++++++++++++++++-------------- 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/src/async_impl/client.rs b/src/async_impl/client.rs index b673cc84c..260f40bf4 100644 --- a/src/async_impl/client.rs +++ b/src/async_impl/client.rs @@ -1753,20 +1753,7 @@ impl Client { /// /// This method fails whenever the supplied `Url` cannot be parsed. pub fn request(&self, method: Method, url: U) -> RequestBuilder { - let req = url.into_url().and_then(move |url| { - let is_valid_ip = match url.host() { - Some(url::Host::Ipv4(ip)) => (self.inner.ip_filter)(IpAddr::V4(ip)), - Some(url::Host::Ipv6(ip)) => (self.inner.ip_filter)(IpAddr::V6(ip)), - _ => true, - }; - - if !is_valid_ip { - let e = trust_dns_resolver::error::ResolveError::from("destination is restricted"); - return Err(crate::Error::new(crate::error::Kind::Request, Some(e))); - } - - Ok(Request::new(method, url)) - }); + let req = url.into_url().map(move |url| Request::new(method, url)); RequestBuilder::new(self.clone(), req) } @@ -1826,6 +1813,11 @@ impl Client { } } + if let Err(err) = validate_url(self.inner.ip_filter, &url) { + return Pending { + inner: PendingInner::Error(Some(err)), + }; + } let uri = expect_uri(&url); let (reusable, body) = match body { @@ -2195,6 +2187,8 @@ impl PendingRequest { } self.retry_count += 1; + // XXX: We can't return an `Err` here, as we are mutating the `in_flight` future to restart it. + // However, at this point, we already validated `self.url` so it should be good. let uri = expect_uri(&self.url); *self.as_mut().in_flight().get_mut() = match *self.as_mut().in_flight().as_ref() { @@ -2409,6 +2403,11 @@ impl Future for PendingRequest { std::mem::replace(self.as_mut().headers(), HeaderMap::new()); remove_sensitive_headers(&mut headers, &self.url, &self.urls); + + if let Err(err) = validate_url(self.client.ip_filter, &self.url) { + return Poll::Ready(Err(err)); + } + let uri = expect_uri(&self.url); let body = match self.body { Some(Some(ref body)) => Body::reusable(body.clone()), @@ -2506,6 +2505,20 @@ fn add_cookie_header(headers: &mut HeaderMap, cookie_store: &dyn cookie::CookieS } } +fn validate_url(ip_filter: fn(IpAddr) -> bool, url: &Url) -> Result<(), crate::Error> { + let is_valid_ip = match url.host() { + Some(url::Host::Ipv4(ip)) => (ip_filter)(IpAddr::V4(ip)), + Some(url::Host::Ipv6(ip)) => (ip_filter)(IpAddr::V6(ip)), + _ => true, + }; + + if !is_valid_ip { + let e = trust_dns_resolver::error::ResolveError::from("destination is restricted"); + return Err(crate::Error::new(crate::error::Kind::Request, Some(e))); + } + Ok(()) +} + #[cfg(test)] mod tests { #[tokio::test] From 4a6d728c4b94732f75ef20bbfb3ee50569244bbb Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Fri, 15 Dec 2023 22:21:03 +0100 Subject: [PATCH 03/19] fix: Restrict redirects to http and https --- src/async_impl/client.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/async_impl/client.rs b/src/async_impl/client.rs index 260f40bf4..3217dc658 100644 --- a/src/async_impl/client.rs +++ b/src/async_impl/client.rs @@ -2391,6 +2391,10 @@ impl Future for PendingRequest { redirect::ActionKind::Follow => { debug!("redirecting '{}' to '{}'", self.url, loc); + if loc.scheme() != "http" && loc.scheme() != "https" { + return Poll::Ready(Err(error::url_bad_scheme(loc))); + } + if self.client.https_only && loc.scheme() != "https" { return Poll::Ready(Err(error::redirect( error::url_bad_scheme(loc.clone()), From 81d75a67da7054835a2631b008e420bdc11ec363 Mon Sep 17 00:00:00 2001 From: Joshua Ferge Date: Wed, 19 Feb 2025 11:16:20 -0500 Subject: [PATCH 04/19] telemetry: log dns timing 1% of time --- Cargo.toml | 4 ++-- src/dns/hickory.rs | 5 +++++ 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a21cd0fdf..04028d790 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -74,7 +74,7 @@ macos-system-configuration = ["dep:system-configuration"] # Experimental HTTP/3 client. # Disabled while waiting for quinn to upgrade. -#http3 = ["rustls-tls-manual-roots", "dep:h3", "dep:h3-quinn", "dep:quinn", "dep:futures-channel"] +# http3 = ["rustls-tls-manual-roots", "dep:h3", "dep:h3-quinn", "dep:quinn", "dep:futures-channel"] # Internal (PRIVATE!) features used to aid testing. # Don't rely on these whatsoever. They may disappear at anytime. @@ -100,7 +100,7 @@ tower-service = "0.3" futures-core = { version = "0.3.0", default-features = false } futures-util = { version = "0.3.0", default-features = false } sync_wrapper = "0.1.2" - +rand = "0.8" # Optional deps... ## json diff --git a/src/dns/hickory.rs b/src/dns/hickory.rs index 2cb2afa26..930d75a9c 100644 --- a/src/dns/hickory.rs +++ b/src/dns/hickory.rs @@ -40,7 +40,12 @@ impl Resolve for HickoryDnsResolver { let filter = resolver.filter; let resolver = resolver.state.get_or_try_init(new_resolver)?; + let start = std::time::Instant::now(); let lookup = resolver.lookup_ip(name.as_str()).await?; + if rand::random::() < 0.01 { + log::info!("DNS lookup for {} took {:?}", name.as_str(), start.elapsed()); + } + if !lookup.iter().any(filter) { let e = hickory_resolver::error::ResolveError::from("destination is restricted"); return Err(e.into()); From 6b258349ce238785fe081a843c099c18670a84e2 Mon Sep 17 00:00:00 2001 From: Joshua Ferge Date: Wed, 19 Feb 2025 11:24:55 -0500 Subject: [PATCH 05/19] cargo fmt --- src/error.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/error.rs b/src/error.rs index 4d6488541..56df1ad47 100644 --- a/src/error.rs +++ b/src/error.rs @@ -131,7 +131,10 @@ impl Error { if hyper_err.is_connect() { return true; } - } else if err.downcast_ref::().is_some() { + } else if err + .downcast_ref::() + .is_some() + { return true; } From ba7a942d66d9b581b19d39086ec245cb053f239f Mon Sep 17 00:00:00 2001 From: Joshua Ferge Date: Wed, 19 Feb 2025 11:31:16 -0500 Subject: [PATCH 06/19] more formatting --- src/dns/hickory.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/dns/hickory.rs b/src/dns/hickory.rs index 930d75a9c..33d05e415 100644 --- a/src/dns/hickory.rs +++ b/src/dns/hickory.rs @@ -43,7 +43,11 @@ impl Resolve for HickoryDnsResolver { let start = std::time::Instant::now(); let lookup = resolver.lookup_ip(name.as_str()).await?; if rand::random::() < 0.01 { - log::info!("DNS lookup for {} took {:?}", name.as_str(), start.elapsed()); + log::info!( + "DNS lookup for {} took {:?}", + name.as_str(), + start.elapsed() + ); } if !lookup.iter().any(filter) { From 7408a3124f56130a1e89935841ae4314440ef309 Mon Sep 17 00:00:00 2001 From: Joshua Ferge Date: Wed, 19 Feb 2025 12:31:02 -0500 Subject: [PATCH 07/19] change to info log --- src/dns/hickory.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dns/hickory.rs b/src/dns/hickory.rs index 33d05e415..04864eba5 100644 --- a/src/dns/hickory.rs +++ b/src/dns/hickory.rs @@ -43,7 +43,7 @@ impl Resolve for HickoryDnsResolver { let start = std::time::Instant::now(); let lookup = resolver.lookup_ip(name.as_str()).await?; if rand::random::() < 0.01 { - log::info!( + log::warn!( "DNS lookup for {} took {:?}", name.as_str(), start.elapsed() From df9d761891d3f1abdffe8d01221a047760b21fcc Mon Sep 17 00:00:00 2001 From: Joshua Ferge Date: Wed, 19 Feb 2025 14:21:14 -0500 Subject: [PATCH 08/19] try upping dns cache size --- src/dns/hickory.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/dns/hickory.rs b/src/dns/hickory.rs index 04864eba5..6a1ded995 100644 --- a/src/dns/hickory.rs +++ b/src/dns/hickory.rs @@ -86,5 +86,9 @@ fn new_resolver() -> io::Result { format!("error reading DNS system conf: {e}"), ) })?; - Ok(TokioAsyncResolver::tokio(config, opts)) + + let mut mut_ops = opts.clone(); + mut_ops.cache_size = 100_000; // 100k entries + + Ok(TokioAsyncResolver::tokio(config, mut_ops)) } From 012bc30a15cead582fc770a1a2ff76bc8c800ae5 Mon Sep 17 00:00:00 2001 From: Joshua Ferge Date: Wed, 19 Feb 2025 15:27:05 -0500 Subject: [PATCH 09/19] 500k dns entries cache --- src/dns/hickory.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dns/hickory.rs b/src/dns/hickory.rs index 6a1ded995..69517908a 100644 --- a/src/dns/hickory.rs +++ b/src/dns/hickory.rs @@ -88,7 +88,7 @@ fn new_resolver() -> io::Result { })?; let mut mut_ops = opts.clone(); - mut_ops.cache_size = 100_000; // 100k entries + mut_ops.cache_size = 500_000; // 500k entries Ok(TokioAsyncResolver::tokio(config, mut_ops)) } From 2b33e5b9f52dbc005212c3e6036d27fbe1993978 Mon Sep 17 00:00:00 2001 From: Dan Fuller Date: Thu, 6 Mar 2025 13:40:10 -0800 Subject: [PATCH 10/19] tidy up build and warnings. default to hickory-dns --- Cargo.toml | 4 ++-- src/async_impl/client.rs | 2 ++ src/lib.rs | 8 -------- 3 files changed, 4 insertions(+), 10 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 04028d790..6984d1967 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,7 @@ features = [ ] [features] -default = ["default-tls", "charset", "http2", "macos-system-configuration"] +default = ["default-tls", "charset", "http2", "macos-system-configuration", "hickory-dns"] # Note: this doesn't enable the 'native-tls' feature, which adds specific # functionality for it. @@ -74,7 +74,7 @@ macos-system-configuration = ["dep:system-configuration"] # Experimental HTTP/3 client. # Disabled while waiting for quinn to upgrade. -# http3 = ["rustls-tls-manual-roots", "dep:h3", "dep:h3-quinn", "dep:quinn", "dep:futures-channel"] + http3 = ["rustls-tls-manual-roots", "dep:h3", "dep:h3-quinn", "dep:quinn", "dep:futures-channel"] # Internal (PRIVATE!) features used to aid testing. # Don't rely on these whatsoever. They may disappear at anytime. diff --git a/src/async_impl/client.rs b/src/async_impl/client.rs index 4c130f003..1701fc522 100644 --- a/src/async_impl/client.rs +++ b/src/async_impl/client.rs @@ -35,6 +35,8 @@ use crate::connect::Connector; use crate::cookie; #[cfg(feature = "hickory-dns")] use crate::dns::hickory::HickoryDnsResolver; +use hickory_resolver; + use crate::dns::{gai::GaiResolver, DnsResolverWithOverrides, DynResolver, Resolve}; use crate::error; use crate::into_url::try_uri; diff --git a/src/lib.rs b/src/lib.rs index ce4549dd9..1a69ed1eb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -238,14 +238,6 @@ //! [cargo-features]: https://doc.rust-lang.org/stable/cargo/reference/manifest.html#the-features-section //! [sponsor]: https://seanmonstar.com/sponsor -#[cfg(all(feature = "http3", not(reqwest_unstable)))] -compile_error!( - "\ - The `http3` feature is unstable, and requires the \ - `RUSTFLAGS='--cfg reqwest_unstable'` environment variable to be set.\ -" -); - macro_rules! if_wasm { ($($item:item)*) => {$( #[cfg(target_arch = "wasm32")] From e3a440d25a7849d79605f4613bad39ce288644ca Mon Sep 17 00:00:00 2001 From: Dan Fuller Date: Fri, 7 Mar 2025 11:28:26 -0800 Subject: [PATCH 11/19] feat(dns): Allow dns servers to be specified when using hickory-dns This allows us to pass a list of dns servers to be used instead of the defaults for the system. --- src/async_impl/client.rs | 56 ++++++++++++++++++++++++++++++++++++++-- src/dns/hickory.rs | 49 +++++++++++++++++++++-------------- 2 files changed, 83 insertions(+), 22 deletions(-) diff --git a/src/async_impl/client.rs b/src/async_impl/client.rs index 1701fc522..65b28226d 100644 --- a/src/async_impl/client.rs +++ b/src/async_impl/client.rs @@ -55,6 +55,7 @@ use quinn::TransportConfig; use quinn::VarInt; type HyperResponseFuture = hyper_util::client::legacy::ResponseFuture; +const DEFAULT_DNS_PORT: u16 = 53; /// An asynchronous `Client` to make Requests with. /// @@ -155,7 +156,9 @@ struct Config { cookie_store: Option>, hickory_dns: bool, #[cfg(feature = "hickory-dns")] - ip_filter: fn(std::net::IpAddr) -> bool, + dns_nameservers: Option>, + #[cfg(feature = "hickory-dns")] + ip_filter: fn(IpAddr) -> bool, error: Option, https_only: bool, #[cfg(feature = "http3")] @@ -269,6 +272,8 @@ impl ClientBuilder { #[cfg(feature = "http3")] quic_send_window: None, dns_resolver: None, + #[cfg(feature = "hickory-dns")] + dns_nameservers: None, }, } } @@ -305,7 +310,35 @@ impl ClientBuilder { let mut resolver: Arc = match config.hickory_dns { false => Arc::new(GaiResolver::new()), #[cfg(feature = "hickory-dns")] - true => Arc::new(HickoryDnsResolver::new(config.ip_filter)), + true => { + let mut resolver = HickoryDnsResolver::new(config.ip_filter); + if let Some(nameservers) = config.dns_nameservers { + let mut hickory_config = hickory_resolver::config::ResolverConfig::new(); + for ip in nameservers { + hickory_config.add_name_server(hickory_resolver::config::NameServerConfig { + socket_addr: (ip, DEFAULT_DNS_PORT).into(), + protocol: hickory_resolver::config::Protocol::Udp, + tls_dns_name: None, + trust_negative_responses: false, + bind_addr: None, + }); + hickory_config.add_name_server(hickory_resolver::config::NameServerConfig { + socket_addr: (ip, DEFAULT_DNS_PORT).into(), + protocol: hickory_resolver::config::Protocol::Tcp, + tls_dns_name: None, + trust_negative_responses: false, + bind_addr: None, + }); + } + + let mut opts = hickory_resolver::config::ResolverOpts::default(); + opts.use_hosts_file = false; + + resolver = resolver.with_config(hickory_config, opts); + } + + Arc::new(resolver) + }, #[cfg(not(feature = "hickory-dns"))] true => unreachable!("hickory-dns shouldn't be enabled unless the feature is"), }; @@ -1693,6 +1726,25 @@ impl ClientBuilder { self } + /// Configure custom DNS nameservers for this client when using hickory_dns + /// + /// # Example + /// ``` + /// # use std::net::{IpAddr, Ipv4Addr}; + /// let client = reqwest::Client::builder() + /// .dns_nameservers(vec![IpAddr::V4(Ipv4Addr::new(8, 8, 8, 8))]) + /// .build()?; + /// ``` + #[cfg(feature = "hickory-dns")] + pub fn dns_nameservers(mut self, nameservers: I) -> ClientBuilder + where + I: IntoIterator + { + self.config.dns_nameservers = Some(nameservers.into_iter().collect()); + self + } + + /// Disables the hickory-dns async resolver. /// /// This method exists even if the optional `hickory-dns` feature is not enabled. diff --git a/src/dns/hickory.rs b/src/dns/hickory.rs index 69517908a..e75638e70 100644 --- a/src/dns/hickory.rs +++ b/src/dns/hickory.rs @@ -1,6 +1,9 @@ -//! DNS resolution via the [hickory-resolver](https://github.com/hickory-dns/hickory-dns) crate - -use hickory_resolver::{lookup_ip::LookupIpIntoIter, system_conf, TokioAsyncResolver}; +use hickory_resolver::{ + config::{ResolverConfig, ResolverOpts}, + lookup_ip::LookupIpIntoIter, + TokioAsyncResolver, + system_conf, +}; use once_cell::sync::OnceCell; use std::io; @@ -17,6 +20,7 @@ pub(crate) struct HickoryDnsResolver { /// construction of the resolver. state: Arc>, filter: fn(std::net::IpAddr) -> bool, + config: Option<(ResolverConfig, ResolverOpts)>, } struct SocketAddrs { @@ -29,8 +33,29 @@ impl HickoryDnsResolver { Self { state: Default::default(), filter, + config: None, } } + + pub fn with_config(mut self, config: ResolverConfig, opts: ResolverOpts) -> Self { + self.config = Some((config, opts)); + self + } + + fn new_resolver(&self) -> io::Result { + let (config, mut opts) = match self.config.clone() { + Some((config, opts)) => (config, opts), + None => system_conf::read_system_conf().map_err(|e| { + io::Error::new( + io::ErrorKind::Other, + format!("error reading DNS system conf: {e}"), + ) + })?, + }; + + opts.cache_size = 500_000; // 500k entries + Ok(TokioAsyncResolver::tokio(config, opts)) + } } impl Resolve for HickoryDnsResolver { @@ -38,7 +63,7 @@ impl Resolve for HickoryDnsResolver { let resolver = self.clone(); Box::pin(async move { let filter = resolver.filter; - let resolver = resolver.state.get_or_try_init(new_resolver)?; + let resolver = resolver.state.get_or_try_init(|| resolver.new_resolver())?; let start = std::time::Instant::now(); let lookup = resolver.lookup_ip(name.as_str()).await?; @@ -76,19 +101,3 @@ impl Iterator for SocketAddrs { } } } - -/// Create a new resolver with the default configuration, -/// which reads from `/etc/resolve.conf`. -fn new_resolver() -> io::Result { - let (config, opts) = system_conf::read_system_conf().map_err(|e| { - io::Error::new( - io::ErrorKind::Other, - format!("error reading DNS system conf: {e}"), - ) - })?; - - let mut mut_ops = opts.clone(); - mut_ops.cache_size = 500_000; // 500k entries - - Ok(TokioAsyncResolver::tokio(config, mut_ops)) -} From be34e1e20c88190dfd46dd609f43aeb15e9ede1b Mon Sep 17 00:00:00 2001 From: Dan Fuller Date: Fri, 7 Mar 2025 11:41:14 -0800 Subject: [PATCH 12/19] tidy up --- src/dns/hickory.rs | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/src/dns/hickory.rs b/src/dns/hickory.rs index e75638e70..929a9816c 100644 --- a/src/dns/hickory.rs +++ b/src/dns/hickory.rs @@ -41,21 +41,6 @@ impl HickoryDnsResolver { self.config = Some((config, opts)); self } - - fn new_resolver(&self) -> io::Result { - let (config, mut opts) = match self.config.clone() { - Some((config, opts)) => (config, opts), - None => system_conf::read_system_conf().map_err(|e| { - io::Error::new( - io::ErrorKind::Other, - format!("error reading DNS system conf: {e}"), - ) - })?, - }; - - opts.cache_size = 500_000; // 500k entries - Ok(TokioAsyncResolver::tokio(config, opts)) - } } impl Resolve for HickoryDnsResolver { @@ -63,7 +48,7 @@ impl Resolve for HickoryDnsResolver { let resolver = self.clone(); Box::pin(async move { let filter = resolver.filter; - let resolver = resolver.state.get_or_try_init(|| resolver.new_resolver())?; + let resolver = resolver.state.get_or_try_init(|| new_resolver(resolver.config))?; let start = std::time::Instant::now(); let lookup = resolver.lookup_ip(name.as_str()).await?; @@ -101,3 +86,18 @@ impl Iterator for SocketAddrs { } } } + +fn new_resolver(resolver_config: Option<(ResolverConfig, ResolverOpts)>) -> io::Result { + let (config, mut opts) = match resolver_config { + Some((config, opts)) => (config, opts), + None => system_conf::read_system_conf().map_err(|e| { + io::Error::new( + io::ErrorKind::Other, + format!("error reading DNS system conf: {e}"), + ) + })?, + }; + + opts.cache_size = 500_000; // 500k entries + Ok(TokioAsyncResolver::tokio(config, opts)) +} From 736b8ca01602cf00f4bef363aa1e98c5e56fb3ca Mon Sep 17 00:00:00 2001 From: klochek Date: Mon, 16 Jun 2025 09:29:09 -0400 Subject: [PATCH 13/19] feat(uptime): expose stats for uptime metrics (#3) * feat(uptime): expose stats for uptime metrics * fix(uptime): various fixes; collect all redirects --- Cargo.toml | 10 +++--- src/async_impl/client.rs | 62 +++++++++++++++++++++++++---------- src/async_impl/response.rs | 11 ++++++- src/async_impl/upgrade.rs | 2 +- src/connect.rs | 48 +++++++++++++++++++-------- tests/support/delay_server.rs | 3 +- tests/support/server.rs | 2 +- tests/upgrade.rs | 3 +- 8 files changed, 98 insertions(+), 43 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 6984d1967..8d87dc5df 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -112,8 +112,8 @@ mime_guess = { version = "2.0", default-features = false, optional = true } encoding_rs = { version = "0.8", optional = true } http-body = "1" http-body-util = "0.1" -hyper = { version = "1", features = ["http1", "client"] } -hyper-util = { version = "0.1.3", features = ["http1", "client", "client-legacy", "tokio"] } +hyper = { git = "https://github.com/getsentry/hyper", rev="f63c131545499b0c32eb1315224ce2823d8d5fda", features = ["http1", "client"] } +hyper-util = { git = "https://github.com/getsentry/hyper-util", rev = "577f70dceb65e8e9dff3dc8c19e79ce877a15cbc", features = ["http1", "client", "client-legacy", "tokio"] } h2 = { version = "0.4", optional = true } once_cell = "1" log = "0.4" @@ -127,7 +127,7 @@ ipnet = "2.3" rustls-pemfile = { version = "2", optional = true } ## default-tls -hyper-tls = { version = "0.6", optional = true } +hyper-tls = { git = "https://github.com/getsentry/hyper-tls", rev = "ece41a68e5605138d1b77f99284e104efa698892", optional = true } native-tls-crate = { version = "0.2.10", optional = true, package = "native-tls" } tokio-native-tls = { version = "0.3.0", optional = true } @@ -162,8 +162,8 @@ futures-channel = { version = "0.3", optional = true } [target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies] env_logger = "0.10" -hyper = { version = "1.1.0", default-features = false, features = ["http1", "http2", "client", "server"] } -hyper-util = { version = "0.1", features = ["http1", "http2", "client", "client-legacy", "server-auto", "tokio"] } +hyper = { git = "https://github.com/getsentry/hyper", rev="f63c131545499b0c32eb1315224ce2823d8d5fda", default-features = false, features = ["http1", "http2", "client", "server"] } +hyper-util = { git = "https://github.com/getsentry/hyper-util", rev = "577f70dceb65e8e9dff3dc8c19e79ce877a15cbc", features = ["http1", "http2", "client", "client-legacy", "server-auto", "tokio"] } serde = { version = "1.0", features = ["derive"] } libflate = "1.0" brotli_crate = { package = "brotli", version = "3.3.0" } diff --git a/src/async_impl/client.rs b/src/async_impl/client.rs index 65b28226d..ec073e4ce 100644 --- a/src/async_impl/client.rs +++ b/src/async_impl/client.rs @@ -13,6 +13,7 @@ use http::header::{ }; use http::uri::Scheme; use http::Uri; +use hyper::{RedirectStats, RequestStats}; use hyper_util::client::legacy::connect::HttpConnector; #[cfg(feature = "default-tls")] use native_tls_crate::TlsConnector; @@ -315,20 +316,24 @@ impl ClientBuilder { if let Some(nameservers) = config.dns_nameservers { let mut hickory_config = hickory_resolver::config::ResolverConfig::new(); for ip in nameservers { - hickory_config.add_name_server(hickory_resolver::config::NameServerConfig { - socket_addr: (ip, DEFAULT_DNS_PORT).into(), - protocol: hickory_resolver::config::Protocol::Udp, - tls_dns_name: None, - trust_negative_responses: false, - bind_addr: None, - }); - hickory_config.add_name_server(hickory_resolver::config::NameServerConfig { - socket_addr: (ip, DEFAULT_DNS_PORT).into(), - protocol: hickory_resolver::config::Protocol::Tcp, - tls_dns_name: None, - trust_negative_responses: false, - bind_addr: None, - }); + hickory_config.add_name_server( + hickory_resolver::config::NameServerConfig { + socket_addr: (ip, DEFAULT_DNS_PORT).into(), + protocol: hickory_resolver::config::Protocol::Udp, + tls_dns_name: None, + trust_negative_responses: false, + bind_addr: None, + }, + ); + hickory_config.add_name_server( + hickory_resolver::config::NameServerConfig { + socket_addr: (ip, DEFAULT_DNS_PORT).into(), + protocol: hickory_resolver::config::Protocol::Tcp, + tls_dns_name: None, + trust_negative_responses: false, + bind_addr: None, + }, + ); } let mut opts = hickory_resolver::config::ResolverOpts::default(); @@ -338,7 +343,7 @@ impl ClientBuilder { } Arc::new(resolver) - }, + } #[cfg(not(feature = "hickory-dns"))] true => unreachable!("hickory-dns shouldn't be enabled unless the feature is"), }; @@ -1738,13 +1743,12 @@ impl ClientBuilder { #[cfg(feature = "hickory-dns")] pub fn dns_nameservers(mut self, nameservers: I) -> ClientBuilder where - I: IntoIterator + I: IntoIterator, { self.config.dns_nameservers = Some(nameservers.into_iter().collect()); self } - /// Disables the hickory-dns async resolver. /// /// This method exists even if the optional `hickory-dns` feature is not enabled. @@ -2101,6 +2105,9 @@ impl Client { in_flight, timeout, + + poll_start: None, + redirects: vec![], }), } } @@ -2375,6 +2382,9 @@ pin_project! { in_flight: ResponseFuture, #[pin] timeout: Option>>, + + poll_start: Option, + redirects: Vec, } } @@ -2535,8 +2545,12 @@ impl Future for PendingRequest { } } + if self.poll_start.is_none() { + self.poll_start = Some(std::time::Instant::now()); + } + loop { - let res = match self.as_mut().in_flight().get_mut() { + let (stats, res) = match self.as_mut().in_flight().get_mut() { ResponseFuture::Default(r) => match Pin::new(r).poll(cx) { Poll::Ready(Err(e)) => { #[cfg(feature = "http2")] @@ -2704,6 +2718,12 @@ impl Future for PendingRequest { .expect("valid request parts"); *req.headers_mut() = headers.clone(); std::mem::swap(self.as_mut().headers(), &mut headers); + + self.redirects.push(RedirectStats { + finished: std::time::Instant::now(), + connection_stats: stats, + }); + ResponseFuture::Default(self.client.hyper.request(req)) } }; @@ -2725,6 +2745,12 @@ impl Future for PendingRequest { self.url.clone(), self.client.accepts, self.timeout.take(), + RequestStats { + http_stats: stats, + redirects: self.redirects.clone(), + poll_start: self.poll_start.unwrap(), + finish: std::time::Instant::now(), + }, ); return Poll::Ready(Ok(res)); } diff --git a/src/async_impl/response.rs b/src/async_impl/response.rs index d2ddfc3a1..f5f1ef79b 100644 --- a/src/async_impl/response.rs +++ b/src/async_impl/response.rs @@ -4,7 +4,7 @@ use std::pin::Pin; use bytes::Bytes; use http_body_util::BodyExt; -use hyper::{HeaderMap, StatusCode, Version}; +use hyper::{HeaderMap, RequestStats, StatusCode, Version}; use hyper_util::client::legacy::connect::HttpInfo; #[cfg(feature = "json")] use serde::de::DeserializeOwned; @@ -30,6 +30,7 @@ pub struct Response { // Boxed to save space (11 words to 1 word), and it's not accessed // frequently internally. url: Box, + stats: RequestStats, } impl Response { @@ -38,6 +39,7 @@ impl Response { url: Url, accepts: Accepts, timeout: Option>>, + stats: RequestStats, ) -> Response { let (mut parts, body) = res.into_parts(); let decoder = Decoder::detect( @@ -50,9 +52,15 @@ impl Response { Response { res, url: Box::new(url), + stats, } } + /// Get the request stats for this response + pub fn stats(&self) -> &RequestStats { + &self.stats + } + /// Get the `StatusCode` of this `Response`. #[inline] pub fn status(&self) -> StatusCode { @@ -466,6 +474,7 @@ impl> From> for Response { Response { res, url: Box::new(url), + stats: RequestStats::empty(), } } } diff --git a/src/async_impl/upgrade.rs b/src/async_impl/upgrade.rs index 3b599d0ad..2e927a28d 100644 --- a/src/async_impl/upgrade.rs +++ b/src/async_impl/upgrade.rs @@ -60,7 +60,7 @@ impl fmt::Debug for Upgraded { impl From for Upgraded { fn from(inner: hyper::upgrade::Upgraded) -> Self { Upgraded { - inner: TokioIo::new(inner), + inner: TokioIo::new(inner, None), } } } diff --git a/src/connect.rs b/src/connect.rs index ff76c57f8..347735dce 100644 --- a/src/connect.rs +++ b/src/connect.rs @@ -2,7 +2,7 @@ use http::header::HeaderValue; use http::uri::{Authority, Scheme}; use http::Uri; -use hyper::rt::{Read, ReadBufCursor, Write}; +use hyper::rt::{Read, ReadBufCursor, Stats, Write}; use hyper_util::client::legacy::connect::{Connected, Connection}; #[cfg(any(feature = "socks", feature = "__tls"))] use hyper_util::rt::TokioIo; @@ -393,12 +393,14 @@ impl Connector { ) .await?; let tls_connector = tokio_native_tls::TlsConnector::from(tls.clone()); - let io = tls_connector - .connect(host.ok_or("no host in url")?, TokioIo::new(tunneled)) + let mut io = tls_connector + .connect(host.ok_or("no host in url")?, TokioIo::new(tunneled, None)) .await?; + let stats = io.get_mut().get_mut().get_mut().stats(); + return Ok(Conn { inner: self.verbose.wrap(NativeTlsConn { - inner: TokioIo::new(io), + inner: TokioIo::new(io, stats), }), is_proxy: false, tls_info: false, @@ -612,11 +614,11 @@ impl TlsInfoFactory for hyper_rustls::MaybeHttpsStream AsyncConn for T {} +impl AsyncConn for T {} #[cfg(feature = "__tls")] trait AsyncConnWithInfo: AsyncConn + TlsInfoFactory {} @@ -662,6 +664,12 @@ impl Connection for Conn { } } +impl Stats for Conn { + fn stats(&mut self) -> Option { + self.inner.stats() + } +} + impl Read for Conn { fn poll_read( self: Pin<&mut Self>, @@ -749,7 +757,7 @@ where // headers end buf.extend_from_slice(b"\r\n"); - let mut tokio_conn = TokioIo::new(&mut conn); + let mut tokio_conn = TokioIo::new(&mut conn, None); tokio_conn.write_all(&buf).await?; @@ -789,7 +797,7 @@ fn tunnel_eof() -> BoxError { #[cfg(feature = "default-tls")] mod native_tls_conn { use super::TlsInfoFactory; - use hyper::rt::{Read, ReadBufCursor, Write}; + use hyper::rt::{Read, ReadBufCursor, Stats, Write}; use hyper_tls::MaybeHttpsStream; use hyper_util::client::legacy::connect::{Connected, Connection}; use hyper_util::rt::TokioIo; @@ -849,6 +857,12 @@ mod native_tls_conn { } } + impl Stats for NativeTlsConn { + fn stats(&mut self) -> Option { + self.inner.stats() + } + } + impl Read for NativeTlsConn { fn poll_read( self: Pin<&mut Self>, @@ -1090,7 +1104,7 @@ mod socks { } mod verbose { - use hyper::rt::{Read, ReadBufCursor, Write}; + use hyper::rt::{Read, ReadBufCursor, Stats, Write}; use hyper_util::client::legacy::connect::{Connected, Connection}; use std::cmp::min; use std::fmt; @@ -1128,6 +1142,12 @@ mod verbose { } } + impl Stats for Verbose { + fn stats(&mut self) -> Option { + unimplemented!("Verbose connector needs an implementation."); + } + } + impl Read for Verbose { fn poll_read( mut self: Pin<&mut Self>, @@ -1327,7 +1347,7 @@ mod tests { .build() .expect("new rt"); let f = async move { - let tcp = TokioIo::new(TcpStream::connect(&addr).await?); + let tcp = TokioIo::new(TcpStream::connect(&addr).await?, None); let host = addr.ip().to_string(); let port = addr.port(); tunnel(tcp, host, port, ua(), None).await @@ -1345,7 +1365,7 @@ mod tests { .build() .expect("new rt"); let f = async move { - let tcp = TokioIo::new(TcpStream::connect(&addr).await?); + let tcp = TokioIo::new(TcpStream::connect(&addr).await?, None); let host = addr.ip().to_string(); let port = addr.port(); tunnel(tcp, host, port, ua(), None).await @@ -1363,7 +1383,7 @@ mod tests { .build() .expect("new rt"); let f = async move { - let tcp = TokioIo::new(TcpStream::connect(&addr).await?); + let tcp = TokioIo::new(TcpStream::connect(&addr).await?, None); let host = addr.ip().to_string(); let port = addr.port(); tunnel(tcp, host, port, ua(), None).await @@ -1387,7 +1407,7 @@ mod tests { .build() .expect("new rt"); let f = async move { - let tcp = TokioIo::new(TcpStream::connect(&addr).await?); + let tcp = TokioIo::new(TcpStream::connect(&addr).await?, None); let host = addr.ip().to_string(); let port = addr.port(); tunnel(tcp, host, port, ua(), None).await @@ -1409,7 +1429,7 @@ mod tests { .build() .expect("new rt"); let f = async move { - let tcp = TokioIo::new(TcpStream::connect(&addr).await?); + let tcp = TokioIo::new(TcpStream::connect(&addr).await?, None); let host = addr.ip().to_string(); let port = addr.port(); tunnel( diff --git a/tests/support/delay_server.rs b/tests/support/delay_server.rs index f79c2a4df..47b702066 100644 --- a/tests/support/delay_server.rs +++ b/tests/support/delay_server.rs @@ -63,8 +63,7 @@ impl Server { } res = tcp_listener.accept() => { let (stream, _) = res.unwrap(); - let io = hyper_util::rt::TokioIo::new(stream); - + let io = hyper_util::rt::TokioIo::new(stream, None); let handle = tokio::spawn({ let connection_shutdown_rx = connection_shutdown_rx.clone(); diff --git a/tests/support/server.rs b/tests/support/server.rs index f9c45b4d2..4d00561a6 100644 --- a/tests/support/server.rs +++ b/tests/support/server.rs @@ -92,7 +92,7 @@ where }); let builder = builder.clone(); tokio::spawn(async move { - let _ = builder.serve_connection_with_upgrades(hyper_util::rt::TokioIo::new(io), svc).await; + let _ = builder.serve_connection_with_upgrades(hyper_util::rt::TokioIo::new(io, None), svc).await; }); } } diff --git a/tests/upgrade.rs b/tests/upgrade.rs index 5ea72acc2..fba75f700 100644 --- a/tests/upgrade.rs +++ b/tests/upgrade.rs @@ -11,7 +11,8 @@ async fn http_upgrade() { assert_eq!(req.headers()["upgrade"], "foobar"); tokio::spawn(async move { - let mut upgraded = hyper_util::rt::TokioIo::new(hyper::upgrade::on(req).await.unwrap()); + let mut upgraded = + hyper_util::rt::TokioIo::new(hyper::upgrade::on(req).await.unwrap(), None); let mut buf = vec![0; 7]; upgraded.read_exact(&mut buf).await.unwrap(); From 4e2eff129c87ee0b54e665bd294cf4b1a221d3de Mon Sep 17 00:00:00 2001 From: klochek Date: Tue, 22 Jul 2025 12:13:29 -0400 Subject: [PATCH 14/19] feat(uptime): add start timestamps and redirect uris to stats (#4) --- Cargo.toml | 10 +++++----- src/async_impl/client.rs | 24 +++++++++++++++++------- src/dns/hickory.rs | 13 ++++++++----- 3 files changed, 30 insertions(+), 17 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 8d87dc5df..eb1368005 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -112,8 +112,8 @@ mime_guess = { version = "2.0", default-features = false, optional = true } encoding_rs = { version = "0.8", optional = true } http-body = "1" http-body-util = "0.1" -hyper = { git = "https://github.com/getsentry/hyper", rev="f63c131545499b0c32eb1315224ce2823d8d5fda", features = ["http1", "client"] } -hyper-util = { git = "https://github.com/getsentry/hyper-util", rev = "577f70dceb65e8e9dff3dc8c19e79ce877a15cbc", features = ["http1", "client", "client-legacy", "tokio"] } +hyper = { git = "https://github.com/getsentry/hyper", rev="11a91db433282b4057795e71607f4077d9cd4f18", features = ["http1", "client"] } +hyper-util = { git = "https://github.com/getsentry/hyper-util", rev = "fd71b24115c0c190249a5efa0889967382ec66cd", features = ["http1", "client", "client-legacy", "tokio"] } h2 = { version = "0.4", optional = true } once_cell = "1" log = "0.4" @@ -127,7 +127,7 @@ ipnet = "2.3" rustls-pemfile = { version = "2", optional = true } ## default-tls -hyper-tls = { git = "https://github.com/getsentry/hyper-tls", rev = "ece41a68e5605138d1b77f99284e104efa698892", optional = true } +hyper-tls = { git = "https://github.com/getsentry/hyper-tls", rev = "46b6233904d18b774589440929520eaa9831db62", optional = true } native-tls-crate = { version = "0.2.10", optional = true, package = "native-tls" } tokio-native-tls = { version = "0.3.0", optional = true } @@ -162,8 +162,8 @@ futures-channel = { version = "0.3", optional = true } [target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies] env_logger = "0.10" -hyper = { git = "https://github.com/getsentry/hyper", rev="f63c131545499b0c32eb1315224ce2823d8d5fda", default-features = false, features = ["http1", "http2", "client", "server"] } -hyper-util = { git = "https://github.com/getsentry/hyper-util", rev = "577f70dceb65e8e9dff3dc8c19e79ce877a15cbc", features = ["http1", "http2", "client", "client-legacy", "server-auto", "tokio"] } +hyper = { git = "https://github.com/getsentry/hyper", rev="11a91db433282b4057795e71607f4077d9cd4f18", default-features = false, features = ["http1", "http2", "client", "server"] } +hyper-util = { git = "https://github.com/getsentry/hyper-util", rev = "fd71b24115c0c190249a5efa0889967382ec66cd", features = ["http1", "http2", "client", "client-legacy", "server-auto", "tokio"] } serde = { version = "1.0", features = ["derive"] } libflate = "1.0" brotli_crate = { package = "brotli", version = "3.3.0" } diff --git a/src/async_impl/client.rs b/src/async_impl/client.rs index ec073e4ce..44226a4a1 100644 --- a/src/async_impl/client.rs +++ b/src/async_impl/client.rs @@ -2,7 +2,7 @@ use std::any::Any; use std::net::IpAddr; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant, SystemTime}; use std::{collections::HashMap, convert::TryInto, net::SocketAddr}; use std::{fmt, str}; @@ -2107,6 +2107,7 @@ impl Client { timeout, poll_start: None, + poll_start_timestamp: None, redirects: vec![], }), } @@ -2384,6 +2385,7 @@ pin_project! { timeout: Option>>, poll_start: Option, + poll_start_timestamp: Option, redirects: Vec, } } @@ -2547,6 +2549,12 @@ impl Future for PendingRequest { if self.poll_start.is_none() { self.poll_start = Some(std::time::Instant::now()); + self.poll_start_timestamp = Some( + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or(Duration::from_secs(0)) + .as_millis(), + ); } loop { @@ -2722,6 +2730,7 @@ impl Future for PendingRequest { self.redirects.push(RedirectStats { finished: std::time::Instant::now(), connection_stats: stats, + url: uri.clone(), }); ResponseFuture::Default(self.client.hyper.request(req)) @@ -2745,12 +2754,13 @@ impl Future for PendingRequest { self.url.clone(), self.client.accepts, self.timeout.take(), - RequestStats { - http_stats: stats, - redirects: self.redirects.clone(), - poll_start: self.poll_start.unwrap(), - finish: std::time::Instant::now(), - }, + RequestStats::new( + stats, + self.redirects.clone(), + self.poll_start.unwrap(), + self.poll_start_timestamp.unwrap(), + Instant::now(), + ), ); return Poll::Ready(Ok(res)); } diff --git a/src/dns/hickory.rs b/src/dns/hickory.rs index 929a9816c..aee820847 100644 --- a/src/dns/hickory.rs +++ b/src/dns/hickory.rs @@ -1,8 +1,7 @@ use hickory_resolver::{ config::{ResolverConfig, ResolverOpts}, - lookup_ip::LookupIpIntoIter, - TokioAsyncResolver, - system_conf, + lookup_ip::LookupIpIntoIter, + system_conf, TokioAsyncResolver, }; use once_cell::sync::OnceCell; @@ -48,7 +47,9 @@ impl Resolve for HickoryDnsResolver { let resolver = self.clone(); Box::pin(async move { let filter = resolver.filter; - let resolver = resolver.state.get_or_try_init(|| new_resolver(resolver.config))?; + let resolver = resolver + .state + .get_or_try_init(|| new_resolver(resolver.config))?; let start = std::time::Instant::now(); let lookup = resolver.lookup_ip(name.as_str()).await?; @@ -87,7 +88,9 @@ impl Iterator for SocketAddrs { } } -fn new_resolver(resolver_config: Option<(ResolverConfig, ResolverOpts)>) -> io::Result { +fn new_resolver( + resolver_config: Option<(ResolverConfig, ResolverOpts)>, +) -> io::Result { let (config, mut opts) = match resolver_config { Some((config, opts)) => (config, opts), None => system_conf::read_system_conf().map_err(|e| { From 847e4f460fde419f5b70e0d6e15c4da22159e73e Mon Sep 17 00:00:00 2001 From: klochek Date: Fri, 25 Jul 2025 12:14:28 -0400 Subject: [PATCH 15/19] fix(uptime): stats fixes, plus corrections to redirect stats (#5) --- Cargo.toml | 121 +++++++++++++++++++++++++++++-------- src/async_impl/body.rs | 7 +-- src/async_impl/client.rs | 51 +++++++++++++--- src/async_impl/response.rs | 10 +-- src/async_impl/upgrade.rs | 5 +- src/error.rs | 3 +- src/lib.rs | 3 +- 7 files changed, 151 insertions(+), 49 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index eb1368005..f380c17fd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,19 +19,25 @@ rustdoc-args = ["--cfg", "docsrs", "--cfg", "reqwest_unstable"] targets = ["x86_64-unknown-linux-gnu", "wasm32-unknown-unknown"] [package.metadata.playground] -features = [ - "blocking", - "cookies", - "json", - "multipart", -] +features = ["blocking", "cookies", "json", "multipart"] [features] -default = ["default-tls", "charset", "http2", "macos-system-configuration", "hickory-dns"] +default = [ + "default-tls", + "charset", + "http2", + "macos-system-configuration", + "hickory-dns", +] # Note: this doesn't enable the 'native-tls' feature, which adds specific # functionality for it. -default-tls = ["dep:hyper-tls", "dep:native-tls-crate", "__tls", "dep:tokio-native-tls"] +default-tls = [ + "dep:hyper-tls", + "dep:native-tls-crate", + "__tls", + "dep:tokio-native-tls", +] http2 = ["h2", "hyper/http2", "hyper-util/http2"] @@ -45,7 +51,13 @@ rustls-tls-manual-roots = ["__rustls"] rustls-tls-webpki-roots = ["dep:webpki-roots", "__rustls"] rustls-tls-native-roots = ["dep:rustls-native-certs", "__rustls"] -blocking = ["futures-channel/sink", "futures-util/io", "futures-util/sink", "tokio/rt-multi-thread", "tokio/sync"] +blocking = [ + "futures-channel/sink", + "futures-util/io", + "futures-util/sink", + "tokio/rt-multi-thread", + "tokio/sync", +] charset = ["dep:encoding_rs"] @@ -53,7 +65,11 @@ cookies = ["dep:cookie_crate", "dep:cookie_store"] gzip = ["dep:async-compression", "async-compression?/gzip", "dep:tokio-util"] -brotli = ["dep:async-compression", "async-compression?/brotli", "dep:tokio-util"] +brotli = [ + "dep:async-compression", + "async-compression?/brotli", + "dep:tokio-util", +] deflate = ["dep:async-compression", "async-compression?/zlib", "dep:tokio-util"] @@ -74,7 +90,13 @@ macos-system-configuration = ["dep:system-configuration"] # Experimental HTTP/3 client. # Disabled while waiting for quinn to upgrade. - http3 = ["rustls-tls-manual-roots", "dep:h3", "dep:h3-quinn", "dep:quinn", "dep:futures-channel"] +http3 = [ + "rustls-tls-manual-roots", + "dep:h3", + "dep:h3-quinn", + "dep:quinn", + "dep:futures-channel", +] # Internal (PRIVATE!) features used to aid testing. # Don't rely on these whatsoever. They may disappear at anytime. @@ -84,7 +106,14 @@ __tls = ["dep:rustls-pemfile", "tokio/io-util"] # Enables common rustls code. # Equivalent to rustls-tls-manual-roots but shorter :) -__rustls = ["dep:hyper-rustls", "dep:tokio-rustls", "dep:rustls", "__tls", "dep:rustls-pemfile", "rustls-pki-types"] +__rustls = [ + "dep:hyper-rustls", + "dep:tokio-rustls", + "dep:rustls", + "__tls", + "dep:rustls-pemfile", + "rustls-pki-types", +] # When enabled, disable using the cached SYS_PROXIES. __internal_proxy_sys_no_cache = [] @@ -112,14 +141,25 @@ mime_guess = { version = "2.0", default-features = false, optional = true } encoding_rs = { version = "0.8", optional = true } http-body = "1" http-body-util = "0.1" -hyper = { git = "https://github.com/getsentry/hyper", rev="11a91db433282b4057795e71607f4077d9cd4f18", features = ["http1", "client"] } -hyper-util = { git = "https://github.com/getsentry/hyper-util", rev = "fd71b24115c0c190249a5efa0889967382ec66cd", features = ["http1", "client", "client-legacy", "tokio"] } +hyper = { git = "https://github.com/getsentry/hyper", rev = "33f3dc16d0e6c926a47194ed7249540d4fadadc3", features = [ + "http1", + "client", +] } +hyper-util = { git = "https://github.com/getsentry/hyper-util", rev = "501d83ceabad31b1e2b5a29fb398f83b25ef7e66", features = [ + "http1", + "client", + "client-legacy", + "tokio", +] } h2 = { version = "0.4", optional = true } once_cell = "1" log = "0.4" mime = "0.3.16" percent-encoding = "2.1" -tokio = { version = "1.0", default-features = false, features = ["net", "time"] } +tokio = { version = "1.0", default-features = false, features = [ + "net", + "time", +] } pin-project-lite = "0.2.0" ipnet = "2.3" @@ -127,14 +167,14 @@ ipnet = "2.3" rustls-pemfile = { version = "2", optional = true } ## default-tls -hyper-tls = { git = "https://github.com/getsentry/hyper-tls", rev = "46b6233904d18b774589440929520eaa9831db62", optional = true } +hyper-tls = { git = "https://github.com/getsentry/hyper-tls", rev = "6bad33f5e4046eea3e1938ebffef21709162da5e", optional = true } native-tls-crate = { version = "0.2.10", optional = true, package = "native-tls" } tokio-native-tls = { version = "0.3.0", optional = true } # rustls-tls hyper-rustls = { version = "0.26.0", default-features = false, optional = true } rustls = { version = "0.22.2", optional = true } -rustls-pki-types = { version = "1.1.0", features = ["alloc"] ,optional = true } +rustls-pki-types = { version = "1.1.0", features = ["alloc"], optional = true } tokio-rustls = { version = "0.25", optional = true } webpki-roots = { version = "0.26.0", optional = true } rustls-native-certs = { version = "0.7", optional = true } @@ -144,32 +184,61 @@ cookie_crate = { version = "0.17.0", package = "cookie", optional = true } cookie_store = { version = "0.20.0", optional = true } ## compression -async-compression = { version = "0.4.0", default-features = false, features = ["tokio"], optional = true } -tokio-util = { version = "0.7.1", default-features = false, features = ["codec", "io"], optional = true } +async-compression = { version = "0.4.0", default-features = false, features = [ + "tokio", +], optional = true } +tokio-util = { version = "0.7.1", default-features = false, features = [ + "codec", + "io", +], optional = true } ## socks tokio-socks = { version = "0.5.1", optional = true } ## hickory-dns -hickory-resolver = { version = "0.24", optional = true, features = ["tokio-runtime"] } +hickory-resolver = { version = "0.24", optional = true, features = [ + "tokio-runtime", +] } # HTTP/3 experimental support h3 = { version = "0.0.4", optional = true } h3-quinn = { version = "0.0.5", optional = true } -quinn = { version = "0.10", default-features = false, features = ["tls-rustls", "ring", "runtime-tokio"], optional = true } +quinn = { version = "0.10", default-features = false, features = [ + "tls-rustls", + "ring", + "runtime-tokio", +], optional = true } futures-channel = { version = "0.3", optional = true } [target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies] env_logger = "0.10" -hyper = { git = "https://github.com/getsentry/hyper", rev="11a91db433282b4057795e71607f4077d9cd4f18", default-features = false, features = ["http1", "http2", "client", "server"] } -hyper-util = { git = "https://github.com/getsentry/hyper-util", rev = "fd71b24115c0c190249a5efa0889967382ec66cd", features = ["http1", "http2", "client", "client-legacy", "server-auto", "tokio"] } +hyper = { git = "https://github.com/getsentry/hyper", rev = "33f3dc16d0e6c926a47194ed7249540d4fadadc3", default-features = false, features = [ + "http1", + "http2", + "client", + "server", +] } +hyper-util = { git = "https://github.com/getsentry/hyper-util", rev = "501d83ceabad31b1e2b5a29fb398f83b25ef7e66", features = [ + "http1", + "http2", + "client", + "client-legacy", + "server-auto", + "tokio", +] } serde = { version = "1.0", features = ["derive"] } libflate = "1.0" brotli_crate = { package = "brotli", version = "3.3.0" } doc-comment = "0.3" -tokio = { version = "1.0", default-features = false, features = ["macros", "rt-multi-thread"] } -futures-util = { version = "0.3.0", default-features = false, features = ["std", "alloc"] } +tokio = { version = "1.0", default-features = false, features = [ + "macros", + "rt-multi-thread", +] } +futures-util = { version = "0.3.0", default-features = false, features = [ + "std", + "alloc", +] } [target.'cfg(windows)'.dependencies] winreg = "0.52.0" @@ -203,7 +272,7 @@ features = [ "ServiceWorkerGlobalScope", "RequestCredentials", "File", - "ReadableStream" + "ReadableStream", ] [target.'cfg(target_arch = "wasm32")'.dev-dependencies] diff --git a/src/async_impl/body.rs b/src/async_impl/body.rs index cd9658c64..ea36d635d 100644 --- a/src/async_impl/body.rs +++ b/src/async_impl/body.rs @@ -1,11 +1,10 @@ +use bytes::Bytes; +use http_body::Body as HttpBody; +use http_body_util::combinators::BoxBody; use std::fmt; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; - -use bytes::Bytes; -use http_body::Body as HttpBody; -use http_body_util::combinators::BoxBody; //use sync_wrapper::SyncWrapper; #[cfg(feature = "stream")] use tokio::fs::File; diff --git a/src/async_impl/client.rs b/src/async_impl/client.rs index 44226a4a1..2db5a676c 100644 --- a/src/async_impl/client.rs +++ b/src/async_impl/client.rs @@ -6,6 +6,7 @@ use std::time::{Duration, Instant, SystemTime}; use std::{collections::HashMap, convert::TryInto, net::SocketAddr}; use std::{fmt, str}; +use crate::{RedirectStats, RequestStats}; use bytes::Bytes; use http::header::{ Entry, HeaderMap, HeaderValue, ACCEPT, ACCEPT_ENCODING, CONTENT_ENCODING, CONTENT_LENGTH, @@ -13,7 +14,6 @@ use http::header::{ }; use http::uri::Scheme; use http::Uri; -use hyper::{RedirectStats, RequestStats}; use hyper_util::client::legacy::connect::HttpConnector; #[cfg(feature = "default-tls")] use native_tls_crate::TlsConnector; @@ -2553,7 +2553,7 @@ impl Future for PendingRequest { SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .unwrap_or(Duration::from_secs(0)) - .as_millis(), + .as_micros(), ); } @@ -2677,7 +2677,7 @@ impl Future for PendingRequest { loc, ))); } - + let old_url = self.url.clone(); self.url = loc; let mut headers = std::mem::replace(self.as_mut().headers(), HeaderMap::new()); @@ -2727,11 +2727,36 @@ impl Future for PendingRequest { *req.headers_mut() = headers.clone(); std::mem::swap(self.as_mut().headers(), &mut headers); - self.redirects.push(RedirectStats { - finished: std::time::Instant::now(), - connection_stats: stats, - url: uri.clone(), - }); + let request_body_size = self + .body + .as_ref() + .map(|o| o.as_ref().map(|b| b.len())) + .flatten() + .unwrap_or(0) + as u32; + let now = Instant::now(); + let poll_start = self.poll_start.unwrap(); + let poll_start_timestamp = + self.poll_start_timestamp.unwrap(); + + self.redirects.push(RedirectStats::new( + now, + poll_start, + poll_start_timestamp, + stats, + res.status().as_u16(), + try_uri(&old_url) + .expect("Uri already successfully parsed."), + request_body_size, + )); + + self.poll_start = Some(now); + self.poll_start_timestamp = Some( + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_micros(), + ); ResponseFuture::Default(self.client.hyper.request(req)) } @@ -2749,6 +2774,13 @@ impl Future for PendingRequest { } } + let status = res.status().as_u16(); + let request_body_size = self + .body + .as_ref() + .map(|o| o.as_ref().map(|b| b.len())) + .flatten() + .unwrap_or(0) as u32; let res = Response::new( res, self.url.clone(), @@ -2760,6 +2792,9 @@ impl Future for PendingRequest { self.poll_start.unwrap(), self.poll_start_timestamp.unwrap(), Instant::now(), + try_uri(&self.url).expect("Uri already successfully parsed."), + status, + request_body_size, ), ); return Poll::Ready(Ok(res)); diff --git a/src/async_impl/response.rs b/src/async_impl/response.rs index f5f1ef79b..24fcb5e42 100644 --- a/src/async_impl/response.rs +++ b/src/async_impl/response.rs @@ -1,15 +1,15 @@ -use std::fmt; -use std::net::SocketAddr; -use std::pin::Pin; - +use crate::RequestStats; use bytes::Bytes; use http_body_util::BodyExt; -use hyper::{HeaderMap, RequestStats, StatusCode, Version}; +use hyper::{HeaderMap, StatusCode, Version}; use hyper_util::client::legacy::connect::HttpInfo; #[cfg(feature = "json")] use serde::de::DeserializeOwned; #[cfg(feature = "json")] use serde_json; +use std::fmt; +use std::net::SocketAddr; +use std::pin::Pin; use tokio::time::Sleep; use url::Url; diff --git a/src/async_impl/upgrade.rs b/src/async_impl/upgrade.rs index 2e927a28d..a48ed5b7c 100644 --- a/src/async_impl/upgrade.rs +++ b/src/async_impl/upgrade.rs @@ -1,9 +1,8 @@ +use futures_util::TryFutureExt; +use hyper_util::rt::TokioIo; use std::pin::Pin; use std::task::{self, Poll}; use std::{fmt, io}; - -use futures_util::TryFutureExt; -use hyper_util::rt::TokioIo; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; /// An upgraded HTTP connection. diff --git a/src/error.rs b/src/error.rs index 56df1ad47..d5ef968f6 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,10 +1,9 @@ #![cfg_attr(target_arch = "wasm32", allow(unused))] +use crate::{StatusCode, Url}; use std::error::Error as StdError; use std::fmt; use std::io; -use crate::{StatusCode, Url}; - /// A `Result` alias where the `Err` case is `reqwest::Error`. pub type Result = std::result::Result; diff --git a/src/lib.rs b/src/lib.rs index 1a69ed1eb..41b9b9140 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -251,12 +251,13 @@ macro_rules! if_hyper { $item )*} } - pub use http::header; pub use http::Method; pub use http::{StatusCode, Version}; pub use url::Url; +pub use hyper::stats::{RedirectStats, RequestStats}; + // universal mods #[macro_use] mod error; From 220c544bf929d602294643c9c4748c81ac06159a Mon Sep 17 00:00:00 2001 From: klochek Date: Wed, 6 Aug 2025 15:48:34 -0400 Subject: [PATCH 16/19] feat(uptime): record certificate bytes in redirect stats (#6) --- Cargo.toml | 10 +++++----- src/async_impl/client.rs | 14 ++++++++++++++ 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f380c17fd..c59dbba67 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -141,11 +141,11 @@ mime_guess = { version = "2.0", default-features = false, optional = true } encoding_rs = { version = "0.8", optional = true } http-body = "1" http-body-util = "0.1" -hyper = { git = "https://github.com/getsentry/hyper", rev = "33f3dc16d0e6c926a47194ed7249540d4fadadc3", features = [ +hyper = { git = "https://github.com/getsentry/hyper", rev = "71f79506ec5b97c756f7c35606cfc862dfee7868", features = [ "http1", "client", ] } -hyper-util = { git = "https://github.com/getsentry/hyper-util", rev = "501d83ceabad31b1e2b5a29fb398f83b25ef7e66", features = [ +hyper-util = { git = "https://github.com/getsentry/hyper-util", rev = "288134da888b5510f78c9a86471787824407c6a5", features = [ "http1", "client", "client-legacy", @@ -167,7 +167,7 @@ ipnet = "2.3" rustls-pemfile = { version = "2", optional = true } ## default-tls -hyper-tls = { git = "https://github.com/getsentry/hyper-tls", rev = "6bad33f5e4046eea3e1938ebffef21709162da5e", optional = true } +hyper-tls = { git = "https://github.com/getsentry/hyper-tls", rev = "ec67eabaae5374e7b1118eb1ed011b1ce05b5984", optional = true } native-tls-crate = { version = "0.2.10", optional = true, package = "native-tls" } tokio-native-tls = { version = "0.3.0", optional = true } @@ -213,13 +213,13 @@ futures-channel = { version = "0.3", optional = true } [target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies] env_logger = "0.10" -hyper = { git = "https://github.com/getsentry/hyper", rev = "33f3dc16d0e6c926a47194ed7249540d4fadadc3", default-features = false, features = [ +hyper = { git = "https://github.com/getsentry/hyper", rev = "71f79506ec5b97c756f7c35606cfc862dfee7868", default-features = false, features = [ "http1", "http2", "client", "server", ] } -hyper-util = { git = "https://github.com/getsentry/hyper-util", rev = "501d83ceabad31b1e2b5a29fb398f83b25ef7e66", features = [ +hyper-util = { git = "https://github.com/getsentry/hyper-util", rev = "288134da888b5510f78c9a86471787824407c6a5", features = [ "http1", "http2", "client", diff --git a/src/async_impl/client.rs b/src/async_impl/client.rs index 2db5a676c..2c4f861f2 100644 --- a/src/async_impl/client.rs +++ b/src/async_impl/client.rs @@ -6,6 +6,7 @@ use std::time::{Duration, Instant, SystemTime}; use std::{collections::HashMap, convert::TryInto, net::SocketAddr}; use std::{fmt, str}; +use crate::tls::TlsInfo; use crate::{RedirectStats, RequestStats}; use bytes::Bytes; use http::header::{ @@ -2739,6 +2740,12 @@ impl Future for PendingRequest { let poll_start_timestamp = self.poll_start_timestamp.unwrap(); + let certificate = res + .extensions() + .get::() + .and_then(|info| info.peer_certificate()) + .and_then(|bytes| Some(bytes.to_vec())); + self.redirects.push(RedirectStats::new( now, poll_start, @@ -2748,6 +2755,7 @@ impl Future for PendingRequest { try_uri(&old_url) .expect("Uri already successfully parsed."), request_body_size, + certificate, )); self.poll_start = Some(now); @@ -2774,6 +2782,11 @@ impl Future for PendingRequest { } } + let certificate = res + .extensions() + .get::() + .and_then(|info| info.peer_certificate()) + .and_then(|bytes| Some(bytes.to_vec())); let status = res.status().as_u16(); let request_body_size = self .body @@ -2795,6 +2808,7 @@ impl Future for PendingRequest { try_uri(&self.url).expect("Uri already successfully parsed."), status, request_body_size, + certificate, ), ); return Poll::Ready(Ok(res)); From 4e4b644fc38b62ed2e01060a44344dc660aa8f6d Mon Sep 17 00:00:00 2001 From: klochek Date: Wed, 13 Aug 2025 13:39:54 -0400 Subject: [PATCH 17/19] chore(uptime): bump hyper deps (#7) --- Cargo.toml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c59dbba67..10c02eaee 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -141,11 +141,11 @@ mime_guess = { version = "2.0", default-features = false, optional = true } encoding_rs = { version = "0.8", optional = true } http-body = "1" http-body-util = "0.1" -hyper = { git = "https://github.com/getsentry/hyper", rev = "71f79506ec5b97c756f7c35606cfc862dfee7868", features = [ +hyper = { git = "https://github.com/getsentry/hyper", rev = "ea400d80174114b12cf5078462e1cdd75d1627a8", features = [ "http1", "client", ] } -hyper-util = { git = "https://github.com/getsentry/hyper-util", rev = "288134da888b5510f78c9a86471787824407c6a5", features = [ +hyper-util = { git = "https://github.com/getsentry/hyper-util", rev = "335700c7180dc392b674bc93ac8939d44138f2fc", features = [ "http1", "client", "client-legacy", @@ -167,7 +167,7 @@ ipnet = "2.3" rustls-pemfile = { version = "2", optional = true } ## default-tls -hyper-tls = { git = "https://github.com/getsentry/hyper-tls", rev = "ec67eabaae5374e7b1118eb1ed011b1ce05b5984", optional = true } +hyper-tls = { git = "https://github.com/getsentry/hyper-tls", rev = "b3a21d25b6b3a0427ffc12db113f3a03a33b367c", optional = true } native-tls-crate = { version = "0.2.10", optional = true, package = "native-tls" } tokio-native-tls = { version = "0.3.0", optional = true } @@ -213,13 +213,13 @@ futures-channel = { version = "0.3", optional = true } [target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies] env_logger = "0.10" -hyper = { git = "https://github.com/getsentry/hyper", rev = "71f79506ec5b97c756f7c35606cfc862dfee7868", default-features = false, features = [ +hyper = { git = "https://github.com/getsentry/hyper", rev = "ea400d80174114b12cf5078462e1cdd75d1627a8", default-features = false, features = [ "http1", "http2", "client", "server", ] } -hyper-util = { git = "https://github.com/getsentry/hyper-util", rev = "288134da888b5510f78c9a86471787824407c6a5", features = [ +hyper-util = { git = "https://github.com/getsentry/hyper-util", rev = "335700c7180dc392b674bc93ac8939d44138f2fc", features = [ "http1", "http2", "client", From 8bd14b043018f366e3e639cc69ec7a27762bc747 Mon Sep 17 00:00:00 2001 From: Dan Fuller Date: Thu, 13 Nov 2025 10:55:06 -0800 Subject: [PATCH 18/19] chore(uptime): Capture DNS logs in more cases, and include resolved ip address(es) in logs Capture all DNS lookups that take over a second, and all dns lookups to vector. Also adds the specific ips. This is to help us diagnose the vector problems we've been seeing. --- src/dns/hickory.rs | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/src/dns/hickory.rs b/src/dns/hickory.rs index aee820847..a855ca582 100644 --- a/src/dns/hickory.rs +++ b/src/dns/hickory.rs @@ -53,11 +53,20 @@ impl Resolve for HickoryDnsResolver { let start = std::time::Instant::now(); let lookup = resolver.lookup_ip(name.as_str()).await?; - if rand::random::() < 0.01 { + let elapsed = start.elapsed(); + + let hostname = name.as_str(); + // XXX: Hack to make sure we get all dns logs for sending to vector + let is_vector_uc = hostname.contains("vector-uc-pops"); + let should_log = is_vector_uc || elapsed.as_secs() >= 1 || rand::random::() < 0.01; + + if should_log { + let resolved_ips: Vec = lookup.iter().collect(); log::warn!( - "DNS lookup for {} took {:?}", - name.as_str(), - start.elapsed() + "DNS lookup for {} took {:?} → {:?}", + hostname, + elapsed, + resolved_ips ); } From a4846d12afbf0d01598300c46055202a8e47de95 Mon Sep 17 00:00:00 2001 From: klochek Date: Mon, 8 Dec 2025 15:12:47 -0500 Subject: [PATCH 19/19] feat(uptime): Add hashmap-based stats (#8) * feat(uptime): add hashmap-based stats collection * chore(uptime): remove old stats work * Smart pointer changes * update hyper revs * fix a call --- Cargo.toml | 10 ++-- src/async_impl/client.rs | 110 ++++++++++++++++++---------------- src/async_impl/request.rs | 11 ++++ src/async_impl/response.rs | 10 ---- src/async_impl/upgrade.rs | 2 +- src/connect.rs | 72 ++++++++++------------ tests/stats.rs | 110 ++++++++++++++++++++++++++++++++++ tests/support/delay_server.rs | 2 +- tests/support/server.rs | 2 +- tests/upgrade.rs | 3 +- 10 files changed, 218 insertions(+), 114 deletions(-) create mode 100644 tests/stats.rs diff --git a/Cargo.toml b/Cargo.toml index 10c02eaee..28e3a9c77 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -141,11 +141,11 @@ mime_guess = { version = "2.0", default-features = false, optional = true } encoding_rs = { version = "0.8", optional = true } http-body = "1" http-body-util = "0.1" -hyper = { git = "https://github.com/getsentry/hyper", rev = "ea400d80174114b12cf5078462e1cdd75d1627a8", features = [ +hyper = { git = "https://github.com/getsentry/hyper", rev = "9efa5aedce8a36e9a86115b8c104da44fcc43d7e", features = [ "http1", "client", ] } -hyper-util = { git = "https://github.com/getsentry/hyper-util", rev = "335700c7180dc392b674bc93ac8939d44138f2fc", features = [ +hyper-util = { git = "https://github.com/getsentry/hyper-util", rev = "cff84af9d682add40b92c5a57ad43f73bb714081", features = [ "http1", "client", "client-legacy", @@ -167,7 +167,7 @@ ipnet = "2.3" rustls-pemfile = { version = "2", optional = true } ## default-tls -hyper-tls = { git = "https://github.com/getsentry/hyper-tls", rev = "b3a21d25b6b3a0427ffc12db113f3a03a33b367c", optional = true } +hyper-tls = { git = "https://github.com/getsentry/hyper-tls", rev = "7968d40a8b00842803adae4f65963bbf0b126dca", optional = true } native-tls-crate = { version = "0.2.10", optional = true, package = "native-tls" } tokio-native-tls = { version = "0.3.0", optional = true } @@ -213,13 +213,13 @@ futures-channel = { version = "0.3", optional = true } [target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies] env_logger = "0.10" -hyper = { git = "https://github.com/getsentry/hyper", rev = "ea400d80174114b12cf5078462e1cdd75d1627a8", default-features = false, features = [ +hyper = { git = "https://github.com/getsentry/hyper", rev = "9efa5aedce8a36e9a86115b8c104da44fcc43d7e", default-features = false, features = [ "http1", "http2", "client", "server", ] } -hyper-util = { git = "https://github.com/getsentry/hyper-util", rev = "335700c7180dc392b674bc93ac8939d44138f2fc", features = [ +hyper-util = { git = "https://github.com/getsentry/hyper-util", rev = "cff84af9d682add40b92c5a57ad43f73bb714081", features = [ "http1", "http2", "client", diff --git a/src/async_impl/client.rs b/src/async_impl/client.rs index 2c4f861f2..213de6f67 100644 --- a/src/async_impl/client.rs +++ b/src/async_impl/client.rs @@ -7,7 +7,6 @@ use std::{collections::HashMap, convert::TryInto, net::SocketAddr}; use std::{fmt, str}; use crate::tls::TlsInfo; -use crate::{RedirectStats, RequestStats}; use bytes::Bytes; use http::header::{ Entry, HeaderMap, HeaderValue, ACCEPT, ACCEPT_ENCODING, CONTENT_ENCODING, CONTENT_LENGTH, @@ -15,6 +14,7 @@ use http::header::{ }; use http::uri::Scheme; use http::Uri; +use hyper::stats::RequestId; use hyper_util::client::legacy::connect::HttpConnector; #[cfg(feature = "default-tls")] use native_tls_crate::TlsConnector; @@ -2011,7 +2011,7 @@ impl Client { } pub(super) fn execute_request(&self, req: Request) -> Pending { - let (method, url, mut headers, body, timeout, version) = req.pieces(); + let (method, url, mut headers, body, timeout, version, req_id) = req.pieces(); if url.scheme() != "http" && url.scheme() != "https" { return Pending::new_err(error::url_bad_scheme(url)); } @@ -2082,7 +2082,7 @@ impl Client { _ => { let mut req = builder.body(body).expect("valid request parts"); *req.headers_mut() = headers.clone(); - ResponseFuture::Default(self.inner.hyper.request(req)) + ResponseFuture::Default(self.inner.hyper.request(req, req_id.clone())) } }; @@ -2097,6 +2097,7 @@ impl Client { url, headers, body: reusable, + req_id, urls: Vec::new(), @@ -2109,7 +2110,6 @@ impl Client { poll_start: None, poll_start_timestamp: None, - redirects: vec![], }), } } @@ -2387,7 +2387,7 @@ pin_project! { poll_start: Option, poll_start_timestamp: Option, - redirects: Vec, + req_id: RequestId, } } @@ -2466,7 +2466,12 @@ impl PendingRequest { .body(body) .expect("valid request parts"); *req.headers_mut() = self.headers.clone(); - ResponseFuture::Default(self.client.hyper.request(req)) + // TODO klochek If we ever implement retries, this is where we generate the new id. + ResponseFuture::Default( + self.client + .hyper + .request(req, hyper::stats::next_request_id()), + ) } }; @@ -2548,18 +2553,21 @@ impl Future for PendingRequest { } } - if self.poll_start.is_none() { - self.poll_start = Some(std::time::Instant::now()); - self.poll_start_timestamp = Some( - SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap_or(Duration::from_secs(0)) - .as_micros(), - ); - } - loop { - let (stats, res) = match self.as_mut().in_flight().get_mut() { + if self.poll_start.is_none() { + self.poll_start = Some(std::time::Instant::now()); + self.poll_start_timestamp = Some( + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or(Duration::from_secs(0)) + .as_micros(), + ); + + hyper::stats::get_request_stats(&self.req_id) + .set_poll_start(self.poll_start.unwrap(), self.poll_start_timestamp.unwrap()); + } + + let res = match self.as_mut().in_flight().get_mut() { ResponseFuture::Default(r) => match Pin::new(r).poll(cx) { Poll::Ready(Err(e)) => { #[cfg(feature = "http2")] @@ -2736,9 +2744,6 @@ impl Future for PendingRequest { .unwrap_or(0) as u32; let now = Instant::now(); - let poll_start = self.poll_start.unwrap(); - let poll_start_timestamp = - self.poll_start_timestamp.unwrap(); let certificate = res .extensions() @@ -2746,27 +2751,26 @@ impl Future for PendingRequest { .and_then(|info| info.peer_certificate()) .and_then(|bytes| Some(bytes.to_vec())); - self.redirects.push(RedirectStats::new( - now, - poll_start, - poll_start_timestamp, - stats, - res.status().as_u16(), - try_uri(&old_url) - .expect("Uri already successfully parsed."), - request_body_size, - certificate, - )); - - self.poll_start = Some(now); - self.poll_start_timestamp = Some( - SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_micros(), - ); - - ResponseFuture::Default(self.client.hyper.request(req)) + let next_req_id = hyper::stats::next_request_id(); + + hyper::stats::get_request_stats(&self.req_id) + .set_redirect(next_req_id.clone()) + .set_finished(now) + .set_status_code(res.status().as_u16()) + .set_url( + try_uri(&old_url) + .expect("Uri already successfully parsed."), + ) + .set_request_body_size(request_body_size) + .set_certificate(certificate.clone()); + + self.req_id = next_req_id.clone(); + self.poll_start = None; + self.poll_start_timestamp = None; + + ResponseFuture::Default( + self.client.hyper.request(req, next_req_id), + ) } }; @@ -2782,6 +2786,8 @@ impl Future for PendingRequest { } } + let finish = Instant::now(); + let certificate = res .extensions() .get::() @@ -2794,22 +2800,22 @@ impl Future for PendingRequest { .map(|o| o.as_ref().map(|b| b.len())) .flatten() .unwrap_or(0) as u32; + + let mut req_stats = hyper::stats::get_request_stats(&self.req_id); + + req_stats + .set_poll_start(self.poll_start.unwrap(), self.poll_start_timestamp.unwrap()) + .set_finished(finish) + .set_status_code(status) + .set_url(try_uri(&self.url).expect("Uri already successfully parsed.")) + .set_request_body_size(request_body_size) + .set_certificate(certificate.clone()); + let res = Response::new( res, self.url.clone(), self.client.accepts, self.timeout.take(), - RequestStats::new( - stats, - self.redirects.clone(), - self.poll_start.unwrap(), - self.poll_start_timestamp.unwrap(), - Instant::now(), - try_uri(&self.url).expect("Uri already successfully parsed."), - status, - request_body_size, - certificate, - ), ); return Poll::Ready(Ok(res)); } diff --git a/src/async_impl/request.rs b/src/async_impl/request.rs index 665710430..350f739a9 100644 --- a/src/async_impl/request.rs +++ b/src/async_impl/request.rs @@ -3,6 +3,7 @@ use std::fmt; use std::future::Future; use std::time::Duration; +use hyper::stats::RequestId; use serde::Serialize; #[cfg(feature = "json")] use serde_json; @@ -26,6 +27,7 @@ pub struct Request { body: Option, timeout: Option, version: Version, + req_id: RequestId, } /// A builder to construct the properties of a `Request`. @@ -48,6 +50,7 @@ impl Request { body: None, timeout: None, version: Version::default(), + req_id: hyper::stats::next_request_id(), } } @@ -123,6 +126,11 @@ impl Request { &mut self.version } + /// Gets the unique request id for this request. + pub fn req_id(&self) -> &RequestId { + &self.req_id + } + /// Attempt to clone the request. /// /// `None` is returned if the request can not be cloned, i.e. if the body is a stream. @@ -148,6 +156,7 @@ impl Request { Option, Option, Version, + RequestId, ) { ( self.method, @@ -156,6 +165,7 @@ impl Request { self.body, self.timeout, self.version, + self.req_id, ) } } @@ -618,6 +628,7 @@ where body: Some(body.into()), timeout: None, version, + req_id: hyper::stats::next_request_id(), }) } } diff --git a/src/async_impl/response.rs b/src/async_impl/response.rs index 24fcb5e42..41028ecf0 100644 --- a/src/async_impl/response.rs +++ b/src/async_impl/response.rs @@ -1,4 +1,3 @@ -use crate::RequestStats; use bytes::Bytes; use http_body_util::BodyExt; use hyper::{HeaderMap, StatusCode, Version}; @@ -30,7 +29,6 @@ pub struct Response { // Boxed to save space (11 words to 1 word), and it's not accessed // frequently internally. url: Box, - stats: RequestStats, } impl Response { @@ -39,7 +37,6 @@ impl Response { url: Url, accepts: Accepts, timeout: Option>>, - stats: RequestStats, ) -> Response { let (mut parts, body) = res.into_parts(); let decoder = Decoder::detect( @@ -52,15 +49,9 @@ impl Response { Response { res, url: Box::new(url), - stats, } } - /// Get the request stats for this response - pub fn stats(&self) -> &RequestStats { - &self.stats - } - /// Get the `StatusCode` of this `Response`. #[inline] pub fn status(&self) -> StatusCode { @@ -474,7 +465,6 @@ impl> From> for Response { Response { res, url: Box::new(url), - stats: RequestStats::empty(), } } } diff --git a/src/async_impl/upgrade.rs b/src/async_impl/upgrade.rs index a48ed5b7c..64de450ce 100644 --- a/src/async_impl/upgrade.rs +++ b/src/async_impl/upgrade.rs @@ -59,7 +59,7 @@ impl fmt::Debug for Upgraded { impl From for Upgraded { fn from(inner: hyper::upgrade::Upgraded) -> Self { Upgraded { - inner: TokioIo::new(inner, None), + inner: TokioIo::new(inner), } } } diff --git a/src/connect.rs b/src/connect.rs index 347735dce..6c784e38c 100644 --- a/src/connect.rs +++ b/src/connect.rs @@ -2,7 +2,8 @@ use http::header::HeaderValue; use http::uri::{Authority, Scheme}; use http::Uri; -use hyper::rt::{Read, ReadBufCursor, Stats, Write}; +use hyper::rt::{Read, ReadBufCursor, Write}; +use hyper::stats::RequestId; use hyper_util::client::legacy::connect::{Connected, Connection}; #[cfg(any(feature = "socks", feature = "__tls"))] use hyper_util::rt::TokioIo; @@ -271,11 +272,16 @@ impl Connector { }) } - async fn connect_with_maybe_proxy(self, dst: Uri, is_proxy: bool) -> Result { + async fn connect_with_maybe_proxy( + self, + dst: Uri, + req_id: RequestId, + is_proxy: bool, + ) -> Result { match self.inner { #[cfg(not(feature = "__tls"))] Inner::Http(mut http) => { - let io = http.call(dst).await?; + let io = http.call((dst, req_id)).await?; Ok(Conn { inner: self.verbose.wrap(io), is_proxy, @@ -295,7 +301,7 @@ impl Connector { let tls_connector = tokio_native_tls::TlsConnector::from(tls.clone()); let mut http = hyper_tls::HttpsConnector::from((http, tls_connector)); - let io = http.call(dst).await?; + let io = http.call((dst, req_id)).await?; if let hyper_tls::MaybeHttpsStream::Https(stream) = io { if !self.nodelay { @@ -359,6 +365,7 @@ impl Connector { async fn connect_via_proxy( self, dst: Uri, + req_id: RequestId, proxy_scheme: ProxyScheme, ) -> Result { log::debug!("proxy({proxy_scheme:?}) intercepts '{dst:?}'"); @@ -382,7 +389,7 @@ impl Connector { let http = http.clone(); let tls_connector = tokio_native_tls::TlsConnector::from(tls.clone()); let mut http = hyper_tls::HttpsConnector::from((http, tls_connector)); - let conn = http.call(proxy_dst).await?; + let conn = http.call((proxy_dst, req_id)).await?; log::trace!("tunneling HTTPS over proxy"); let tunneled = tunnel( conn, @@ -393,14 +400,13 @@ impl Connector { ) .await?; let tls_connector = tokio_native_tls::TlsConnector::from(tls.clone()); - let mut io = tls_connector - .connect(host.ok_or("no host in url")?, TokioIo::new(tunneled, None)) + let io = tls_connector + .connect(host.ok_or("no host in url")?, TokioIo::new(tunneled)) .await?; - let stats = io.get_mut().get_mut().get_mut().stats(); return Ok(Conn { inner: self.verbose.wrap(NativeTlsConn { - inner: TokioIo::new(io, stats), + inner: TokioIo::new(io), }), is_proxy: false, tls_info: false, @@ -446,7 +452,7 @@ impl Connector { Inner::Http(_) => (), } - self.connect_with_maybe_proxy(proxy_dst, true).await + self.connect_with_maybe_proxy(proxy_dst, req_id, true).await } pub fn set_keepalive(&mut self, dur: Option) { @@ -486,7 +492,7 @@ where } } -impl Service for Connector { +impl Service<(Uri, RequestId)> for Connector { type Response = Conn; type Error = BoxError; type Future = Connecting; @@ -495,20 +501,20 @@ impl Service for Connector { Poll::Ready(Ok(())) } - fn call(&mut self, dst: Uri) -> Self::Future { + fn call(&mut self, (dst, req_id): (Uri, RequestId)) -> Self::Future { log::debug!("starting new connection: {dst:?}"); let timeout = self.timeout; for prox in self.proxies.iter() { if let Some(proxy_scheme) = prox.intercept(&dst) { return Box::pin(with_timeout( - self.clone().connect_via_proxy(dst, proxy_scheme), + self.clone().connect_via_proxy(dst, req_id, proxy_scheme), timeout, )); } } Box::pin(with_timeout( - self.clone().connect_with_maybe_proxy(dst, false), + self.clone().connect_with_maybe_proxy(dst, req_id, false), timeout, )) } @@ -614,11 +620,11 @@ impl TlsInfoFactory for hyper_rustls::MaybeHttpsStream AsyncConn for T {} +impl AsyncConn for T {} #[cfg(feature = "__tls")] trait AsyncConnWithInfo: AsyncConn + TlsInfoFactory {} @@ -664,12 +670,6 @@ impl Connection for Conn { } } -impl Stats for Conn { - fn stats(&mut self) -> Option { - self.inner.stats() - } -} - impl Read for Conn { fn poll_read( self: Pin<&mut Self>, @@ -757,7 +757,7 @@ where // headers end buf.extend_from_slice(b"\r\n"); - let mut tokio_conn = TokioIo::new(&mut conn, None); + let mut tokio_conn = TokioIo::new(&mut conn); tokio_conn.write_all(&buf).await?; @@ -797,7 +797,7 @@ fn tunnel_eof() -> BoxError { #[cfg(feature = "default-tls")] mod native_tls_conn { use super::TlsInfoFactory; - use hyper::rt::{Read, ReadBufCursor, Stats, Write}; + use hyper::rt::{Read, ReadBufCursor, Write}; use hyper_tls::MaybeHttpsStream; use hyper_util::client::legacy::connect::{Connected, Connection}; use hyper_util::rt::TokioIo; @@ -857,12 +857,6 @@ mod native_tls_conn { } } - impl Stats for NativeTlsConn { - fn stats(&mut self) -> Option { - self.inner.stats() - } - } - impl Read for NativeTlsConn { fn poll_read( self: Pin<&mut Self>, @@ -1104,7 +1098,7 @@ mod socks { } mod verbose { - use hyper::rt::{Read, ReadBufCursor, Stats, Write}; + use hyper::rt::{Read, ReadBufCursor, Write}; use hyper_util::client::legacy::connect::{Connected, Connection}; use std::cmp::min; use std::fmt; @@ -1142,12 +1136,6 @@ mod verbose { } } - impl Stats for Verbose { - fn stats(&mut self) -> Option { - unimplemented!("Verbose connector needs an implementation."); - } - } - impl Read for Verbose { fn poll_read( mut self: Pin<&mut Self>, @@ -1347,7 +1335,7 @@ mod tests { .build() .expect("new rt"); let f = async move { - let tcp = TokioIo::new(TcpStream::connect(&addr).await?, None); + let tcp = TokioIo::new(TcpStream::connect(&addr).await?); let host = addr.ip().to_string(); let port = addr.port(); tunnel(tcp, host, port, ua(), None).await @@ -1365,7 +1353,7 @@ mod tests { .build() .expect("new rt"); let f = async move { - let tcp = TokioIo::new(TcpStream::connect(&addr).await?, None); + let tcp = TokioIo::new(TcpStream::connect(&addr).await?); let host = addr.ip().to_string(); let port = addr.port(); tunnel(tcp, host, port, ua(), None).await @@ -1383,7 +1371,7 @@ mod tests { .build() .expect("new rt"); let f = async move { - let tcp = TokioIo::new(TcpStream::connect(&addr).await?, None); + let tcp = TokioIo::new(TcpStream::connect(&addr).await?); let host = addr.ip().to_string(); let port = addr.port(); tunnel(tcp, host, port, ua(), None).await @@ -1407,7 +1395,7 @@ mod tests { .build() .expect("new rt"); let f = async move { - let tcp = TokioIo::new(TcpStream::connect(&addr).await?, None); + let tcp = TokioIo::new(TcpStream::connect(&addr).await?); let host = addr.ip().to_string(); let port = addr.port(); tunnel(tcp, host, port, ua(), None).await @@ -1429,7 +1417,7 @@ mod tests { .build() .expect("new rt"); let f = async move { - let tcp = TokioIo::new(TcpStream::connect(&addr).await?, None); + let tcp = TokioIo::new(TcpStream::connect(&addr).await?); let host = addr.ip().to_string(); let port = addr.port(); tunnel( diff --git a/tests/stats.rs b/tests/stats.rs new file mode 100644 index 000000000..5c3583a06 --- /dev/null +++ b/tests/stats.rs @@ -0,0 +1,110 @@ +#![cfg(not(target_arch = "wasm32"))] +mod support; +use support::server; + +use std::time::Duration; + +#[tokio::test] +async fn stats_request_timeout() { + let _ = env_logger::try_init(); + + let server = server::http(move |_req| { + async { + // delay returning the response + tokio::time::sleep(Duration::from_secs(2)).await; + http::Response::default() + } + }); + + let client = reqwest::Client::builder().build().unwrap(); + + let url = format!("http://{}/slow", server.addr()); + + let req = client + .get(&url) + .timeout(Duration::from_millis(500)) + .build() + .unwrap(); + let req_id = req.req_id().clone(); + + let res = client.execute(req).await; + + let err = res.unwrap_err(); + + if cfg!(not(target_arch = "wasm32")) { + assert!(err.is_timeout() && !err.is_connect()); + } else { + assert!(err.is_timeout()); + } + + let stats = hyper::stats::consume_request_stats(req_id); + assert!(stats.redirects()[0] + .get_http_stats() + .get_connection_stats() + .is_some()); + assert!(stats.redirects()[0] + .get_http_stats() + .get_connection_stats() + .unwrap() + .get_connect() + .is_some()); + assert!(stats.redirects()[0] + .get_http_stats() + .get_connection_stats() + .unwrap() + .get_dns_resolve() + .is_some()); + assert!(stats.redirects()[0] + .get_http_stats() + .get_connection_stats() + .unwrap() + .get_tls_connect() + .is_none()); + assert!(stats.redirects()[0].get_request_sent().is_some()); + assert!(stats.redirects()[0].get_response_start().is_none()); +} + +#[cfg(not(target_arch = "wasm32"))] +#[tokio::test] +async fn stats_connect_timeout() { + let _ = env_logger::try_init(); + + let client = reqwest::Client::builder() + .connect_timeout(Duration::from_millis(100)) + .build() + .unwrap(); + + let url = "http://10.255.255.1:81/slow"; + + let req = client + .get(url) + .timeout(Duration::from_millis(1000)) + .build() + .unwrap(); + let req_id = req.req_id().clone(); + let res = client.execute(req).await; + + let err = res.unwrap_err(); + + assert!(err.is_connect() && err.is_timeout()); + + let stats = hyper::stats::consume_request_stats(req_id); + assert!(stats.redirects()[0] + .get_http_stats() + .get_connection_stats() + .is_some()); + assert!(stats.redirects()[0] + .get_http_stats() + .get_connection_stats() + .unwrap() + .get_dns_resolve() + .is_some()); + assert!(stats.redirects()[0] + .get_http_stats() + .get_connection_stats() + .unwrap() + .get_tls_connect() + .is_none()); + assert!(stats.redirects()[0].get_request_sent().is_none()); + assert!(stats.redirects()[0].get_response_start().is_none()); +} diff --git a/tests/support/delay_server.rs b/tests/support/delay_server.rs index 47b702066..e1475e863 100644 --- a/tests/support/delay_server.rs +++ b/tests/support/delay_server.rs @@ -63,7 +63,7 @@ impl Server { } res = tcp_listener.accept() => { let (stream, _) = res.unwrap(); - let io = hyper_util::rt::TokioIo::new(stream, None); + let io = hyper_util::rt::TokioIo::new(stream); let handle = tokio::spawn({ let connection_shutdown_rx = connection_shutdown_rx.clone(); diff --git a/tests/support/server.rs b/tests/support/server.rs index 4d00561a6..f9c45b4d2 100644 --- a/tests/support/server.rs +++ b/tests/support/server.rs @@ -92,7 +92,7 @@ where }); let builder = builder.clone(); tokio::spawn(async move { - let _ = builder.serve_connection_with_upgrades(hyper_util::rt::TokioIo::new(io, None), svc).await; + let _ = builder.serve_connection_with_upgrades(hyper_util::rt::TokioIo::new(io), svc).await; }); } } diff --git a/tests/upgrade.rs b/tests/upgrade.rs index fba75f700..5ea72acc2 100644 --- a/tests/upgrade.rs +++ b/tests/upgrade.rs @@ -11,8 +11,7 @@ async fn http_upgrade() { assert_eq!(req.headers()["upgrade"], "foobar"); tokio::spawn(async move { - let mut upgraded = - hyper_util::rt::TokioIo::new(hyper::upgrade::on(req).await.unwrap(), None); + let mut upgraded = hyper_util::rt::TokioIo::new(hyper::upgrade::on(req).await.unwrap()); let mut buf = vec![0; 7]; upgraded.read_exact(&mut buf).await.unwrap();