Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
368 changes: 368 additions & 0 deletions crates/cli/tests/rm_purge.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,368 @@
#![cfg(not(windows))]

use std::io::{ErrorKind, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::path::PathBuf;
use std::process::Command;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::Duration;

#[derive(Debug, Clone)]
struct CapturedRequest {
method: String,
path: String,
headers: Vec<(String, String)>,
}

impl CapturedRequest {
fn header(&self, name: &str) -> Option<&str> {
header_value(&self.headers, name)
}
}

struct TestServer {
authority: String,
requests: Arc<Mutex<Vec<CapturedRequest>>>,
stop: Arc<AtomicBool>,
handle: Option<JoinHandle<()>>,
}

impl TestServer {
fn start() -> Self {
let listener = TcpListener::bind("127.0.0.1:0").expect("bind local HTTP server");
listener
.set_nonblocking(true)
.expect("set listener nonblocking");
let authority = listener
.local_addr()
.expect("local HTTP server address")
.to_string();
let requests = Arc::new(Mutex::new(Vec::new()));
let stop = Arc::new(AtomicBool::new(false));
let thread_requests = Arc::clone(&requests);
let thread_stop = Arc::clone(&stop);

let handle = thread::spawn(move || {
while !thread_stop.load(Ordering::SeqCst) {
match listener.accept() {
Ok((mut stream, _)) => {
if let Some(request) = read_request(&mut stream) {
let response = response_for(&request);
thread_requests
.lock()
.expect("record request")
.push(request);
let _ = stream.write_all(response.as_bytes());
}
}
Err(error) if error.kind() == ErrorKind::WouldBlock => {
thread::sleep(Duration::from_millis(10));
}
Err(error) if error.kind() == ErrorKind::Interrupted => {}
Err(error) => panic!("accept test request: {error}"),
}
}
});

Self {
authority,
requests,
stop,
handle: Some(handle),
}
}

fn endpoint_with_credentials(&self) -> String {
format!("http://accesskey:secretkey@{}", self.authority)
}

fn captured_requests(&self) -> Vec<CapturedRequest> {
self.requests.lock().expect("captured requests").clone()
}
}

impl Drop for TestServer {
fn drop(&mut self) {
self.stop.store(true, Ordering::SeqCst);
if let Some(handle) = self.handle.take() {
let _ = handle.join();
}
}
}

fn rc_binary() -> PathBuf {
if let Ok(path) = std::env::var("CARGO_BIN_EXE_rc") {
return PathBuf::from(path);
}

let workspace_root = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.parent()
.expect("cli crate has parent directory")
.parent()
.expect("workspace root exists")
.to_path_buf();

let debug_binary = workspace_root.join("target/debug/rc");
if debug_binary.exists() {
return debug_binary;
}

workspace_root.join("target/release/rc")
}

fn read_request(stream: &mut TcpStream) -> Option<CapturedRequest> {
stream
.set_read_timeout(Some(Duration::from_secs(2)))
.expect("set stream read timeout");

let mut buffer = Vec::new();
let mut chunk = [0_u8; 1024];

loop {
match stream.read(&mut chunk) {
Ok(0) => break,
Ok(n) => {
buffer.extend_from_slice(&chunk[..n]);
if header_end_position(&buffer).is_some() {
break;
}
}
Err(error) if matches!(error.kind(), ErrorKind::WouldBlock | ErrorKind::TimedOut) => {
break;
}
Err(_) => return None,
}
Comment thread
overtrue marked this conversation as resolved.
}

let header_end = header_end_position(&buffer)?;
let request = String::from_utf8_lossy(&buffer[..header_end]);
let mut lines = request.split("\r\n");
let request_line = lines.next()?;
let mut parts = request_line.split_whitespace();
let method = parts.next()?.to_string();
let path = parts.next()?.to_string();
let headers: Vec<(String, String)> = lines
.take_while(|line| !line.is_empty())
.filter_map(|line| line.split_once(':'))
.map(|(key, value)| (key.to_ascii_lowercase(), value.trim().to_string()))
.collect();

drain_request_body(stream, &mut buffer, header_end + 4, &headers)?;

Some(CapturedRequest {
method,
path,
headers,
})
}

fn header_end_position(buffer: &[u8]) -> Option<usize> {
buffer.windows(4).position(|window| window == b"\r\n\r\n")
}

fn drain_request_body(
stream: &mut TcpStream,
buffer: &mut Vec<u8>,
body_start: usize,
headers: &[(String, String)],
) -> Option<()> {
if let Some(content_length) =
header_value(headers, "content-length").and_then(|value| value.parse::<usize>().ok())
{
drain_content_length_body(stream, buffer, body_start, content_length)
} else if header_value(headers, "transfer-encoding")
.is_some_and(|value| value.eq_ignore_ascii_case("chunked"))
{
drain_chunked_body(stream, buffer, body_start)
} else {
Some(())
}
}

fn drain_content_length_body(
stream: &mut TcpStream,
buffer: &mut Vec<u8>,
body_start: usize,
content_length: usize,
) -> Option<()> {
let mut body_read = buffer.len().saturating_sub(body_start);
let mut chunk = [0_u8; 1024];

while body_read < content_length {
let n = stream.read(&mut chunk).ok()?;
if n == 0 {
break;
}
buffer.extend_from_slice(&chunk[..n]);
body_read += n;
}

Some(())
}

fn drain_chunked_body(
stream: &mut TcpStream,
buffer: &mut Vec<u8>,
body_start: usize,
) -> Option<()> {
let mut chunk = [0_u8; 1024];

while !buffer[body_start..]
.windows(5)
.any(|window| window == b"0\r\n\r\n")
{
let n = stream.read(&mut chunk).ok()?;
if n == 0 {
break;
}
buffer.extend_from_slice(&chunk[..n]);
}

Some(())
}

fn header_value<'a>(headers: &'a [(String, String)], name: &str) -> Option<&'a str> {
headers
.iter()
.find(|(key, _)| key == name)
.map(|(_, value)| value.as_str())
}

