Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 139 additions & 18 deletions crates/common/src/auction/orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,24 @@ use error_stack::{Report, ResultExt};
use fastly::http::request::{select, PendingRequest};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
use std::time::{Duration, Instant};

use crate::error::TrustedServerError;

use super::config::AuctionConfig;
use super::provider::AuctionProvider;
use super::types::{AuctionContext, AuctionRequest, AuctionResponse, Bid, BidStatus};

/// Milliseconds still available before an auction deadline expires.
///
/// The deadline sits `timeout_ms` milliseconds after `start`. The whole
/// milliseconds elapsed since `start` are subtracted from `timeout_ms`; once
/// the deadline has been reached or passed the result is `0`.
#[inline]
fn remaining_budget_ms(start: Instant, timeout_ms: u32) -> u32 {
    match u32::try_from(start.elapsed().as_millis()) {
        Ok(spent_ms) => timeout_ms.saturating_sub(spent_ms),
        // More than `u32::MAX` ms have elapsed, which exceeds any `u32` budget.
        Err(_) => 0,
    }
}

/// Manages auction execution across multiple providers.
pub struct AuctionOrchestrator {
config: AuctionConfig,
Expand Down Expand Up @@ -93,6 +103,7 @@ impl AuctionOrchestrator {
request: &AuctionRequest,
context: &AuctionContext<'_>,
) -> Result<OrchestrationResult, Report<TrustedServerError>> {
let mediation_start = Instant::now();
let provider_responses = self.run_providers_parallel(request, context).await?;

let floor_prices = self.floor_prices_by_slot(request);
Expand All @@ -105,11 +116,30 @@ impl AuctionOrchestrator {
mediator.provider_name()
);

// Create a context with provider responses for the mediator
// Give the mediator only the remaining time from the auction
// deadline, not the full timeout — the bidding phase already
// consumed part of it.
let remaining_ms = remaining_budget_ms(mediation_start, context.timeout_ms);

if remaining_ms == 0 {
log::warn!(
"Auction timeout ({}ms) exhausted during bidding phase — skipping mediator",
context.timeout_ms
);
let winning = self.select_winning_bids(&provider_responses, &floor_prices);
return Ok(OrchestrationResult {
provider_responses,
mediator_response: None,
winning_bids: winning,
total_time_ms: 0,
metadata: HashMap::new(),
});
}

let mediator_context = AuctionContext {
settings: context.settings,
request: context.request,
timeout_ms: context.timeout_ms,
timeout_ms: remaining_ms,
provider_responses: Some(&provider_responses),
};

Expand Down Expand Up @@ -211,6 +241,9 @@ impl AuctionOrchestrator {
provider_names.len()
);

// Track auction start time for deadline enforcement
let auction_start = Instant::now();

// Phase 1: Launch all requests concurrently and build mapping
// Maps backend_name -> (provider_name, start_time, provider)
let mut backend_to_provider: HashMap<String, (&str, Instant, &dyn AuctionProvider)> =
Expand All @@ -234,8 +267,26 @@ impl AuctionOrchestrator {
continue;
}

// Get the backend name for this provider to map responses back
let backend_name = match provider.backend_name() {
// Give each provider only the remaining time from the auction
// deadline so that its backend first_byte_timeout doesn't extend
// past the overall budget. Also respect the provider's own
// configured timeout when it is tighter than the remaining budget.
let remaining_ms = remaining_budget_ms(auction_start, context.timeout_ms);
let effective_timeout = remaining_ms.min(provider.timeout_ms());

if effective_timeout == 0 {
log::warn!(
"Auction timeout ({}ms) exhausted before launching '{}' — skipping",
context.timeout_ms,
provider.provider_name()
);
continue;
}

// Get the backend name for this provider to map responses back.
// Must be computed after effective_timeout since the timeout is
// part of the backend name.
let backend_name = match provider.backend_name(effective_timeout) {
Some(name) => name,
None => {
log::warn!(
Expand All @@ -246,14 +297,22 @@ impl AuctionOrchestrator {
}
};

let provider_context = AuctionContext {
settings: context.settings,
request: context.request,
timeout_ms: effective_timeout,
provider_responses: context.provider_responses,
};

log::info!(
"Launching bid request to: {} (backend: {})",
"Launching bid request to: {} (backend: {}, budget: {}ms)",
provider.provider_name(),
backend_name
backend_name,
effective_timeout
);

let start_time = Instant::now();
match provider.request_bids(request, context) {
match provider.request_bids(request, &provider_context) {
Ok(pending) => {
backend_to_provider.insert(
backend_name,
Expand All @@ -275,12 +334,21 @@ impl AuctionOrchestrator {
}
}

let deadline = Duration::from_millis(u64::from(context.timeout_ms));
log::info!(
"Launched {} concurrent requests, waiting for responses using select...",
pending_requests.len()
"Launched {} concurrent requests, waiting for responses (timeout: {}ms)...",
pending_requests.len(),
context.timeout_ms
);

// Phase 2: Wait for responses using select() to process as they become ready
// Phase 2: Wait for responses using select() to process as they become ready.
// Enforce the auction deadline: after each select() returns, check
// elapsed time and drop remaining requests if the timeout is exceeded.
//
// NOTE: `select()` blocks until at least one backend responds (or its
// transport timeout fires). Hard deadline enforcement therefore depends
// on every backend's `first_byte_timeout` being set to at most the
// remaining auction budget — which Phase 1 above guarantees.
let mut responses = Vec::new();
let mut remaining = pending_requests;

Expand Down Expand Up @@ -332,6 +400,18 @@ impl AuctionOrchestrator {
log::warn!("A provider request failed: {:?}", e);
}
}

// Check auction deadline after processing each response.
// Remaining PendingRequests are dropped, which abandons the
// in-flight HTTP calls on the Fastly host.
if auction_start.elapsed() >= deadline && !remaining.is_empty() {
log::warn!(
"Auction timeout ({}ms) reached, dropping {} remaining request(s)",
context.timeout_ms,
remaining.len()
);
break;
}
}

Ok(responses)
Expand Down Expand Up @@ -628,14 +708,20 @@ mod tests {
);
}

// TODO: Re-enable these tests after implementing mock provider support for send_async()
// Mock providers currently don't work with concurrent requests because they can't
// create PendingRequest without real backends configured in Fastly.
// TODO: Re-enable provider integration tests after implementing mock support
// for send_async(). Mock providers can't create PendingRequest without real
// Fastly backends.
//
// Untested timeout enforcement paths (require real backends):
// - Deadline check in select() loop (drops remaining requests)
// - Mediator skip when remaining_ms == 0 (bidding exhausts budget)
// - Provider skip when effective_timeout == 0 (budget exhausted before launch)
// - Provider context receives reduced timeout_ms per remaining budget
//
// Options to fix:
// 1. Configure dummy backends in fastly.toml for testing
// 2. Refactor mock providers to use a different pattern
// 3. Create a test-only mock backend server
// Follow-up: introduce a thin abstraction over `select()` (e.g. a trait)
// so the deadline/drop logic can be unit-tested with mock futures instead
// of requiring real Fastly backends. An `#[ignore]` integration test
// exercising the full path via Viceroy would also catch regressions.

#[tokio::test]
async fn test_no_providers_configured() {
Expand Down Expand Up @@ -679,6 +765,41 @@ mod tests {
assert!(!orchestrator.is_enabled());
}

#[test]
fn remaining_budget_returns_full_timeout_immediately() {
    // Sample the budget right after capturing the start instant: only the
    // test's own execution time can have been consumed, so 10 ms of slack
    // below the 2000 ms timeout is tolerated.
    let result = super::remaining_budget_ms(std::time::Instant::now(), 2000);
    assert!(
        result >= 1990,
        "should return ~full timeout immediately, got {result}"
    );
}

#[test]
fn remaining_budget_saturates_at_zero() {
    let start = std::time::Instant::now();

    // A zero budget is exhausted from the outset, however little time has
    // elapsed since `start`.
    let result = super::remaining_budget_ms(start, 0);
    assert_eq!(result, 0, "should return 0 when timeout is 0");

    // Let strictly more time elapse than a non-zero budget allows, so the
    // subtraction would underflow if it did not saturate. `sleep` waits at
    // least the requested duration, so elapsed >= 5 ms > 1 ms here.
    std::thread::sleep(std::time::Duration::from_millis(5));
    let result = super::remaining_budget_ms(start, 1);
    assert_eq!(
        result, 0,
        "should return 0 once elapsed time exceeds the timeout"
    );
}

#[test]
fn remaining_budget_decreases_over_time() {
    use std::thread::sleep;
    use std::time::{Duration, Instant};

    let start = Instant::now();
    // Burn a measurable slice (50 ms) of the 2000 ms budget before sampling it.
    sleep(Duration::from_millis(50));
    let result = super::remaining_budget_ms(start, 2000);

    // The budget must have shrunk...
    assert!(
        result < 2000,
        "should be less than full timeout after sleeping"
    );
    // ...but only by roughly the time slept, not by most of the budget.
    assert!(
        result > 1900,
        "should still have most of the budget, got {result}"
    );
}

#[test]
fn test_apply_floor_prices_allows_none_prices_for_encoded_bids() {
// Test that bids with None prices (APS-style) pass through floor pricing
Expand Down
10 changes: 5 additions & 5 deletions crates/common/src/auction/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,11 @@ pub trait AuctionProvider: Send + Sync {

/// Return the backend name used by this provider for request routing.
///
/// This is used by the orchestrator to correlate responses with providers
/// when using `select()` to wait for multiple concurrent requests.
/// The backend name should match what `BackendConfig::from_url()` returns
/// for this provider's endpoint.
fn backend_name(&self) -> Option<String> {
/// `timeout_ms` is the effective timeout that will be used when the backend
/// is registered in [`request_bids`](Self::request_bids). It must be
/// forwarded to [`BackendConfig::backend_name_for_url()`] so the predicted
/// name matches the actual registration (the timeout is part of the name).
fn backend_name(&self, _timeout_ms: u32) -> Option<String> {
None
}
}
Loading
Loading