diff --git a/src/request_client.rs b/src/request_client.rs
index bc519722..ce3ebbf7 100644
--- a/src/request_client.rs
+++ b/src/request_client.rs
@@ -4,15 +4,28 @@ use reqwest::ClientBuilder;
 use reqwest_middleware::{ClientBuilder as ClientWithMiddlewareBuilder, ClientWithMiddleware};
 use reqwest_retry::{RetryTransientMiddleware, policies::ExponentialBackoff};
 
-const UPLOAD_RETRY_COUNT: u32 = 3;
+/// Maximum number of retries granted to an upload after its first attempt.
+pub const UPLOAD_RETRY_COUNT: u32 = 3;
 const OIDC_RETRY_COUNT: u32 = 10;
 const USER_AGENT: &str = "codspeed-runner";
 
+/// Shared backoff policy for upload retries, used both by the retry middleware on
+/// [`REQUEST_CLIENT`] and by the manual stream-retry loop in the uploader. Under
+/// `cfg(test)` the intervals are shrunk to milliseconds so retry tests don't sleep
+/// through the real exponential backoff (1s, 2s, 4s).
+pub fn upload_backoff() -> ExponentialBackoff {
+    let builder = ExponentialBackoff::builder();
+    #[cfg(test)]
+    let builder = builder.retry_bounds(
+        std::time::Duration::from_millis(1),
+        std::time::Duration::from_millis(5),
+    );
+    builder.build_with_max_retries(UPLOAD_RETRY_COUNT)
+}
+
 pub static REQUEST_CLIENT: LazyLock<ClientWithMiddleware> = LazyLock::new(|| {
     ClientWithMiddlewareBuilder::new(ClientBuilder::new().user_agent(USER_AGENT).build().unwrap())
-        .with(RetryTransientMiddleware::new_with_policy(
-            ExponentialBackoff::builder().build_with_max_retries(UPLOAD_RETRY_COUNT),
-        ))
+        .with(RetryTransientMiddleware::new_with_policy(upload_backoff()))
         .build()
 });
 
diff --git a/src/upload/uploader.rs b/src/upload/uploader.rs
index 6983eadf..c24bafe3 100644
--- a/src/upload/uploader.rs
+++ b/src/upload/uploader.rs
@@ -6,13 +6,17 @@ use crate::run_environment::RunEnvironment;
 use crate::upload::{UploadError, profile_archive::ProfileArchiveContent};
 use crate::{
     prelude::*,
-    request_client::{REQUEST_CLIENT, STREAMING_CLIENT},
+    request_client::{REQUEST_CLIENT, STREAMING_CLIENT, upload_backoff},
 };
 use async_compression::tokio::write::GzipEncoder;
 use console::style;
 use
reqwest::StatusCode;
+use reqwest_retry::{
+    DefaultRetryableStrategy, RetryDecision, RetryPolicy, Retryable, RetryableStrategy,
+};
 use serde_json::Value;
 use std::collections::BTreeMap;
+use std::time::SystemTime;
 use tokio::fs::File;
 use tokio::io::AsyncWriteExt;
 use tokio_tar::Builder;
@@ -182,6 +186,68 @@ async fn retrieve_upload_data(
     }
 }
 
+/// The retry middleware can't replay a consumed stream, so we rebuild the body from
+/// disk on each attempt. Outcomes that `DefaultRetryableStrategy` classifies as
+/// transient are retried following [`upload_backoff`]; once the policy stops granting
+/// retries, or for any other outcome, the last outcome is returned, so checking the
+/// final response status (4xx/5xx) is left for the caller.
+async fn send_streamed_with_retry(
+    upload_data: &UploadData,
+    path: &std::path::Path,
+    archive_size: u64,
+    archive_hash: &str,
+    encoding: Option<String>,
+) -> Result<reqwest::Response> {
+    let policy = upload_backoff();
+    let start = SystemTime::now();
+    let mut n_past_retries = 0;
+
+    loop {
+        // Re-open the file on every attempt: the previous stream went into the last send.
+        let file = File::open(path)
+            .await
+            .with_context(|| format!("Failed to open file at path: {}", path.display()))?;
+        let stream = tokio_util::io::ReaderStream::new(file);
+        let body = reqwest::Body::wrap_stream(stream);
+
+        let mut request = STREAMING_CLIENT
+            .put(upload_data.upload_url.clone())
+            .header("Content-Type", "application/x-tar")
+            .header("Content-Length", archive_size)
+            .header("Content-MD5", archive_hash);
+        if let Some(encoding) = &encoding {
+            request = request.header("Content-Encoding", encoding);
+        }
+
+        // The error is wrapped in the middleware's type so the strategy below can inspect it.
+        let result = request
+            .body(body)
+            .send()
+            .await
+            .map_err(reqwest_middleware::Error::Reqwest);
+
+        let is_transient = matches!(
+            DefaultRetryableStrategy.handle(&result),
+            Some(Retryable::Transient)
+        );
+        if is_transient {
+            if let RetryDecision::Retry { execute_after } =
+                policy.should_retry(start, n_past_retries)
+            {
+                let wait = execute_after
+                    .duration_since(SystemTime::now())
+                    .unwrap_or_default();
+                debug!("Streamed upload attempt failed (transient), retrying in {wait:?}");
+                tokio::time::sleep(wait).await;
+                n_past_retries += 1;
+                continue;
+            }
+        }
+
+        return Ok(result?);
+    }
+}
+
 async fn upload_profile_archive(
     upload_data: &UploadData,
profile_archive: ProfileArchive,
@@ -206,24 +267,14 @@ async fn upload_profile_archive(
         }
         content @ ProfileArchiveContent::UncompressedOnDisk { path }
         | content @ ProfileArchiveContent::CompressedOnDisk { path } => {
-            // Use streaming client without retry middleware for file streams
-            let file = File::open(path)
-                .await
-                .context(format!("Failed to open file at path: {}", path.display()))?;
-            let stream = tokio_util::io::ReaderStream::new(file);
-            let body = reqwest::Body::wrap_stream(stream);
-
-            let mut request = STREAMING_CLIENT
-                .put(upload_data.upload_url.clone())
-                .header("Content-Type", "application/x-tar")
-                .header("Content-Length", archive_size)
-                .header("Content-MD5", archive_hash);
-
-            if let Some(encoding) = content.encoding() {
-                request = request.header("Content-Encoding", encoding);
-            }
-
-            request.body(body).send().await?
+            send_streamed_with_retry(
+                upload_data,
+                path,
+                archive_size,
+                &archive_hash,
+                content.encoding(),
+            )
+            .await?
         }
     };
 
