Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions src/request_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,27 @@ use reqwest::ClientBuilder;
use reqwest_middleware::{ClientBuilder as ClientWithMiddlewareBuilder, ClientWithMiddleware};
use reqwest_retry::{RetryTransientMiddleware, policies::ExponentialBackoff};

/// Maximum number of retries for an upload; the initial attempt is not counted.
///
/// Public so that code outside this module (e.g. the uploader tests) can derive the
/// expected number of attempts from it.
pub const UPLOAD_RETRY_COUNT: u32 = 3;
/// Retry budget for OIDC requests.
/// NOTE(review): its use site is not visible in this excerpt — confirm.
const OIDC_RETRY_COUNT: u32 = 10;
/// `User-Agent` value set on the shared HTTP client.
const USER_AGENT: &str = "codspeed-runner";

/// Builds the exponential backoff policy used for every upload retry.
///
/// The same policy feeds the retry middleware installed on [`REQUEST_CLIENT`] and the
/// hand-written retry loop the uploader runs for streamed bodies, so both paths stop
/// after [`UPLOAD_RETRY_COUNT`] retries. In test builds the retry bounds are clamped
/// to a few milliseconds so retry tests don't have to sit through the real backoff.
pub fn upload_backoff() -> ExponentialBackoff {
    let mut builder = ExponentialBackoff::builder();
    if cfg!(test) {
        // Test builds only: wait between 1ms and 5ms instead of the default bounds.
        builder = builder.retry_bounds(
            std::time::Duration::from_millis(1),
            std::time::Duration::from_millis(5),
        );
    }
    builder.build_with_max_retries(UPLOAD_RETRY_COUNT)
}

/// Lazily-initialised shared HTTP client with transient-failure retries.
///
/// Exactly one [`RetryTransientMiddleware`] is installed, driven by
/// [`upload_backoff`]. Stacking a second retry layer would make every retry of the
/// outer layer re-run the whole inner retry sequence, so a request would be attempted
/// far more often than `1 + UPLOAD_RETRY_COUNT` times.
pub static REQUEST_CLIENT: LazyLock<ClientWithMiddleware> = LazyLock::new(|| {
    let client = ClientBuilder::new()
        .user_agent(USER_AGENT)
        .build()
        .expect("failed to build the reqwest client");

    ClientWithMiddlewareBuilder::new(client)
        .with(RetryTransientMiddleware::new_with_policy(upload_backoff()))
        .build()
});

Expand Down
188 changes: 169 additions & 19 deletions src/upload/uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,17 @@ use crate::run_environment::RunEnvironment;
use crate::upload::{UploadError, profile_archive::ProfileArchiveContent};
use crate::{
prelude::*,
request_client::{REQUEST_CLIENT, STREAMING_CLIENT},
request_client::{REQUEST_CLIENT, STREAMING_CLIENT, upload_backoff},
};
use async_compression::tokio::write::GzipEncoder;
use console::style;
use reqwest::StatusCode;
use reqwest_retry::{
DefaultRetryableStrategy, RetryDecision, RetryPolicy, Retryable, RetryableStrategy,
};
use serde_json::Value;
use std::collections::BTreeMap;
use std::time::SystemTime;
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use tokio_tar::Builder;
Expand Down Expand Up @@ -182,6 +186,63 @@ async fn retrieve_upload_data(
}
}