fn response_for(request: &CapturedRequest) -> String {
match request.method.as_str() {
"GET" if request.path.contains("list-type=2") => xml_response(200, list_objects_body()),
"DELETE" => xml_response(204, ""),
"POST" if request.path.contains("delete") => xml_response(200, delete_objects_body()),
_ => xml_response(500, "<Error><Code>UnexpectedRequest</Code></Error>"),
}
}

fn xml_response(status: u16, body: &str) -> String {
let reason = match status {
200 => "OK",
204 => "No Content",
_ => "Internal Server Error",
};

format!(
"HTTP/1.1 {status} {reason}\r\nContent-Type: application/xml\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{body}",
body.len()
)
}

fn list_objects_body() -> &'static str {
r#"<?xml version="1.0" encoding="UTF-8"?>
<ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<Name>bucket</Name>
<Prefix>purge-prefix/</Prefix>
<KeyCount>2</KeyCount>
<MaxKeys>1000</MaxKeys>
<IsTruncated>false</IsTruncated>
<Contents>
<Key>purge-prefix/a.txt</Key>
<LastModified>2026-05-01T00:00:00.000Z</LastModified>
<ETag>&quot;etag-a&quot;</ETag>
<Size>1</Size>
<StorageClass>STANDARD</StorageClass>
</Contents>
<Contents>
<Key>purge-prefix/nested/b.txt</Key>
<LastModified>2026-05-01T00:00:00.000Z</LastModified>
<ETag>&quot;etag-b&quot;</ETag>
<Size>1</Size>
<StorageClass>STANDARD</StorageClass>
</Contents>
</ListBucketResult>"#
}

fn delete_objects_body() -> &'static str {
r#"<?xml version="1.0" encoding="UTF-8"?>
<DeleteResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<Deleted><Key>purge-prefix/a.txt</Key></Deleted>
<Deleted><Key>purge-prefix/nested/b.txt</Key></Deleted>
</DeleteResult>"#
}

#[test]
fn rm_recursive_purge_deletes_each_key_with_force_header() {
let server = TestServer::start();
let config_dir = tempfile::tempdir().expect("create config dir");

let output = Command::new(rc_binary())
.args([
"--json",
"rm",
"--recursive",
"--purge",
"test/bucket/purge-prefix/",
])
.env("AWS_EC2_METADATA_DISABLED", "true")
.env("RC_CONFIG_DIR", config_dir.path())
.env("RC_HOST_test", server.endpoint_with_credentials())
.output()
.expect("run rc command");

assert!(
output.status.success(),
"stdout: {}\nstderr: {}",
String::from_utf8_lossy(&output.stdout),
String::from_utf8_lossy(&output.stderr)
);

let stdout = String::from_utf8(output.stdout).expect("stdout should be UTF-8");
let payload: serde_json::Value = serde_json::from_str(&stdout).expect("JSON output");
assert_eq!(payload["status"], "success");
assert_eq!(payload["total"], 2);

let requests = server.captured_requests();
let list_requests: Vec<_> = requests
.iter()
.filter(|request| request.method == "GET" && request.path.contains("list-type=2"))
.collect();
assert_eq!(list_requests.len(), 1, "requests: {requests:#?}");
assert!(
list_requests[0].path.contains("prefix=purge-prefix%2F"),
"list request should include the recursive prefix: {requests:#?}"
);

let delete_requests: Vec<_> = requests
.iter()
.filter(|request| request.method == "DELETE")
.collect();
assert_eq!(delete_requests.len(), 2, "requests: {requests:#?}");

let delete_paths: Vec<_> = delete_requests
.iter()
.map(|request| {
request
.path
.split_once('?')
.map_or(request.path.as_str(), |(path, _)| path)
})
.collect();
assert!(
delete_paths.contains(&"/bucket/purge-prefix/a.txt"),
"requests: {requests:#?}"
);
assert!(
delete_paths.contains(&"/bucket/purge-prefix/nested/b.txt"),
"requests: {requests:#?}"
);

for request in delete_requests {
assert_eq!(
request.header("x-rustfs-force-delete"),
Some("true"),
"request should force-delete through RustFS: {request:#?}"
);
}

assert!(
!requests
.iter()
.any(|request| request.method == "POST" && request.path.contains("delete")),
"recursive purge should not use the batch delete path: {requests:#?}"
);
}
Loading