@@ -378,4 +429,103 @@ mod tests {
         )
         .await;
     }
+
+    const EXPECTED_ATTEMPTS: usize = crate::request_client::UPLOAD_RETRY_COUNT as usize + 1;
+
+    /// Spawns a thread that answers `503` to each of the next `max_conns` connections and
+    /// then exits. Returns the URL, a counter of connections received, and the join handle.
+    fn spawn_mock_returning_503(
+        max_conns: usize,
+    ) -> (
+        String,
+        std::sync::Arc<std::sync::atomic::AtomicUsize>,
+        std::thread::JoinHandle<()>,
+    ) {
+        use std::io::{Read, Write};
+        use std::net::TcpListener;
+        use std::sync::Arc;
+        use std::sync::atomic::{AtomicUsize, Ordering};
+
+        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
+        let url = format!("http://{}/upload", listener.local_addr().unwrap());
+        let hits = Arc::new(AtomicUsize::new(0));
+
+        let hits_loop = hits.clone();
+        let handle = std::thread::spawn(move || {
+            for stream in listener.incoming().take(max_conns) {
+                let Ok(mut stream) = stream else { continue };
+                hits_loop.fetch_add(1, Ordering::SeqCst);
+                let mut buf = [0u8; 2048];
+                let _ = stream.read(&mut buf); // one best-effort read; the request is not parsed
+                let body = "transient";
+                let resp = format!(
+                    "HTTP/1.1 503 Service Unavailable\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
+                    body.len(),
+                    body
+                );
+                let _ = stream.write_all(resp.as_bytes()); // `stream` drops next, closing the socket
+            }
+        });
+
+        (url, hits, handle)
+    }
+
+    fn upload_data_for(url: String) -> UploadData {
+        UploadData {
+            status: "success".to_string(),
+            upload_url: url,
+            run_id: "test-run".to_string(),
+        }
+    }
+
+    /// On-disk archives stream through `send_streamed_with_retry`, which retries
+    /// transient failures itself since `STREAMING_CLIENT` has no retry middleware.
+    #[tokio::test]
+    async fn streamed_upload_is_retried() {
+        use std::sync::atomic::Ordering::SeqCst;
+
+        let (mock_url, connections, mock_server) = spawn_mock_returning_503(EXPECTED_ATTEMPTS);
+        // `keep()` detaches the file from its temp guard; the path is then handed to the archive.
+        let temp_file = tempfile::NamedTempFile::new().unwrap();
+        let tmp_path = temp_file.into_temp_path().keep().unwrap();
+        std::fs::write(&tmp_path, b"profile-archive").unwrap();
+        let on_disk = ProfileArchive::new_uncompressed_on_disk(tmp_path).unwrap();
+        let upload_data = upload_data_for(mock_url);
+
+        let outcome = upload_profile_archive(&upload_data, on_disk).await;
+        // Joining only returns once the mock has accepted every expected connection.
+        mock_server.join().unwrap();
+
+        let attempts = connections.load(SeqCst);
+        assert!(
+            outcome.is_err(),
+            "a 503 should surface as an error after retries"
+        );
+        assert_eq!(
+            attempts,
+            EXPECTED_ATTEMPTS,
+            "streamed upload should be attempted 1 + UPLOAD_RETRY_COUNT times"
+        );
+    }
+
+    /// In-memory archives are sent through `REQUEST_CLIENT`, so the retry middleware on
+    /// that client is what re-sends the request after each transient failure.
+    #[tokio::test]
+    async fn in_memory_upload_is_retried() {
+        use std::sync::atomic::Ordering::SeqCst;
+
+        let (mock_url, connections, mock_server) = spawn_mock_returning_503(EXPECTED_ATTEMPTS);
+        let in_memory = ProfileArchive::new_compressed_in_memory(b"profile-archive".to_vec());
+        let upload_data = upload_data_for(mock_url);
+
+        let outcome = upload_profile_archive(&upload_data, in_memory).await;
+        mock_server.join().unwrap();
+        let attempts = connections.load(SeqCst);
+        assert!(outcome.is_err(), "a 503 should surface as an error");
+        assert_eq!(
+            attempts,
+            EXPECTED_ATTEMPTS,
+            "in-memory upload should be attempted 1 + UPLOAD_RETRY_COUNT times"
+        );
+    }
 }