/// Streams the archive at `path` to `upload_data.upload_url` with a `PUT`, retrying
/// transient failures.
///
/// The retry middleware can't replay a consumed stream, so this goes through
/// `STREAMING_CLIENT` and rebuilds the body from disk on each attempt. An attempt is
/// retried when [`DefaultRetryableStrategy`] classifies its outcome as transient and
/// the [`upload_backoff`] policy still grants another retry. That classification is
/// applied to the whole outcome, responses included: the tests in this file rely on a
/// `503` response being retried.
///
/// `archive_size`, `archive_hash` and `encoding` are sent as the `Content-Length`,
/// `Content-MD5` and (when present) `Content-Encoding` headers.
///
/// # Errors
///
/// Fails if the file cannot be opened, or with the transport error of the last
/// attempt. Any response that is received and not retried is returned as `Ok`,
/// whatever its status; interpreting the status is left to the caller.
async fn send_streamed_with_retry(
    upload_data: &UploadData,
    path: &std::path::Path,
    archive_size: u64,
    archive_hash: &str,
    encoding: Option<String>,
) -> Result<reqwest::Response> {
    let policy = upload_backoff();
    let start = SystemTime::now();
    let mut n_past_retries: u32 = 0;

    loop {
        // Re-open the file on every attempt: the previous stream has been consumed.
        let file = File::open(path)
            .await
            .with_context(|| format!("Failed to open file at path: {}", path.display()))?;
        let stream = tokio_util::io::ReaderStream::new(file);
        let body = reqwest::Body::wrap_stream(stream);

        let mut request = STREAMING_CLIENT
            .put(upload_data.upload_url.clone())
            .header("Content-Type", "application/x-tar")
            .header("Content-Length", archive_size)
            .header("Content-MD5", archive_hash);
        if let Some(encoding) = &encoding {
            request = request.header("Content-Encoding", encoding);
        }

        // Wrap the error in the middleware error type so the stock retry strategy,
        // which is written against `reqwest_middleware::Error`, can classify it.
        let result = request
            .body(body)
            .send()
            .await
            .map_err(reqwest_middleware::Error::Reqwest);

        let is_transient = matches!(
            DefaultRetryableStrategy.handle(&result),
            Some(Retryable::Transient)
        );
        if is_transient {
            if let RetryDecision::Retry { execute_after } =
                policy.should_retry(start, n_past_retries)
            {
                // `execute_after` may already be in the past; then don't wait at all.
                let wait = execute_after
                    .duration_since(SystemTime::now())
                    .unwrap_or_default();
                debug!("Streamed upload attempt failed (transient), retrying in {wait:?}");
                tokio::time::sleep(wait).await;
                n_past_retries += 1;
                continue;
            }
        }

        // Not transient, or the retry budget is exhausted: hand the outcome back.
        return Ok(result?);
    }
}

async fn upload_profile_archive(
upload_data: &UploadData,
profile_archive: ProfileArchive,
Expand All @@ -206,24 +267,14 @@ async fn upload_profile_archive(
}
content @ ProfileArchiveContent::UncompressedOnDisk { path }
| content @ ProfileArchiveContent::CompressedOnDisk { path } => {
// Use streaming client without retry middleware for file streams
let file = File::open(path)
.await
.context(format!("Failed to open file at path: {}", path.display()))?;
let stream = tokio_util::io::ReaderStream::new(file);
let body = reqwest::Body::wrap_stream(stream);

let mut request = STREAMING_CLIENT
.put(upload_data.upload_url.clone())
.header("Content-Type", "application/x-tar")
.header("Content-Length", archive_size)
.header("Content-MD5", archive_hash);

if let Some(encoding) = content.encoding() {
request = request.header("Content-Encoding", encoding);
}

request.body(body).send().await?
send_streamed_with_retry(
upload_data,
path,
archive_size,
&archive_hash,
content.encoding(),
)
.await?
}
};

Expand Down Expand Up @@ -378,4 +429,103 @@ mod tests {
)
.await;
}

/// Number of requests a retried upload is expected to make: the initial attempt plus
/// `UPLOAD_RETRY_COUNT` retries.
const EXPECTED_ATTEMPTS: usize = crate::request_client::UPLOAD_RETRY_COUNT as usize + 1;

