diff --git a/crates/cli/tests/rm_purge.rs b/crates/cli/tests/rm_purge.rs new file mode 100644 index 0000000..a729a7e --- /dev/null +++ b/crates/cli/tests/rm_purge.rs @@ -0,0 +1,368 @@ +#![cfg(not(windows))] + +use std::io::{ErrorKind, Read, Write}; +use std::net::{TcpListener, TcpStream}; +use std::path::PathBuf; +use std::process::Command; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, Mutex}; +use std::thread::{self, JoinHandle}; +use std::time::Duration; + +#[derive(Debug, Clone)] +struct CapturedRequest { + method: String, + path: String, + headers: Vec<(String, String)>, +} + +impl CapturedRequest { + fn header(&self, name: &str) -> Option<&str> { + header_value(&self.headers, name) + } +} + +struct TestServer { + authority: String, + requests: Arc>>, + stop: Arc, + handle: Option>, +} + +impl TestServer { + fn start() -> Self { + let listener = TcpListener::bind("127.0.0.1:0").expect("bind local HTTP server"); + listener + .set_nonblocking(true) + .expect("set listener nonblocking"); + let authority = listener + .local_addr() + .expect("local HTTP server address") + .to_string(); + let requests = Arc::new(Mutex::new(Vec::new())); + let stop = Arc::new(AtomicBool::new(false)); + let thread_requests = Arc::clone(&requests); + let thread_stop = Arc::clone(&stop); + + let handle = thread::spawn(move || { + while !thread_stop.load(Ordering::SeqCst) { + match listener.accept() { + Ok((mut stream, _)) => { + if let Some(request) = read_request(&mut stream) { + let response = response_for(&request); + thread_requests + .lock() + .expect("record request") + .push(request); + let _ = stream.write_all(response.as_bytes()); + } + } + Err(error) if error.kind() == ErrorKind::WouldBlock => { + thread::sleep(Duration::from_millis(10)); + } + Err(error) if error.kind() == ErrorKind::Interrupted => {} + Err(error) => panic!("accept test request: {error}"), + } + } + }); + + Self { + authority, + requests, + stop, + handle: Some(handle), + } + } + + fn endpoint_with_credentials(&self) -> String { + format!("http://accesskey:secretkey@{}", self.authority) + } + + fn captured_requests(&self) -> Vec { + self.requests.lock().expect("captured requests").clone() + } +} + +impl Drop for TestServer { + fn drop(&mut self) { + self.stop.store(true, Ordering::SeqCst); + if let Some(handle) = self.handle.take() { + let _ = handle.join(); + } + } +} + +fn rc_binary() -> PathBuf { + if let Ok(path) = std::env::var("CARGO_BIN_EXE_rc") { + return PathBuf::from(path); + } + + let workspace_root = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .parent() + .expect("cli crate has parent directory") + .parent() + .expect("workspace root exists") + .to_path_buf(); + + let debug_binary = workspace_root.join("target/debug/rc"); + if debug_binary.exists() { + return debug_binary; + } + + workspace_root.join("target/release/rc") +} + +fn read_request(stream: &mut TcpStream) -> Option { + stream + .set_read_timeout(Some(Duration::from_secs(2))) + .expect("set stream read timeout"); + + let mut buffer = Vec::new(); + let mut chunk = [0_u8; 1024]; + + loop { + match stream.read(&mut chunk) { + Ok(0) => break, + Ok(n) => { + buffer.extend_from_slice(&chunk[..n]); + if header_end_position(&buffer).is_some() { + break; + } + } + Err(error) if matches!(error.kind(), ErrorKind::WouldBlock | ErrorKind::TimedOut) => { + break; + } + Err(_) => return None, + } + } + + let header_end = header_end_position(&buffer)?; + let request = String::from_utf8_lossy(&buffer[..header_end]); + let mut lines = request.split("\r\n"); + let request_line = lines.next()?; + let mut parts = request_line.split_whitespace(); + let method = parts.next()?.to_string(); + let path = parts.next()?.to_string(); + let headers: Vec<(String, String)> = lines + .take_while(|line| !line.is_empty()) + .filter_map(|line| line.split_once(':')) + .map(|(key, value)| (key.to_ascii_lowercase(), value.trim().to_string())) + .collect(); + + drain_request_body(stream, &mut buffer, header_end + 4, &headers)?; + + Some(CapturedRequest { + method, + path, + headers, + }) +} + +fn header_end_position(buffer: &[u8]) -> Option { + buffer.windows(4).position(|window| window == b"\r\n\r\n") +} + +fn drain_request_body( + stream: &mut TcpStream, + buffer: &mut Vec, + body_start: usize, + headers: &[(String, String)], +) -> Option<()> { + if let Some(content_length) = + header_value(headers, "content-length").and_then(|value| value.parse::().ok()) + { + drain_content_length_body(stream, buffer, body_start, content_length) + } else if header_value(headers, "transfer-encoding") + .is_some_and(|value| value.eq_ignore_ascii_case("chunked")) + { + drain_chunked_body(stream, buffer, body_start) + } else { + Some(()) + } +} + +fn drain_content_length_body( + stream: &mut TcpStream, + buffer: &mut Vec, + body_start: usize, + content_length: usize, +) -> Option<()> { + let mut body_read = buffer.len().saturating_sub(body_start); + let mut chunk = [0_u8; 1024]; + + while body_read < content_length { + let n = stream.read(&mut chunk).ok()?; + if n == 0 { + break; + } + buffer.extend_from_slice(&chunk[..n]); + body_read += n; + } + + Some(()) +} + +fn drain_chunked_body( + stream: &mut TcpStream, + buffer: &mut Vec, + body_start: usize, +) -> Option<()> { + let mut chunk = [0_u8; 1024]; + + while !buffer[body_start..] + .windows(5) + .any(|window| window == b"0\r\n\r\n") + { + let n = stream.read(&mut chunk).ok()?; + if n == 0 { + break; + } + buffer.extend_from_slice(&chunk[..n]); + } + + Some(()) +} + +fn header_value<'a>(headers: &'a [(String, String)], name: &str) -> Option<&'a str> { + headers + .iter() + .find(|(key, _)| key == name) + .map(|(_, value)| value.as_str()) +} + +fn response_for(request: &CapturedRequest) -> String { + match request.method.as_str() { + "GET" if request.path.contains("list-type=2") => xml_response(200, list_objects_body()), + "DELETE" => xml_response(204, ""), + "POST" if request.path.contains("delete") => xml_response(200, delete_objects_body()), + _ => xml_response(500, "UnexpectedRequest"), + } +} + +fn xml_response(status: u16, body: &str) -> String { + let reason = match status { + 200 => "OK", + 204 => "No Content", + _ => "Internal Server Error", + }; + + format!( + "HTTP/1.1 {status} {reason}\r\nContent-Type: application/xml\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{body}", + body.len() + ) +} + +fn list_objects_body() -> &'static str { + r#" + + bucket + purge-prefix/ + 2 + 1000 + false + + purge-prefix/a.txt + 2026-05-01T00:00:00.000Z + "etag-a" + 1 + STANDARD + + + purge-prefix/nested/b.txt + 2026-05-01T00:00:00.000Z + "etag-b" + 1 + STANDARD + +"# +} + +fn delete_objects_body() -> &'static str { + r#" + + purge-prefix/a.txt + purge-prefix/nested/b.txt +"# +} + +#[test] +fn rm_recursive_purge_deletes_each_key_with_force_header() { + let server = TestServer::start(); + let config_dir = tempfile::tempdir().expect("create config dir"); + + let output = Command::new(rc_binary()) + .args([ + "--json", + "rm", + "--recursive", + "--purge", + "test/bucket/purge-prefix/", + ]) + .env("AWS_EC2_METADATA_DISABLED", "true") + .env("RC_CONFIG_DIR", config_dir.path()) + .env("RC_HOST_test", server.endpoint_with_credentials()) + .output() + .expect("run rc command"); + + assert!( + output.status.success(), + "stdout: {}\nstderr: {}", + String::from_utf8_lossy(&output.stdout), + String::from_utf8_lossy(&output.stderr) + ); + + let stdout = String::from_utf8(output.stdout).expect("stdout should be UTF-8"); + let payload: serde_json::Value = serde_json::from_str(&stdout).expect("JSON output"); + assert_eq!(payload["status"], "success"); + assert_eq!(payload["total"], 2); + + let requests = server.captured_requests(); + let list_requests: Vec<_> = requests + .iter() + .filter(|request| request.method == "GET" && request.path.contains("list-type=2")) + .collect(); + assert_eq!(list_requests.len(), 1, "requests: {requests:#?}"); + assert!( + list_requests[0].path.contains("prefix=purge-prefix%2F"), + "list request should include the recursive prefix: {requests:#?}" + ); + + let delete_requests: Vec<_> = requests + .iter() + .filter(|request| request.method == "DELETE") + .collect(); + assert_eq!(delete_requests.len(), 2, "requests: {requests:#?}"); + + let delete_paths: Vec<_> = delete_requests + .iter() + .map(|request| { + request + .path + .split_once('?') + .map_or(request.path.as_str(), |(path, _)| path) + }) + .collect(); + assert!( + delete_paths.contains(&"/bucket/purge-prefix/a.txt"), + "requests: {requests:#?}" + ); + assert!( + delete_paths.contains(&"/bucket/purge-prefix/nested/b.txt"), + "requests: {requests:#?}" + ); + + for request in delete_requests { + assert_eq!( + request.header("x-rustfs-force-delete"), + Some("true"), + "request should force-delete through RustFS: {request:#?}" + ); + } + + assert!( + !requests + .iter() + .any(|request| request.method == "POST" && request.path.contains("delete")), + "recursive purge should not use the batch delete path: {requests:#?}" + ); +}