diff --git a/crates/common/src/auction/orchestrator.rs b/crates/common/src/auction/orchestrator.rs index e4217359..0fc6ee9f 100644 --- a/crates/common/src/auction/orchestrator.rs +++ b/crates/common/src/auction/orchestrator.rs @@ -4,7 +4,7 @@ use error_stack::{Report, ResultExt}; use fastly::http::request::{select, PendingRequest}; use std::collections::HashMap; use std::sync::Arc; -use std::time::Instant; +use std::time::{Duration, Instant}; use crate::error::TrustedServerError; @@ -12,6 +12,16 @@ use super::config::AuctionConfig; use super::provider::AuctionProvider; use super::types::{AuctionContext, AuctionRequest, AuctionResponse, Bid, BidStatus}; +/// Compute the remaining time budget from a deadline. +/// +/// Returns the number of milliseconds left before `timeout_ms` is exceeded, +/// measured from `start`. Returns `0` when the deadline has already passed. +#[inline] +fn remaining_budget_ms(start: Instant, timeout_ms: u32) -> u32 { + let elapsed = u32::try_from(start.elapsed().as_millis()).unwrap_or(u32::MAX); + timeout_ms.saturating_sub(elapsed) +} + /// Manages auction execution across multiple providers. pub struct AuctionOrchestrator { config: AuctionConfig, @@ -93,6 +103,7 @@ impl AuctionOrchestrator { request: &AuctionRequest, context: &AuctionContext<'_>, ) -> Result> { + let mediation_start = Instant::now(); let provider_responses = self.run_providers_parallel(request, context).await?; let floor_prices = self.floor_prices_by_slot(request); @@ -105,11 +116,30 @@ impl AuctionOrchestrator { mediator.provider_name() ); - // Create a context with provider responses for the mediator + // Give the mediator only the remaining time from the auction + // deadline, not the full timeout — the bidding phase already + // consumed part of it. + let remaining_ms = remaining_budget_ms(mediation_start, context.timeout_ms); + + if remaining_ms == 0 { + log::warn!( + "Auction timeout ({}ms) exhausted during bidding phase — skipping mediator", + context.timeout_ms + ); + let winning = self.select_winning_bids(&provider_responses, &floor_prices); + return Ok(OrchestrationResult { + provider_responses, + mediator_response: None, + winning_bids: winning, + total_time_ms: 0, + metadata: HashMap::new(), + }); + } + let mediator_context = AuctionContext { settings: context.settings, request: context.request, - timeout_ms: context.timeout_ms, + timeout_ms: remaining_ms, provider_responses: Some(&provider_responses), }; @@ -211,6 +241,9 @@ impl AuctionOrchestrator { provider_names.len() ); + // Track auction start time for deadline enforcement + let auction_start = Instant::now(); + // Phase 1: Launch all requests concurrently and build mapping // Maps backend_name -> (provider_name, start_time, provider) let mut backend_to_provider: HashMap = @@ -234,8 +267,26 @@ impl AuctionOrchestrator { continue; } - // Get the backend name for this provider to map responses back - let backend_name = match provider.backend_name() { + // Give each provider only the remaining time from the auction + // deadline so that its backend first_byte_timeout doesn't extend + // past the overall budget. Also respect the provider's own + // configured timeout when it is tighter than the remaining budget. + let remaining_ms = remaining_budget_ms(auction_start, context.timeout_ms); + let effective_timeout = remaining_ms.min(provider.timeout_ms()); + + if effective_timeout == 0 { + log::warn!( + "Auction timeout ({}ms) exhausted before launching '{}' — skipping", + context.timeout_ms, + provider.provider_name() + ); + continue; + } + + // Get the backend name for this provider to map responses back. + // Must be computed after effective_timeout since the timeout is + // part of the backend name. + let backend_name = match provider.backend_name(effective_timeout) { Some(name) => name, None => { log::warn!( @@ -246,14 +297,22 @@ impl AuctionOrchestrator { } }; + let provider_context = AuctionContext { + settings: context.settings, + request: context.request, + timeout_ms: effective_timeout, + provider_responses: context.provider_responses, + }; + log::info!( - "Launching bid request to: {} (backend: {})", + "Launching bid request to: {} (backend: {}, budget: {}ms)", provider.provider_name(), - backend_name + backend_name, + effective_timeout ); let start_time = Instant::now(); - match provider.request_bids(request, context) { + match provider.request_bids(request, &provider_context) { Ok(pending) => { backend_to_provider.insert( backend_name, @@ -275,12 +334,21 @@ impl AuctionOrchestrator { } } + let deadline = Duration::from_millis(u64::from(context.timeout_ms)); log::info!( - "Launched {} concurrent requests, waiting for responses using select...", - pending_requests.len() + "Launched {} concurrent requests, waiting for responses (timeout: {}ms)...", + pending_requests.len(), + context.timeout_ms ); - // Phase 2: Wait for responses using select() to process as they become ready + // Phase 2: Wait for responses using select() to process as they become ready. + // Enforce the auction deadline: after each select() returns, check + // elapsed time and drop remaining requests if the timeout is exceeded. + // + // NOTE: `select()` blocks until at least one backend responds (or its + // transport timeout fires). Hard deadline enforcement therefore depends + // on every backend's `first_byte_timeout` being set to at most the + // remaining auction budget — which Phase 1 above guarantees. let mut responses = Vec::new(); let mut remaining = pending_requests; @@ -332,6 +400,18 @@ impl AuctionOrchestrator { log::warn!("A provider request failed: {:?}", e); } } + + // Check auction deadline after processing each response. + // Remaining PendingRequests are dropped, which abandons the + // in-flight HTTP calls on the Fastly host. + if auction_start.elapsed() >= deadline && !remaining.is_empty() { + log::warn!( + "Auction timeout ({}ms) reached, dropping {} remaining request(s)", + context.timeout_ms, + remaining.len() + ); + break; + } } Ok(responses) @@ -628,14 +708,20 @@ mod tests { ); } - // TODO: Re-enable these tests after implementing mock provider support for send_async() - // Mock providers currently don't work with concurrent requests because they can't - // create PendingRequest without real backends configured in Fastly. + // TODO: Re-enable provider integration tests after implementing mock support + // for send_async(). Mock providers can't create PendingRequest without real + // Fastly backends. + // + // Untested timeout enforcement paths (require real backends): + // - Deadline check in select() loop (drops remaining requests) + // - Mediator skip when remaining_ms == 0 (bidding exhausts budget) + // - Provider skip when effective_timeout == 0 (budget exhausted before launch) + // - Provider context receives reduced timeout_ms per remaining budget // - // Options to fix: - // 1. Configure dummy backends in fastly.toml for testing - // 2. Refactor mock providers to use a different pattern - // 3. Create a test-only mock backend server + // Follow-up: introduce a thin abstraction over `select()` (e.g. a trait) + // so the deadline/drop logic can be unit-tested with mock futures instead + // of requiring real Fastly backends. An `#[ignore]` integration test + // exercising the full path via Viceroy would also catch regressions. #[tokio::test] async fn test_no_providers_configured() { @@ -679,6 +765,41 @@ mod tests { assert!(!orchestrator.is_enabled()); } + #[test] + fn remaining_budget_returns_full_timeout_immediately() { + let start = std::time::Instant::now(); + let result = super::remaining_budget_ms(start, 2000); + // Should be very close to 2000 (allow a few ms for test execution) + assert!( + result >= 1990, + "should return ~full timeout immediately, got {result}" + ); + } + + #[test] + fn remaining_budget_saturates_at_zero() { + // Create an instant in the past by sleeping briefly with a tiny timeout + let start = std::time::Instant::now(); + // Use a timeout of 0 — elapsed will always exceed it + let result = super::remaining_budget_ms(start, 0); + assert_eq!(result, 0, "should return 0 when timeout is 0"); + } + + #[test] + fn remaining_budget_decreases_over_time() { + let start = std::time::Instant::now(); + std::thread::sleep(std::time::Duration::from_millis(50)); + let result = super::remaining_budget_ms(start, 2000); + assert!( + result < 2000, + "should be less than full timeout after sleeping" + ); + assert!( + result > 1900, + "should still have most of the budget, got {result}" + ); + } + #[test] fn test_apply_floor_prices_allows_none_prices_for_encoded_bids() { // Test that bids with None prices (APS-style) pass through floor pricing diff --git a/crates/common/src/auction/provider.rs b/crates/common/src/auction/provider.rs index d6068b89..cd3fcfc3 100644 --- a/crates/common/src/auction/provider.rs +++ b/crates/common/src/auction/provider.rs @@ -60,11 +60,11 @@ pub trait AuctionProvider: Send + Sync { /// Return the backend name used by this provider for request routing. /// - /// This is used by the orchestrator to correlate responses with providers - /// when using `select()` to wait for multiple concurrent requests. - /// The backend name should match what `BackendConfig::from_url()` returns - /// for this provider's endpoint. - fn backend_name(&self) -> Option { + /// `timeout_ms` is the effective timeout that will be used when the backend + /// is registered in [`request_bids`](Self::request_bids). It must be + /// forwarded to [`BackendConfig::backend_name_for_url()`] so the predicted + /// name matches the actual registration (the timeout is part of the name). + fn backend_name(&self, _timeout_ms: u32) -> Option { None } } diff --git a/crates/common/src/backend.rs b/crates/common/src/backend.rs index 25f61688..34e6b523 100644 --- a/crates/common/src/backend.rs +++ b/crates/common/src/backend.rs @@ -33,6 +33,9 @@ fn compute_host_header(scheme: &str, host: &str, port: u16) -> String { } } +/// Default first-byte timeout for backends (15 seconds). +const DEFAULT_FIRST_BYTE_TIMEOUT: Duration = Duration::from_secs(15); + /// Configuration for creating a dynamic Fastly backend. /// /// Uses the builder pattern so that new options can be added without changing @@ -42,12 +45,14 @@ pub struct BackendConfig<'a> { host: &'a str, port: Option, certificate_check: bool, + first_byte_timeout: Duration, } impl<'a> BackendConfig<'a> { /// Create a new configuration with required fields and safe defaults. /// /// `certificate_check` defaults to `true`. + /// `first_byte_timeout` defaults to 15 seconds. #[must_use] pub fn new(scheme: &'a str, host: &'a str) -> Self { Self { @@ -55,6 +60,7 @@ impl<'a> BackendConfig<'a> { host, port: None, certificate_check: true, + first_byte_timeout: DEFAULT_FIRST_BYTE_TIMEOUT, } } @@ -73,17 +79,25 @@ impl<'a> BackendConfig<'a> { self } - /// Ensure a dynamic backend exists for this configuration and return its name. - /// - /// The backend name is derived from the scheme, host, port, and certificate - /// setting to avoid collisions. If a backend with the derived name already - /// exists, this function logs and reuses it. + /// Set the maximum time to wait for the first byte of the response. /// - /// # Errors + /// Defaults to 15 seconds. For latency-sensitive paths like auction + /// requests, callers should set a tighter timeout derived from the + /// auction deadline. + #[must_use] + pub fn first_byte_timeout(mut self, timeout: Duration) -> Self { + self.first_byte_timeout = timeout; + self + } + + /// Compute the deterministic backend name without registering anything. /// - /// Returns an error if the host is empty or if backend creation fails - /// (except for `NameInUse` which reuses the existing backend). - pub fn ensure(self) -> Result> { + /// The name encodes scheme, host, port, certificate setting, and + /// first-byte timeout so that backends with different configurations + /// never collide. Including the timeout prevents "first-registration-wins" + /// poisoning where a later request for the same origin with a tighter + /// timeout would silently inherit the original registration's value. + fn compute_name(&self) -> Result<(String, u16), Report> { if self.host.is_empty() { return Err(Report::new(TrustedServerError::Proxy { message: "missing host".to_string(), @@ -94,28 +108,46 @@ impl<'a> BackendConfig<'a> { .port .unwrap_or_else(|| default_port_for_scheme(self.scheme)); - let host_with_port = format!("{}:{}", self.host, target_port); - - // Include cert setting in name to avoid reusing a backend with different cert settings let name_base = format!("{}_{}_{}", self.scheme, self.host, target_port); let cert_suffix = if self.certificate_check { "" } else { "_nocert" }; + let timeout_ms = self.first_byte_timeout.as_millis(); let backend_name = format!( - "backend_{}{}", + "backend_{}{}_t{}", name_base.replace(['.', ':'], "_"), - cert_suffix + cert_suffix, + timeout_ms ); + Ok((backend_name, target_port)) + } + + /// Ensure a dynamic backend exists for this configuration and return its name. + /// + /// The backend name is derived from the scheme, host, port, certificate + /// setting, and `first_byte_timeout` to avoid collisions. Different + /// timeout values produce different backend registrations so that a + /// tight deadline cannot be silently widened by an earlier registration. + /// + /// # Errors + /// + /// Returns an error if the host is empty or if backend creation fails + /// (except for `NameInUse` which reuses the existing backend). + pub fn ensure(self) -> Result> { + let (backend_name, target_port) = self.compute_name()?; + + let host_with_port = format!("{}:{}", self.host, target_port); + let host_header = compute_host_header(self.scheme, self.host, target_port); // Target base is host[:port]; SSL is enabled only for https scheme let mut builder = Backend::builder(&backend_name, &host_with_port) .override_host(&host_header) .connect_timeout(Duration::from_secs(1)) - .first_byte_timeout(Duration::from_secs(15)) + .first_byte_timeout(self.first_byte_timeout) .between_bytes_timeout(Duration::from_secs(10)); if self.scheme.eq_ignore_ascii_case("https") { builder = builder.enable_ssl().sni_hostname(self.host); @@ -159,10 +191,38 @@ impl<'a> BackendConfig<'a> { } } + /// Parse an origin URL into its (scheme, host, port) components. + /// + /// Centralises URL parsing so that [`from_url`](Self::from_url), + /// [`from_url_with_first_byte_timeout`](Self::from_url_with_first_byte_timeout), + /// and [`backend_name_for_url`](Self::backend_name_for_url) share one + /// code-path. + fn parse_origin( + origin_url: &str, + ) -> Result<(String, String, Option), Report> { + let parsed_url = Url::parse(origin_url).change_context(TrustedServerError::Proxy { + message: format!("Invalid origin_url: {}", origin_url), + })?; + + let scheme = parsed_url.scheme().to_owned(); + let host = parsed_url + .host_str() + .ok_or_else(|| { + Report::new(TrustedServerError::Proxy { + message: "Missing host in origin_url".to_string(), + }) + })? + .to_owned(); + let port = parsed_url.port(); + + Ok((scheme, host, port)) + } + /// Parse an origin URL and ensure a dynamic backend exists for it. /// /// This is a convenience constructor that parses the URL, extracts scheme, - /// host, and port, then calls [`ensure`](Self::ensure). + /// host, and port, then calls [`ensure`](Self::ensure) with the default + /// 15 s first-byte timeout. /// /// # Errors /// @@ -172,23 +232,68 @@ impl<'a> BackendConfig<'a> { origin_url: &str, certificate_check: bool, ) -> Result> { - let parsed_url = Url::parse(origin_url).change_context(TrustedServerError::Proxy { - message: format!("Invalid origin_url: {}", origin_url), - })?; + Self::from_url_with_first_byte_timeout( + origin_url, + certificate_check, + DEFAULT_FIRST_BYTE_TIMEOUT, + ) + } - let scheme = parsed_url.scheme(); - let host = parsed_url.host_str().ok_or_else(|| { - Report::new(TrustedServerError::Proxy { - message: "Missing host in origin_url".to_string(), - }) - })?; - let port = parsed_url.port(); + /// Parse an origin URL and ensure a dynamic backend with a custom + /// first-byte timeout. + /// + /// For latency-sensitive paths (e.g. auction bid requests) callers should + /// pass the remaining auction budget so that individual requests don't hang + /// longer than the overall deadline allows. + /// + /// # Errors + /// + /// Returns an error if the URL cannot be parsed or lacks a host, or if + /// backend creation fails. + pub fn from_url_with_first_byte_timeout( + origin_url: &str, + certificate_check: bool, + first_byte_timeout: Duration, + ) -> Result> { + let (scheme, host, port) = Self::parse_origin(origin_url)?; - BackendConfig::new(scheme, host) + BackendConfig::new(&scheme, &host) .port(port) .certificate_check(certificate_check) + .first_byte_timeout(first_byte_timeout) .ensure() } + + /// Compute the backend name that + /// [`from_url_with_first_byte_timeout`](Self::from_url_with_first_byte_timeout) + /// would produce for the given URL and timeout, **without** registering a + /// backend. + /// + /// This is useful when callers need the name for mapping purposes (e.g. the + /// auction orchestrator correlating responses to providers) but want the + /// actual registration to happen later with specific settings. + /// + /// The `first_byte_timeout` must match the value that will be used at + /// registration time so that the predicted name is correct. + /// + /// # Errors + /// + /// Returns an error if the URL cannot be parsed or lacks a host. + pub fn backend_name_for_url( + origin_url: &str, + certificate_check: bool, + first_byte_timeout: Duration, + ) -> Result> { + let (scheme, host, port) = Self::parse_origin(origin_url)?; + + let (name, _) = BackendConfig::new(&scheme, &host) + .port(port) + .certificate_check(certificate_check) + .first_byte_timeout(first_byte_timeout) + .compute_name()?; + + Ok(name) + } } #[cfg(test)] @@ -242,7 +347,7 @@ mod tests { let name = BackendConfig::new("https", "origin.example.com") .ensure() .expect("should create backend for valid HTTPS origin"); - assert_eq!(name, "backend_https_origin_example_com_443"); + assert_eq!(name, "backend_https_origin_example_com_443_t15000"); } #[test] @@ -251,7 +356,7 @@ mod tests { .certificate_check(false) .ensure() .expect("should create backend with cert check disabled"); - assert_eq!(name, "backend_https_origin_example_com_443_nocert"); + assert_eq!(name, "backend_https_origin_example_com_443_nocert_t15000"); } #[test] @@ -260,11 +365,7 @@ mod tests { .port(Some(8080)) .ensure() .expect("should create backend for HTTP origin with explicit port"); - assert_eq!(name, "backend_http_api_test-site_org_8080"); - assert!( - name.ends_with("_8080"), - "should sanitize ':' to '_' in backend name" - ); + assert_eq!(name, "backend_http_api_test-site_org_8080_t15000"); } #[test] @@ -272,7 +373,7 @@ mod tests { let name = BackendConfig::new("http", "example.org") .ensure() .expect("should create backend defaulting to port 80 for HTTP"); - assert_eq!(name, "backend_http_example_org_80"); + assert_eq!(name, "backend_http_example_org_80_t15000"); } #[test] @@ -300,4 +401,30 @@ mod tests { "should return same backend name on repeat call" ); } + + #[test] + fn different_timeouts_produce_different_names() { + use std::time::Duration; + + let (name_a, _) = BackendConfig::new("https", "origin.example.com") + .first_byte_timeout(Duration::from_millis(2000)) + .compute_name() + .expect("should compute name with 2000ms timeout"); + let (name_b, _) = BackendConfig::new("https", "origin.example.com") + .first_byte_timeout(Duration::from_millis(500)) + .compute_name() + .expect("should compute name with 500ms timeout"); + assert_ne!( + name_a, name_b, + "backends with different timeouts should have different names" + ); + assert!( + name_a.ends_with("_t2000"), + "name should include timeout suffix" + ); + assert!( + name_b.ends_with("_t500"), + "name should include timeout suffix" + ); + } } diff --git a/crates/common/src/integrations/adserver_mock.rs b/crates/common/src/integrations/adserver_mock.rs index 0d5422e6..85a88dea 100644 --- a/crates/common/src/integrations/adserver_mock.rs +++ b/crates/common/src/integrations/adserver_mock.rs @@ -11,6 +11,7 @@ use serde::{Deserialize, Serialize}; use serde_json::{json, Value as Json}; use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; +use std::time::Duration; use validator::Validate; use crate::auction::context::{build_url_with_context_params, ContextQueryParams}; @@ -306,15 +307,18 @@ impl AuctionProvider for AdServerMockProvider { message: "Failed to set mediation request body".to_string(), })?; - // Send async - let backend_name = BackendConfig::from_url(&self.config.endpoint, true).change_context( - TrustedServerError::Auction { - message: format!( - "Failed to resolve backend for mediation endpoint: {}", - self.config.endpoint - ), - }, - )?; + // Send async with auction-scoped timeout + let backend_name = BackendConfig::from_url_with_first_byte_timeout( + &self.config.endpoint, + true, + Duration::from_millis(u64::from(context.timeout_ms)), + ) + .change_context(TrustedServerError::Auction { + message: format!( + "Failed to resolve backend for mediation endpoint: {}", + self.config.endpoint + ), + })?; let pending = req .send_async(backend_name) @@ -369,8 +373,13 @@ impl AuctionProvider for AdServerMockProvider { self.config.enabled } - fn backend_name(&self) -> Option { - BackendConfig::from_url(&self.config.endpoint, true).ok() + fn backend_name(&self, timeout_ms: u32) -> Option { + BackendConfig::backend_name_for_url( + &self.config.endpoint, + true, + Duration::from_millis(u64::from(timeout_ms)), + ) + .ok() } } diff --git a/crates/common/src/integrations/aps.rs b/crates/common/src/integrations/aps.rs index bdd9c25b..0994e2f1 100644 --- a/crates/common/src/integrations/aps.rs +++ b/crates/common/src/integrations/aps.rs @@ -8,6 +8,7 @@ use fastly::Request; use serde::{Deserialize, Serialize}; use serde_json::{json, Value as Json}; use std::collections::HashMap; +use std::time::Duration; use validator::Validate; use crate::auction::provider::AuctionProvider; @@ -424,7 +425,7 @@ impl AuctionProvider for ApsAuctionProvider { fn request_bids( &self, request: &AuctionRequest, - _context: &AuctionContext<'_>, + context: &AuctionContext<'_>, ) -> Result> { log::info!( "APS: requesting bids for {} slots (pub_id: {})", @@ -452,15 +453,18 @@ impl AuctionProvider for ApsAuctionProvider { message: "Failed to set APS request body".to_string(), })?; - // Send request asynchronously - let backend_name = BackendConfig::from_url(&self.config.endpoint, true).change_context( - TrustedServerError::Auction { - message: format!( - "Failed to resolve backend for APS endpoint: {}", - self.config.endpoint - ), - }, - )?; + // Send request asynchronously with auction-scoped timeout + let backend_name = BackendConfig::from_url_with_first_byte_timeout( + &self.config.endpoint, + true, + Duration::from_millis(u64::from(context.timeout_ms)), + ) + .change_context(TrustedServerError::Auction { + message: format!( + "Failed to resolve backend for APS endpoint: {}", + self.config.endpoint + ), + })?; let pending = aps_req @@ -517,8 +521,13 @@ impl AuctionProvider for ApsAuctionProvider { self.config.enabled } - fn backend_name(&self) -> Option { - BackendConfig::from_url(&self.config.endpoint, true).ok() + fn backend_name(&self, timeout_ms: u32) -> Option { + BackendConfig::backend_name_for_url( + &self.config.endpoint, + true, + Duration::from_millis(u64::from(timeout_ms)), + ) + .ok() } } diff --git a/crates/common/src/integrations/prebid.rs b/crates/common/src/integrations/prebid.rs index 89a8826c..68193c6a 100644 --- a/crates/common/src/integrations/prebid.rs +++ b/crates/common/src/integrations/prebid.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; use std::sync::Arc; +use std::time::Duration; use async_trait::async_trait; use base64::{engine::general_purpose::STANDARD as BASE64, Engine}; @@ -855,8 +856,12 @@ impl AuctionProvider for PrebidAuctionProvider { message: "Failed to set request body".to_string(), })?; - // Send request asynchronously - let backend_name = BackendConfig::from_url(&self.config.server_url, true)?; + // Send request asynchronously with auction-scoped timeout + let backend_name = BackendConfig::from_url_with_first_byte_timeout( + &self.config.server_url, + true, + Duration::from_millis(u64::from(context.timeout_ms)), + )?; let pending = pbs_req .send_async(backend_name) @@ -946,8 +951,13 @@ impl AuctionProvider for PrebidAuctionProvider { self.config.enabled } - fn backend_name(&self) -> Option { - BackendConfig::from_url(&self.config.server_url, true).ok() + fn backend_name(&self, timeout_ms: u32) -> Option { + BackendConfig::backend_name_for_url( + &self.config.server_url, + true, + Duration::from_millis(u64::from(timeout_ms)), + ) + .ok() } }