/// Starts a throwaway HTTP server on a loopback port chosen by the OS.
///
/// The server thread performs `max_conns` accepts; for each accepted connection it
/// bumps the hit counter, reads the start of the request and replies with a `503`,
/// then the thread exits. Returns the upload URL, the shared hit counter and the
/// thread's join handle.
fn spawn_mock_returning_503(
    max_conns: usize,
) -> (
    String,
    std::sync::Arc<std::sync::atomic::AtomicUsize>,
    std::thread::JoinHandle<()>,
) {
    use std::io::{Read, Write};
    use std::net::TcpListener;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    const BODY: &str = "transient";

    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = listener.local_addr().unwrap();
    let counter = Arc::new(AtomicUsize::new(0));
    let served = Arc::clone(&counter);

    let server = std::thread::spawn(move || {
        let response = format!(
            "HTTP/1.1 503 Service Unavailable\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
            BODY.len(),
            BODY
        );
        for _ in 0..max_conns {
            // A failed accept still uses up one of the `max_conns` slots.
            let mut conn = match listener.accept() {
                Ok((conn, _peer)) => conn,
                Err(_) => continue,
            };
            served.fetch_add(1, Ordering::SeqCst);
            // A single read of the request is enough; its content is ignored.
            let mut scratch = [0u8; 2048];
            let _ = conn.read(&mut scratch);
            let _ = conn.write_all(response.as_bytes());
        }
    });

    (format!("http://{addr}/upload"), counter, server)
}

/// Test helper: wraps `url` in an [`UploadData`] with a `"success"` status and a
/// fixed run id.
fn upload_data_for(url: String) -> UploadData {
    let upload_url = url;
    UploadData {
        upload_url,
        run_id: String::from("test-run"),
        status: String::from("success"),
    }
}

/// On-disk archives stream through `send_streamed_with_retry`, which retries
/// transient failures itself since `STREAMING_CLIENT` has no retry middleware.
#[tokio::test]
async fn streamed_upload_is_retried() {
    use std::sync::atomic::Ordering;

    let (url, hits, server) = spawn_mock_returning_503(EXPECTED_ATTEMPTS);

    // Hold the `TempPath` guard for the whole test instead of `keep()`-ing the file,
    // so the archive is deleted when the test ends rather than left behind on disk.
    let archive_path = tempfile::NamedTempFile::new().unwrap().into_temp_path();
    std::fs::write(&archive_path, b"profile-archive").unwrap();
    let archive = ProfileArchive::new_uncompressed_on_disk(archive_path.to_path_buf()).unwrap();

    let result = upload_profile_archive(&upload_data_for(url), archive).await;

    // Assert before joining the server: if fewer attempts than expected were made,
    // the server thread is still blocked in `accept` and joining first would hang
    // the test instead of failing it. A hit is counted before its response is
    // written, so the counter is already final once the upload has returned.
    assert!(
        result.is_err(),
        "a 503 should surface as an error after retries"
    );
    assert_eq!(
        hits.load(Ordering::SeqCst),
        EXPECTED_ATTEMPTS,
        "streamed upload should be attempted 1 + UPLOAD_RETRY_COUNT times"
    );

    // All expected connections were served, so the server thread is finishing.
    server.join().unwrap();
}

/// In-memory archives go through `REQUEST_CLIENT`, whose retry middleware handles
/// transient failures.
#[tokio::test]
async fn in_memory_upload_is_retried() {
    use std::sync::atomic::Ordering;

    let (url, hits, server) = spawn_mock_returning_503(EXPECTED_ATTEMPTS);

    let archive = ProfileArchive::new_compressed_in_memory(b"profile-archive".to_vec());

    let result = upload_profile_archive(&upload_data_for(url), archive).await;

    // Assert before joining the server: if fewer attempts than expected were made,
    // the server thread is still blocked in `accept` and joining first would hang
    // the test instead of failing it. A hit is counted before its response is
    // written, so the counter is already final once the upload has returned.
    assert!(result.is_err(), "a 503 should surface as an error");
    assert_eq!(
        hits.load(Ordering::SeqCst),
        EXPECTED_ATTEMPTS,
        "in-memory upload should be attempted 1 + UPLOAD_RETRY_COUNT times"
    );

    // All expected connections were served, so the server thread is finishing.
    server.join().unwrap();
}
}