diff --git a/.github/workflows/lint_and_test.yml b/.github/workflows/lint_and_test.yml
new file mode 100644
index 0000000..b93999d
--- /dev/null
+++ b/.github/workflows/lint_and_test.yml
@@ -0,0 +1,53 @@
+name: CI
+
+on: [pull_request]
+
+jobs:
+  lint:
+    runs-on: ubuntu-latest
+    steps:
+      - name: Set up Rust toolchain
+        uses: hecrj/setup-rust-action@v2
+        with:
+          rust-version: stable
+
+      - name: Check out the code
+        uses: actions/checkout@v4
+
+      - name: Install Clippy
+        run: rustup component add clippy
+
+      - name: Run Clippy
+        run: cargo clippy --all-targets --all-features -- -D warnings
+
+  test:
+    needs: lint
+    runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        test_name: [
+          "tests::basic_test",
+          "tests::test_health_endpoint",
+          "tests::test_join_group",
+          "tests::test_refresh_empty_group",
+          "tests::test_refresh_group_with_file",
+          "tests::test_refresh_group_with_single_repo",
+          "tests::test_refresh_joined_group",
+          "tests::test_refresh_nonexistent_group",
+          "tests::test_replicate_group",
+          "tests::test_upload_list_delete"
+        ]
+
+    steps:
+      - name: Set up Rust toolchain
+        uses: hecrj/setup-rust-action@v2
+        with:
+          rust-version: stable
+
+      - name: Check out the code
+        uses: actions/checkout@v4
+
+      - name: Run individual test
+        env:
+          RUST_MIN_STACK: 8388608
+        run: cargo test --verbose -- ${{ matrix.test_name }} --test-threads=1 --nocapture
\ No newline at end of file
diff --git a/API.md b/API.md
new file mode 100644
index 0000000..6c5b69d
--- /dev/null
+++ b/API.md
@@ -0,0 +1,491 @@
+# Save-Rust API Documentation
+
+This document provides detailed information about the Save-Rust API endpoints, including request/response schemas and error handling.
+
+## Table of Contents
+- [General Endpoints](#general-endpoints)
+- [Groups Endpoints](#groups-endpoints)
+- [Repositories Endpoints](#repositories-endpoints)
+- [Media Endpoints](#media-endpoints)
+
+## General Endpoints
+
+### GET /status
+Returns the server status and version information.
+
+Response:
+```json
+{
+  "status": "running",
+  "version": "string" // Current version of the server
+}
+```
+
+Error Response (500 Internal Server Error):
+```json
+{
+  "status": "error",
+  "error": "Something went wrong: [detailed error message]"
+}
+```
+
+### GET /health
+Returns the server health status.
+
+Response:
+```json
+{
+  "status": "OK"
+}
+```
+
+Error Response (500 Internal Server Error):
+```json
+{
+  "status": "error",
+  "error": "Something went wrong: [detailed error message]"
+}
+```
+
+### POST /api/memberships
+Joins a group using a membership URL.
+
+Request Body:
+```json
+{
+  "group_url": "string" // URL containing group information
+}
+```
+
+Response:
+```json
+{
+  "status_message": "string" // Success or error message
+}
+```
+
+Error Response (400 Bad Request):
+```json
+{
+  "status": "error",
+  "error": "Invalid group URL: [detailed error message]"
+}
+```
+
+Error Response (500 Internal Server Error):
+```json
+{
+  "status": "error",
+  "error": "Failed to join group: [detailed error message]"
+}
+```
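+
+### Example: Checking Server Health from Rust
+
+A minimal sketch of exercising the health endpoint from Rust using only the standard library. It assumes the server is reachable on `127.0.0.1:8080` (the TCP address and port bound in `src/server.rs`; the server also listens on a Unix domain socket). A real client would more likely use an HTTP crate.
+
+```rust
+use std::io::{Read, Write};
+use std::net::TcpStream;
+
+fn main() -> std::io::Result<()> {
+    // Connect to the server's TCP listener.
+    let mut stream = TcpStream::connect("127.0.0.1:8080")?;
+
+    // Send a minimal HTTP/1.1 request; "Connection: close" lets us read to EOF.
+    stream.write_all(b"GET /health HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n")?;
+
+    // Read the raw response; a healthy server answers with {"status": "OK"}.
+    let mut response = String::new();
+    stream.read_to_string(&mut response)?;
+    println!("{response}");
+    Ok(())
+}
+```
+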
+## Groups Endpoints
+
+Base path: `/api/groups`
+
+### GET /
+Lists all groups.
+
+Response:
+```json
+{
+  "groups": [
+    {
+      "key": "string", // Base64 encoded group ID
+      "name": "string", // Optional group name
+      "created_at": "string" // ISO 8601 timestamp
+    }
+  ]
+}
+```
+
+Error Response (500 Internal Server Error):
+```json
+{
+  "status": "error",
+  "error": "Failed to list groups: [detailed error message]"
+}
+```
+
+### POST /
+Creates a new group.
+
+Request Body:
+```json
+{
+  "name": "string" // Name for the new group
+}
+```
+
+Response:
+```json
+{
+  "key": "string", // Base64 encoded group ID
+  "name": "string", // Group name
+  "created_at": "string" // ISO 8601 timestamp
+}
+```
+
+Error Response (400 Bad Request):
+```json
+{
+  "status": "error",
+  "error": "Invalid group name: [detailed error message]"
+}
+```
+
+Error Response (500 Internal Server Error):
+```json
+{
+  "status": "error",
+  "error": "Failed to create group: [detailed error message]"
+}
+```
+
+### POST /join_from_url
+Joins a group using a URL.
+
+Request Body:
+```json
+{
+  "group_url": "string" // URL containing group information
+}
+```
+
+Response:
+```json
+{
+  "key": "string", // Base64 encoded group ID
+  "name": "string", // Group name
+  "created_at": "string" // ISO 8601 timestamp
+}
+```
+
+Error Response (400 Bad Request):
+```json
+{
+  "status": "error",
+  "error": "Invalid group URL: [detailed error message]"
+}
+```
+
+Error Response (500 Internal Server Error):
+```json
+{
+  "status": "error",
+  "error": "Failed to join group: [detailed error message]"
+}
+```
+
+### GET /{group_id}
+Retrieves a specific group by its ID.
+
+Response:
+```json
+{
+  "key": "string", // Base64 encoded group ID
+  "name": "string", // Group name
+  "created_at": "string" // ISO 8601 timestamp
+}
+```
+
+Error Response (400 Bad Request):
+```json
+{
+  "status": "error",
+  "error": "Invalid group ID: [detailed error message]"
+}
+```
+
+Error Response (404 Not Found):
+```json
+{
+  "status": "error",
+  "error": "Group not found: [detailed error message]"
+}
+```
+
+### DELETE /{group_id}
+Deletes a group by its ID.
+
+Response:
+```json
+{
+  "status": "success"
+}
+```
+
+Error Response (400 Bad Request):
+```json
+{
+  "status": "error",
+  "error": "Invalid group ID: [detailed error message]"
+}
+```
+
+Error Response (404 Not Found):
+```json
+{
+  "status": "error",
+  "error": "Group not found: [detailed error message]"
+}
+```
+
+### POST /{group_id}/refresh
+Refreshes a group by its ID.
+
+Response:
+```json
+{
+  "status": "success",
+  "repos": [
+    {
+      "name": "string", // Repository name
+      "can_write": boolean, // Whether the user can write to this repo
+      "repo_hash": "string", // Hash of the repository
+      "refreshed_files": [ // List of files that were refreshed
+        "string" // File names
+      ],
+      "all_files": [ // List of all files in the repository
+        "string" // File names
+      ]
+    }
+  ]
+}
+```
+
+Error Response (400 Bad Request):
+```json
+{
+  "status": "error",
+  "error": "Invalid group ID: [detailed error message]"
+}
+```
+
+Error Response (404 Not Found):
+```json
+{
+  "status": "error",
+  "error": "Group not found: [detailed error message]"
+}
+```
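+
+### Example: Creating a Group from Rust
+
+A sketch of creating a group by POSTing JSON to `/api/groups`, using the same raw-socket approach as the health-check example above. The group name is an arbitrary illustration; the response carries the new group's Base64 key, name, and creation timestamp.
+
+```rust
+use std::io::{Read, Write};
+use std::net::TcpStream;
+
+fn main() -> std::io::Result<()> {
+    let body = r#"{"name": "My Group"}"#;
+
+    let mut stream = TcpStream::connect("127.0.0.1:8080")?;
+    // Content-Length must match the JSON payload exactly.
+    let request = format!(
+        "POST /api/groups HTTP/1.1\r\nHost: localhost\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{body}",
+        body.len()
+    );
+    stream.write_all(request.as_bytes())?;
+
+    // The response body echoes the group's key, name, and created_at fields.
+    let mut response = String::new();
+    stream.read_to_string(&mut response)?;
+    println!("{response}");
+    Ok(())
+}
+```
+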
+## Repositories Endpoints
+
+Base path: `/api/groups/{group_id}/repos`
+
+### GET /
+Lists all repositories within a group.
+
+Response:
+```json
+{
+  "repos": [
+    {
+      "key": "string", // Base64 encoded repository ID
+      "name": "string", // Repository name
+      "created_at": "string" // ISO 8601 timestamp
+    }
+  ]
+}
+```
+
+Error Response (400 Bad Request):
+```json
+{
+  "status": "error",
+  "error": "Invalid group ID: [detailed error message]"
+}
+```
+
+Error Response (404 Not Found):
+```json
+{
+  "status": "error",
+  "error": "Group not found: [detailed error message]"
+}
+```
+
+### POST /
+Creates a new repository within a group.
+
+Request Body:
+```json
+{
+  "name": "string" // Name for the new repository
+}
+```
+
+Response:
+```json
+{
+  "key": "string", // Base64 encoded repository ID
+  "name": "string", // Repository name
+  "created_at": "string" // ISO 8601 timestamp
+}
+```
+
+Error Response (400 Bad Request):
+```json
+{
+  "status": "error",
+  "error": "Invalid group ID or repository name: [detailed error message]"
+}
+```
+
+Error Response (404 Not Found):
+```json
+{
+  "status": "error",
+  "error": "Group not found: [detailed error message]"
+}
+```
+
+### GET /{repo_id}
+Retrieves a specific repository within a group.
+
+Response:
+```json
+{
+  "key": "string", // Base64 encoded repository ID
+  "name": "string", // Repository name
+  "created_at": "string" // ISO 8601 timestamp
+}
+```
+
+Error Response (400 Bad Request):
+```json
+{
+  "status": "error",
+  "error": "Invalid group ID or repository ID: [detailed error message]"
+}
+```
+
+Error Response (404 Not Found):
+```json
+{
+  "status": "error",
+  "error": "Group or repository not found: [detailed error message]"
+}
+```
+
+## Media Endpoints
+
+Base path: `/api/groups/{group_id}/repos/{repo_id}/media`
+
+### GET /
+Lists all files in a repository.
+
+Response:
+```json
+{
+  "files": [
+    {
+      "name": "string", // File name
+      "size": number, // File size in bytes
+      "created_at": "string" // ISO 8601 timestamp
+    }
+  ]
+}
+```
+
+Error Response (400 Bad Request):
+```json
+{
+  "status": "error",
+  "error": "Invalid group ID or repository ID: [detailed error message]"
+}
+```
+
+Error Response (404 Not Found):
+```json
+{
+  "status": "error",
+  "error": "Group or repository not found: [detailed error message]"
+}
+```
+
+### POST /{file_name}
+Uploads a file to a repository.
+
+Request Body: Binary file content
+
+Response:
+```json
+{
+  "name": "string", // File name
+  "updated_collection_hash": "string" // Hash of the updated collection
+}
+```
+
+Error Response (400 Bad Request):
+```json
+{
+  "status": "error",
+  "error": "Invalid group ID, repository ID, or file content: [detailed error message]"
+}
+```
+
+Error Response (404 Not Found):
+```json
+{
+  "status": "error",
+  "error": "Group or repository not found: [detailed error message]"
+}
+```
+
+Error Response (413 Payload Too Large):
+```json
+{
+  "status": "error",
+  "error": "File too large: [detailed error message]"
+}
+```
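+
+### Example: Uploading a File from Rust
+
+A sketch of uploading raw bytes to the media endpoint, following the same pattern as the examples above. `GROUP_ID` and `REPO_ID` are hypothetical placeholders for the Base64 keys returned by the groups and repositories endpoints; the request body is the file's binary content, so `Content-Length` must equal the payload size.
+
+```rust
+use std::io::{Read, Write};
+use std::net::TcpStream;
+
+fn main() -> std::io::Result<()> {
+    // Placeholders; substitute real Base64 keys from the API.
+    let (group_id, repo_id, file_name) = ("GROUP_ID", "REPO_ID", "example.txt");
+    let body = b"Test content for file upload";
+
+    let mut stream = TcpStream::connect("127.0.0.1:8080")?;
+    let request = format!(
+        "POST /api/groups/{group_id}/repos/{repo_id}/media/{file_name} HTTP/1.1\r\nHost: localhost\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
+        body.len()
+    );
+    stream.write_all(request.as_bytes())?;
+    stream.write_all(body)?; // Binary file content as the request body
+
+    // The response includes the file name and the updated collection hash.
+    let mut response = String::new();
+    stream.read_to_string(&mut response)?;
+    println!("{response}");
+    Ok(())
+}
+```
+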
+### GET /{file_name}
+Downloads a specific file from a repository.
+
+Response: Binary file content with appropriate Content-Type header
+
+Error Response (400 Bad Request):
+```json
+{
+  "status": "error",
+  "error": "Invalid group ID, repository ID, or file name: [detailed error message]"
+}
+```
+
+Error Response (404 Not Found):
+```json
+{
+  "status": "error",
+  "error": "Group, repository, or file not found: [detailed error message]"
+}
+```
+
+### DELETE /{file_name}
+Deletes a specific file from a repository.
+
+Response:
+```json
+{
+  "collection_hash": "string" // Hash of the updated collection after deletion
+}
+```
+
+Error Response (400 Bad Request):
+```json
+{
+  "status": "error",
+  "error": "Invalid group ID, repository ID, or file name: [detailed error message]"
+}
+```
+
+Error Response (404 Not Found):
+```json
+{
+  "status": "error",
+  "error": "Group, repository, or file not found: [detailed error message]"
+}
+```
\ No newline at end of file
diff --git a/README.md b/README.md
index e056ae6..206d26f 100644
--- a/README.md
+++ b/README.md
@@ -11,4 +11,39 @@ Bindings to the save-dweb-backend for the Save Android app.
 - Have `save-android` set up in the parent folder
 - Set up the `ANDROID_NDK_HOME` variable
 - `./build-android.sh`
-- You can now recompile the android app.
\ No newline at end of file
+- You can now recompile the Android app.
+
+# API Documentation
+
+The Save-Rust API provides HTTP endpoints for managing groups, repositories, and media files. For detailed API documentation, including request/response schemas and error handling, please see [API.md](API.md).
+
+## Available Endpoints
+
+### General
+* `GET /status` - Returns the server status and version.
+* `GET /health` - Returns the server health status.
+* `POST /api/memberships` - Joins a group.
+
+### Groups
+Base path: `/api/groups`
+* `GET /` - Lists all groups.
+* `POST /` - Creates a new group.
+* `POST /join_from_url` - Joins a group using a URL.
+* `GET /{group_id}` - Retrieves a specific group by its ID.
+* `DELETE /{group_id}` - Deletes a group by its ID.
+* `POST /{group_id}/refresh` - Refreshes a group by its ID.
+
+### Repositories
+Base path: `/api/groups/{group_id}/repos`
+* `GET /` - Lists all repositories within a group.
+* `POST /` - Creates a new repository within a group.
+* `GET /{repo_id}` - Retrieves a specific repository within a group.
+
+### Media
+Base path: `/api/groups/{group_id}/repos/{repo_id}/media`
+* `GET /` - Lists all files in a repository.
+* `POST /{file_name}` - Uploads a file to a repository.
+* `GET /{file_name}` - Downloads a specific file from a repository.
+* `DELETE /{file_name}` - Deletes a specific file from a repository.
+
+For detailed information about request/response formats, error handling, and examples, please refer to the [API Documentation](API.md).
\ No newline at end of file
diff --git a/src/groups.rs b/src/groups.rs
index 3ce5310..3e0a516 100644
--- a/src/groups.rs
+++ b/src/groups.rs
@@ -1,14 +1,14 @@
 use actix_web::{web, delete, get, post, Responder, HttpResponse};
-use save_dweb_backend::common::DHTEntity;
 use serde_json::json;
 use crate::error::AppResult;
 use crate::log_debug;
-use crate::models::IntoSnowbirdGroupsWithNames;
-use crate::models::{RequestName, SnowbirdGroup, RequestUrl};
+use crate::models::{IntoSnowbirdGroupsWithNames, RequestName, RequestUrl, SnowbirdGroup};
 use crate::repos;
-use crate::constants::TAG;
-use crate::server::server::get_backend;
+use crate::constants::{TAG};
+
+use crate::server::get_backend;
 use crate::utils::create_veilid_cryptokey_from_base64;
+use save_dweb_backend::common::DHTEntity;
 
 pub fn scope() -> actix_web::Scope {
     web::scope("/groups")
@@ -160,7 +160,7 @@ async fn refresh_group(group_id: web::Path<String>) -> AppResult
             "all_files": json!(Vec::<String>::new()) // Initialize empty
         });
         let mut refreshed_files_vec = Vec::new();
-        let mut all_files_vec = Vec::new();
+        let mut all_files_vec: Vec<String> = Vec::new();
 
         // Get current repo hash and collection info
         match repo.get_hash_from_dht().await {
diff --git a/src/lib.rs b/src/lib.rs
index 67ddc57..2485f13 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -28,11 +28,10 @@ mod tests {
     use save_dweb_backend::{common::DHTEntity, constants::TEST_GROUP_NAME};
     use serde::{Deserialize, Serialize};
     use serde_json::json;
-    use server::server::{get_backend, init_backend, status, health, BACKEND};
+    use server::{get_backend, init_backend, status, health, BACKEND};
     use tmpdir::TmpDir;
     use base64_url::base64;
     use base64_url::base64::Engine;
-    use env_logger;
    use save_dweb_backend::backend::Backend;
     use veilid_core::VeilidUpdate;
     use serial_test::serial;
@@ -52,16 +51,66 @@ mod tests {
         files: Vec<String>,
     }
 
-    // Helper: Wait for public internet readiness
+    // Helper: Wait for public internet readiness with timeout and retries
     async fn wait_for_public_internet_ready(backend: &Backend) -> anyhow::Result<()> {
         let mut rx = backend.subscribe_updates().await.ok_or_else(|| anyhow::anyhow!("No update receiver"))?;
-        while let Ok(update) = rx.recv().await {
-            if let VeilidUpdate::Attachment(attachment_state) = update {
-                if attachment_state.public_internet_ready {
-                    break;
-                }
-            }
-        }
+
+        let timeout = if cfg!(test) {
+            Duration::from_secs(15)
+        } else {
+            Duration::from_secs(30)
+        };
+
+        log::info!("Waiting for public internet to be ready (timeout: {:?})", timeout);
+
+        // Try up to 6 times with exponential backoff
+        let mut retry_count = 0;
+        let max_retries = 6;
+
+        while retry_count < max_retries {
+            match tokio::time::timeout(timeout, async {
+                while let Ok(update) = rx.recv().await {
+                    match &update {
+                        VeilidUpdate::Attachment(attachment_state) => {
+                            log::debug!("Veilid attachment state: {:?}", attachment_state);
+                            if attachment_state.public_internet_ready {
+                                log::info!("Public internet is ready!");
+                                return Ok(());
+                            }
+                        }
+                        _ => log::trace!("Received Veilid update: {:?}", update),
+                    }
+                }
+                Err(anyhow::anyhow!("Update channel closed before network was ready"))
+            }).await {
+                Ok(result) => return result,
+                Err(_) => {
+                    retry_count += 1;
+                    if retry_count < max_retries {
+                        let backoff = Duration::from_secs(2u64.pow(retry_count as u32));
+                        log::warn!("Timeout waiting for public internet (attempt {}/{})", retry_count, max_retries);
+                        log::info!("Retrying in {:?}...", backoff);
+                        tokio::time::sleep(backoff).await;
+                        // Resubscribe to get a fresh update channel
+                        rx = backend.subscribe_updates().await.ok_or_else(|| anyhow::anyhow!("No update receiver"))?;
+                    }
+                }
+            }
+        }
+
+        Err(anyhow::anyhow!("Failed to establish public internet connection after {} attempts", max_retries))
+    }
+
+    // Helper function to properly clean up test resources
+    async fn cleanup_test_resources(backend: &Backend) -> Result<()> {
+        log::info!("Cleaning up test resources...");
+
+        // Stop the backend, which will also handle VeilidAPI shutdown
+        backend.stop().await?;
+
+        // Add a small delay to ensure everything is cleaned up
+        tokio::time::sleep(Duration::from_millis(500)).await;
 
         Ok(())
     }
@@ -253,7 +302,7 @@
         // Clean up: Stop the backend
         {
             let backend = get_backend().await?;
-            backend.stop().await.expect("Backend failed to stop");
+            cleanup_test_resources(&backend).await?;
         }
 
         Ok(())
@@ -286,7 +335,7 @@
         .await?;
         let backend2 = Backend::from_dependencies(
             &path.to_path_buf(),
-            veilid_api2.clone(),
+            veilid_api2,
             update_rx2,
             store2,
         )
         .await
         .unwrap();
@@ -335,10 +384,11 @@
         assert_eq!(resp.repos.len(), 1, "Should have 1 repo after joining");
 
-        backend2.stop().await?;
+        // Clean up both backends using the helper function
+        cleanup_test_resources(&backend2).await?;
         {
             let backend = get_backend().await?;
-            backend.stop().await.expect("Backend failed to stop");
+            cleanup_test_resources(&backend).await?;
         }
 
         Ok(())
@@ -348,8 +398,9 @@
     #[actix_web::test]
     #[serial]
     async fn test_replicate_group() -> Result<()> {
         // Initialize the app
-        let path = TmpDir::new("test_api_repo_file_operations").await?;
-
+        let path = TmpDir::new("test_replicate_group").await?;
+
+        // Create second backend (creator) first
         let store2 = iroh_blobs::store::fs::Store::load(path.to_path_buf().join("iroh2")).await?;
         let (veilid_api2, update_rx2) = save_dweb_backend::common::init_veilid(
             path.to_path_buf().join("test2").as_path(),
         )
@@ -358,78 +409,88 @@
         .await?;
         let backend2 = Backend::from_dependencies(
             &path.to_path_buf(),
-            veilid_api2.clone(),
+            veilid_api2,
             update_rx2,
             store2,
         )
         .await
         .unwrap();
-
+
+        // Initialize main backend (joiner)
         BACKEND.get_or_init(|| init_backend(path.to_path_buf().as_path()));
         {
             let backend = get_backend().await?;
             backend.start().await.expect("Backend failed to start");
         }
-
+
+        // Create group and repo in backend2 (creator)
         let mut group = backend2.create_group().await?;
-        let join_url = group.get_url();
-        group.set_name(TEST_GROUP_NAME).await?;
-        let repo = group.create_repo().await?;
+        let join_url = group.get_url();
+        group.set_name(TEST_GROUP_NAME).await?;
+        let repo = group.create_repo().await?;
         repo.set_name(TEST_GROUP_NAME).await?;
-
-        // Step 3: Upload a file to the repository
+
+        // Upload a file to the repository
         let file_name = "example.txt";
         let file_content = b"Test content for file upload";
-
-        repo.upload(&file_name, file_content.to_vec()).await?;
-
+        repo.upload(file_name, file_content.to_vec()).await?;
+
         tokio::time::sleep(Duration::from_secs(2)).await;
-
+
         let app = test::init_service(
             App::new()
                 .service(status)
                 .service(web::scope("/api").service(groups::scope())),
         )
         .await;
-
+
+        // Join the group using the main backend
         {
             let backend = get_backend().await?;
             backend.join_from_url(join_url.as_str()).await?;
         }
-
-        let get_file_req = test::TestRequest::get()
+
+        // Wait for replication to complete
+        tokio::time::sleep(Duration::from_secs(2)).await;
+
+        // Test HTTP endpoints after replication
+        // 1. Verify group exists and has correct name
+        let groups_req = test::TestRequest::get().uri("/api/groups").to_request();
+        let groups_resp: GroupsResponse = test::call_and_read_body_json(&app, groups_req).await;
+        assert_eq!(groups_resp.groups.len(), 1, "Should have one group after joining");
+        assert_eq!(groups_resp.groups[0].name, Some(TEST_GROUP_NAME.to_string()),
+            "Group should have correct name");
+
+        // 2. Verify repo exists and has correct name
+        let repos_req = test::TestRequest::get()
+            .uri(&format!("/api/groups/{}/repos", group.id()))
+            .to_request();
+        let repos_resp: ReposResponse = test::call_and_read_body_json(&app, repos_req).await;
+        assert_eq!(repos_resp.repos.len(), 1, "Should have one repo after joining");
+        assert_eq!(repos_resp.repos[0].name, TEST_GROUP_NAME, "Repo should have correct name");
+
+        // 3. Verify file exists and has correct content
+        let file_req = test::TestRequest::get()
             .uri(&format!(
                 "/api/groups/{}/repos/{}/media/{}",
-                group.id().to_string(),
-                repo.id().to_string(),
-                file_name
+                group.id(), repo.id(), file_name
             ))
             .to_request();
-        let get_file_resp = test::call_service(&app, get_file_req).await;
-        assert!(get_file_resp.status().is_success(), "File download failed");
-
-        let got_file_data = test::read_body(get_file_resp).await;
-        assert_eq!(
-            got_file_data.to_vec().as_slice(),
-            file_content,
-            "Downloaded back file content"
-        );
-
-        // Clean up
-        backend2.stop().await?;
+        let file_resp = test::call_service(&app, file_req).await;
+        assert!(file_resp.status().is_success(), "File should be accessible after replication");
+        let got_content = test::read_body(file_resp).await;
+        assert_eq!(got_content.to_vec(), file_content.to_vec(),
+            "File content should match after replication");
+
+        // Clean up both backends using the helper function
+        cleanup_test_resources(&backend2).await?;
         {
             let backend = get_backend().await?;
-            backend.stop().await.expect("Backend failed to stop");
+            cleanup_test_resources(&backend).await?;
         }
-        // Add delay to allow tasks to complete
-        tokio::time::sleep(Duration::from_secs(2)).await;
-        veilid_api2.shutdown().await;
-
+
         Ok(())
-    }
+    }
 
     #[actix_web::test]
     #[serial]
@@ -441,11 +502,10 @@
         // Initialize the app with basic setup
         let path = TmpDir::new("test_refresh_nonexistent").await?;
         BACKEND.get_or_init(|| init_backend(path.to_path_buf().as_path()));
-        let veilid_api = {
+        {
             let backend = get_backend().await?;
             backend.start().await.expect("Backend failed to start");
-            backend.get_veilid_api().await.unwrap()
-        };
+        }
 
         let app = test::init_service(
             App::new()
                 .service(status)
                 .service(web::scope("/api").service(groups::scope())),
         )
         .await;
@@ -466,11 +526,8 @@
         // Clean up
         {
             let backend = get_backend().await?;
-            backend.stop().await.expect("Backend failed to stop");
+            cleanup_test_resources(&backend).await?;
         }
-        // Add delay to allow tasks to complete
-        tokio::time::sleep(Duration::from_secs(2)).await;
-        veilid_api.shutdown().await;
 
         Ok(())
     }
@@ -485,11 +542,10 @@
         // Initialize the app with basic setup
         let path = TmpDir::new("test_refresh_empty").await?;
         BACKEND.get_or_init(|| init_backend(path.to_path_buf().as_path()));
-        let veilid_api = {
+        {
             let backend = get_backend().await?;
             backend.start().await.expect("Backend failed to start");
-            backend.get_veilid_api().await.unwrap()
-        };
+        }
 
         // Create an empty group
         let empty_group = {
@@ -518,11 +574,8 @@
         // Clean up
         {
             let backend = get_backend().await?;
-            backend.stop().await.expect("Backend failed to stop");
+            cleanup_test_resources(&backend).await?;
         }
-        // Add delay to allow tasks to complete
-        tokio::time::sleep(Duration::from_secs(2)).await;
-        veilid_api.shutdown().await;
 
         Ok(())
     }
@@ -539,14 +592,13 @@
         BACKEND.get_or_init(|| init_backend(path.to_path_buf().as_path()));
 
         // Start backend and wait for public internet readiness
-        let veilid_api = {
+        {
             let backend = get_backend().await?;
             backend.start().await.expect("Backend failed to start");
             log::info!("Waiting for public internet readiness...");
             wait_for_public_internet_ready(&backend).await?;
             log::info!("Public internet is ready");
-            backend.get_veilid_api().await.unwrap()
-        };
+        }
 
         // Create a group with a repo and upload a dummy file
         let (group, repo, dummy_file_name, dummy_file_content) = {
@@ -631,11 +683,8 @@
         log::info!("Cleaning up test resources...");
         {
             let backend = get_backend().await?;
-            backend.stop().await.expect("Backend failed to stop");
+            cleanup_test_resources(&backend).await?;
         }
-        // Add delay to allow tasks to complete
-        tokio::time::sleep(Duration::from_secs(2)).await;
-        veilid_api.shutdown().await;
 
         Ok(())
     }
@@ -650,11 +699,10 @@
         // Initialize the app with basic setup
         let path = TmpDir::new("test_refresh_with_file").await?;
         BACKEND.get_or_init(|| init_backend(path.to_path_buf().as_path()));
-        let veilid_api = {
+        {
             let backend = get_backend().await?;
             backend.start().await.expect("Backend failed to start");
-            backend.get_veilid_api().await.unwrap()
-        };
+        }
 
         // Create a group with a repo and upload a file
         let (group, repo) = {
@@ -711,11 +759,8 @@
         // Clean up
         {
             let backend = get_backend().await?;
-            backend.stop().await.expect("Backend failed to stop");
+            cleanup_test_resources(&backend).await?;
         }
-        // Add delay to allow tasks to complete
-        tokio::time::sleep(Duration::from_secs(2)).await;
-        veilid_api.shutdown().await;
 
         Ok(())
     }
@@ -723,14 +768,14 @@
     #[actix_web::test]
     #[serial]
     async fn test_refresh_joined_group() -> Result<()> {
-        // Initialize logging
+        // Initialize logging
         let _ = env_logger::try_init();
         log::info!("Testing refresh of joined group");
 
-        // Initialize the app with basic setup
-        let path = TmpDir::new("test_refresh_joined").await?;
+        // Initialize the app
+        let path = TmpDir::new("test_refresh_joined_group").await?;
 
-        // Initialize backend2 (creator) first
+        // Create second backend (creator) first
         let store2 = iroh_blobs::store::fs::Store::load(path.to_path_buf().join("iroh2")).await?;
         let (veilid_api2, update_rx2) = save_dweb_backend::common::init_veilid(
             path.to_path_buf().join("test2").as_path(),
         )
         .await?;
         let backend2 = Backend::from_dependencies(
             &path.to_path_buf(),
-            veilid_api2.clone(),
+            veilid_api2,
             update_rx2,
             store2,
         )
         .await
         .unwrap();
 
-        // Create group and repo in backend2 (without an explicit start or wait_for_public_internet_ready)
-        let mut group = backend2.create_group().await?;
-        group.set_name(TEST_GROUP_NAME).await?;
-        let repo = group.create_repo().await?;
-        repo.set_name("Test Repo").await?;
-
-        // Upload a file (using backend2) to ensure repo has a collection/hash
-        let file_name = "test.txt";
-        let file_content = b"Test content for joined group";
-        repo.upload(file_name, file_content.to_vec()).await?;
-        log::info!("Uploaded test file to creator's repo");
-
-        // Wait for DHT propagation (after upload, before global BACKEND is initialized)
-        tokio::time::sleep(Duration::from_secs(2)).await;
-
-        // Initialize and start the global BACKEND (joiner) (with a wait_for_public_internet_ready)
+        // Initialize main backend (joiner)
         BACKEND.get_or_init(|| init_backend(path.to_path_buf().as_path()));
         {
             let backend = get_backend().await?;
             backend.start().await.expect("Backend failed to start");
-            log::info!("Waiting for public internet readiness for global BACKEND...");
-            wait_for_public_internet_ready(&backend).await?;
-            log::info!("Public internet is ready for global BACKEND");
         }
 
-        // Join the group (using the global BACKEND)
-        {
-            let backend = get_backend().await?;
-            backend.join_from_url(group.get_url().as_str()).await?;
-            log::info!("Successfully joined group");
-        }
+        // Create group and repo in backend2 (creator)
+        let mut group = backend2.create_group().await?;
+        let join_url = group.get_url();
+        group.set_name(TEST_GROUP_NAME).await?;
+        let repo = group.create_repo().await?;
+        repo.set_name(TEST_GROUP_NAME).await?;
+
+        // Upload a file to the repository
+        let file_name = "example.txt";
+        let file_content = b"Test content for file upload";
+        repo.upload(file_name, file_content.to_vec()).await?;
 
-        // Wait for replication (after joining, before refresh endpoint is called)
         tokio::time::sleep(Duration::from_secs(2)).await;
 
-        // Initialize app for API testing
         let app = test::init_service(
             App::new()
                 .service(status)
-                .service(health)
                 .service(web::scope("/api").service(groups::scope())),
         )
         .await;
 
-        // Test refresh endpoint (after joining and waiting)
-        log::info!("Testing refresh endpoint for joined group");
+        // Join the group using the main backend
+        {
+            let backend = get_backend().await?;
+            backend.join_from_url(join_url.as_str()).await?;
+        }
+
+        // Wait for replication to complete
+        tokio::time::sleep(Duration::from_secs(2)).await;
+
+        // Test first refresh - should fetch files from network
         let refresh_req = test::TestRequest::post()
             .uri(&format!("/api/groups/{}/refresh", group.id()))
             .to_request();
         let refresh_resp = test::call_service(&app, refresh_req).await;
 
-        // Verify response status and content
-        assert!(refresh_resp.status().is_success(), "Refresh should succeed");
+        assert!(refresh_resp.status().is_success(), "First refresh should succeed");
+
         let refresh_data: serde_json::Value = test::read_body_json(refresh_resp).await;
-        log::info!("Refresh response: {:?}", refresh_data);
-        assert_eq!(refresh_data["status"], "success", "Response should indicate success");
+        assert_eq!(refresh_data["status"], "success", "First refresh status should be success");
+
         let repos = refresh_data["repos"].as_array().expect("repos should be an array");
-        assert_eq!(repos.len(), 1, "Should have one repo");
+        assert_eq!(repos.len(), 1, "Should have one repo after joining");
+
         let repo_data = &repos[0];
-        assert!(repo_data["repo_hash"].is_string(), "repo should have a hash");
-        assert_eq!(repo_data["name"], "Test Repo", "repo name should match");
+        assert_eq!(repo_data["name"], TEST_GROUP_NAME, "Repo should have correct name");
 
-        // Verify files from the FIRST refresh
-        let refreshed_files_first = repo_data["refreshed_files"].as_array()
-            .expect("refreshed_files should be an array for first refresh");
-        assert_eq!(refreshed_files_first.len(), 1, "One file should be refreshed on initial sync");
-        assert_eq!(refreshed_files_first[0].as_str().unwrap(), file_name, "The correct file should be in refreshed_files on initial sync");
+        // First refresh should have refreshed files
+        let refreshed_files = repo_data["refreshed_files"].as_array()
+            .expect("refreshed_files should be an array");
+        assert_eq!(refreshed_files.len(), 1, "Should have refreshed 1 file on first refresh");
+        assert_eq!(refreshed_files[0].as_str().unwrap(), file_name,
+            "Should have refreshed the correct file");
 
-        let all_files_first = repo_data["all_files"].as_array().expect("all_files should be an array for first refresh");
-        assert_eq!(all_files_first.len(), 1, "Should have one file in all_files on first refresh");
-        assert_eq!(all_files_first[0].as_str().unwrap(), file_name, "all_files should contain the uploaded file on first refresh");
+        let all_files = repo_data["all_files"].as_array().expect("all_files should be an array");
+        assert_eq!(all_files.len(), 1, "Should have one file in all_files");
+        assert_eq!(all_files[0].as_str().unwrap(), file_name,
+            "all_files should contain the uploaded file");
 
-        // Verify file is accessible (after first refresh)
-        let get_file_req_first = test::TestRequest::get()
+        // Verify file is accessible after refresh
+        let get_file_req = test::TestRequest::get()
             .uri(&format!(
                 "/api/groups/{}/repos/{}/media/{}",
                 group.id(), repo.id(), file_name
             ))
             .to_request();
-        let get_file_resp_first = test::call_service(&app, get_file_req_first).await;
-        assert!(get_file_resp_first.status().is_success(), "File should be accessible after first refresh");
-        let got_content_first = test::read_body(get_file_resp_first).await;
-        assert_eq!(got_content_first.to_vec(), file_content.to_vec(), "File content should match after first refresh");
-
-        // ---- SECOND REFRESH ----
-        log::info!("Testing second refresh endpoint for joined group (should be no-op)");
-        let refresh_req_second = test::TestRequest::post()
+        let get_file_resp = test::call_service(&app, get_file_req).await;
+        assert!(get_file_resp.status().is_success(), "File should be accessible after refresh");
+        let got_content = test::read_body(get_file_resp).await;
+        assert_eq!(got_content.to_vec(), file_content.to_vec(),
+            "File content should match after refresh");
+
+        // Test second refresh - should be no-op since all files are present
+        let refresh_req2 = test::TestRequest::post()
             .uri(&format!("/api/groups/{}/refresh", group.id()))
             .to_request();
-        let refresh_resp_second = test::call_service(&app, refresh_req_second).await;
-        assert!(refresh_resp_second.status().is_success(), "Second refresh should succeed");
-        let refresh_data_second: serde_json::Value = test::read_body_json(refresh_resp_second).await;
-        assert_eq!(refresh_data_second["status"], "success", "Second refresh response should indicate success");
-        let repos_second = refresh_data_second["repos"].as_array().expect("repos should be an array for second refresh");
-        assert_eq!(repos_second.len(), 1, "Should have one repo in second refresh");
-        let repo_data_second = &repos_second[0];
-
-        let refreshed_files_second = repo_data_second["refreshed_files"].as_array()
-            .expect("refreshed_files should be an array for second refresh");
-        assert!(refreshed_files_second.is_empty(), "No files should be refreshed on second sync as all are present");
-
-        let all_files_second = repo_data_second["all_files"].as_array().expect("all_files should be an array for second refresh");
-        assert_eq!(all_files_second.len(), 1, "Should still have one file in all_files on second refresh");
-        assert_eq!(all_files_second[0].as_str().unwrap(), file_name, "all_files should still contain the uploaded file on second refresh");
-
-        // Clean up (stop backend2, stop global BACKEND, shutdown veilid_api2)
-        log::info!("Cleaning up test resources...");
-        backend2.stop().await?;
+        let refresh_resp2 = test::call_service(&app, refresh_req2).await;
+        assert!(refresh_resp2.status().is_success(), "Second refresh should succeed");
+
+        let refresh_data2: serde_json::Value = test::read_body_json(refresh_resp2).await;
+        assert_eq!(refresh_data2["status"], "success", "Second refresh status should be success");
+
+        let repos2 = refresh_data2["repos"].as_array().expect("repos should be an array");
+        assert_eq!(repos2.len(), 1, "Should still have one repo");
+
+        let repo_data2 = &repos2[0];
+        let refreshed_files2 = repo_data2["refreshed_files"].as_array()
+            .expect("refreshed_files should be an array");
+        assert!(refreshed_files2.is_empty(),
+            "No files should be refreshed on second call since all are present");
+
+        // Clean up both backends using the helper function
+        cleanup_test_resources(&backend2).await?;
         {
             let backend = get_backend().await?;
-            backend.stop().await.expect("Backend failed to stop");
+            cleanup_test_resources(&backend).await?;
         }
-        tokio::time::sleep(Duration::from_secs(2)).await;
-        veilid_api2.shutdown().await;
 
         Ok(())
     }
-
     #[actix_web::test]
     #[serial]
     async fn test_health_endpoint() -> Result<()> {
diff --git a/src/logging.rs b/src/logging.rs
index 4db53e8..d8c3025 100644
--- a/src/logging.rs
+++ b/src/logging.rs
@@ -38,7 +38,7 @@ pub const LOG_LEVEL_ERROR: i32 = 6;
 #[macro_export]
 macro_rules! android_log_print {
     ($level:expr, $tag:expr, $($arg:tt)*) => {
-        crate::logging::android_log($level, $tag, &format!("[{}:{}] {}", file!(), line!(), format_args!($($arg)*)))
+        $crate::logging::android_log($level, $tag, &format!("[{}:{}] {}", file!(), line!(), format_args!($($arg)*)))
     }
 }
diff --git a/src/media.rs b/src/media.rs
index 8355f33..e9bed45 100644
--- a/src/media.rs
+++ b/src/media.rs
@@ -1,7 +1,7 @@
 use crate::constants::TAG;
 use crate::error::{AppError, AppResult};
 use crate::models::{GroupRepoMediaPath, GroupRepoPath};
-use crate::server::server::get_backend;
+use crate::server::get_backend;
 use crate::utils::create_veilid_cryptokey_from_base64;
 use crate::log_info;
 use actix_web::{delete, get, post, web, HttpResponse, Responder, Scope, http::header, error::BlockingError};
@@ -47,12 +47,12 @@ async fn list_files(path: web::Path<GroupRepoPath>) -> AppResult
     let repo_id = &path_params.repo_id;
 
     // Fetch the backend and group
-    let crypto_key = create_veilid_cryptokey_from_base64(&group_id)?;
+    let crypto_key = create_veilid_cryptokey_from_base64(group_id)?;
     let backend = get_backend().await?;
     let group = backend.get_group(&crypto_key).await?;
 
     // Fetch the repo
-    let repo_crypto_key = create_veilid_cryptokey_from_base64(&repo_id)?;
+    let repo_crypto_key = create_veilid_cryptokey_from_base64(repo_id)?;
     let repo = group.get_repo(&repo_crypto_key).await?;
 
     let hash = repo.get_hash_from_dht().await?;
@@ -87,12 +87,12 @@ async fn download_file(path: web::Path<GroupRepoMediaPath>) -> AppResult
[... the hunk bodies for download_file, upload_file, and delete_file, and the start of the src/repos.rs diff, are garbled beyond recovery in the source ...]
diff --git a/src/repos.rs b/src/repos.rs
 pub fn scope() -> Scope {
     web::scope("/repos")
@@ -35,7 +35,7 @@ async fn list_repos(path: web::Path<String>) -> AppResult {
     log_debug!(TAG, "group_id = {}", group_id);
 
     // Fetch the backend and the group
-    let crypto_key = create_veilid_cryptokey_from_base64(&group_id)?;
+    let crypto_key = create_veilid_cryptokey_from_base64(group_id)?;
     let backend = get_backend().await?;
     let group = backend.get_group(&crypto_key).await?;
     log_debug!(TAG, "got group");
@@ -53,12 +53,12 @@ async fn get_repo(path: web::Path<GroupRepoPath>) -> AppResult {
     let group_id = &path_params.group_id;
     let repo_id = &path_params.repo_id;
 
     // Fetch the backend and the group
-    let crypto_key = create_veilid_cryptokey_from_base64(&group_id)?;
+    let crypto_key = create_veilid_cryptokey_from_base64(group_id)?;
     let backend = get_backend().await?;
     let group = backend.get_group(&crypto_key).await?;
 
     // Fetch the repo from the group
-    let repo_crypto_key = create_veilid_cryptokey_from_base64(&repo_id)?;
+    let repo_crypto_key = create_veilid_cryptokey_from_base64(repo_id)?;
     let repo = group.get_repo(&repo_crypto_key).await?;
 
     // Now, convert the owned Repo into SnowbirdRepo
diff --git a/src/server.rs b/src/server.rs
index 6856557..14981d4 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -1,181 +1,179 @@
 #![allow(unused)]
-pub mod server {
-    use crate::constants::{self, TAG, VERSION};
-    use crate::error::{AppError, AppResult};
-    use crate::groups;
-    use crate::logging::android_log;
-    use crate::repos;
-    use crate::{log_debug, log_error, log_info};
-    use actix_web::{delete, get, patch, post, put};
-    use actix_web::{web, App, Error as ActixError, HttpResponse, HttpServer, Responder};
-    use anyhow::{anyhow, Context, Result};
-    use base64_url;
-    use futures::{future, lock};
-    use num_cpus;
-    use once_cell::sync::OnceCell;
-    use save_dweb_backend::backend::Backend;
-    use serde::{Deserialize, Serialize};
-    use serde_json::json;
-    use std::cmp;
-    use std::fs;
-    use std::net::Ipv4Addr;
-    use std::time::{Duration, Instant};
-    use std::path::Path;
-    use std::sync::Arc;
-    use std::{env, panic};
-    use thiserror::Error;
-    use tokio::sync::Mutex as TokioMutex;
-    use veilid_core::{
-        vld0_generate_keypair, CryptoKey, TypedKey, VeilidUpdate, CRYPTO_KIND_VLD0,
-        VALID_CRYPTO_KINDS,
-    };
-    use crate::actix_route_dumper::RouteDumper;
-    use crate::models::SnowbirdGroup;
-
-    #[derive(Error, Debug)]
-    pub enum BackendError {
-        #[error("Backend not initialized")]
-        NotInitialized,
-
-        #[error("Failed to initialize backend: {0}")]
-        InitializationError(#[from] std::io::Error),
-    }
-
-    pub static BACKEND: OnceCell<Arc<TokioMutex<Backend>>> = OnceCell::new();
-
-    pub async fn get_backend<'a>(
-    ) -> Result<impl std::ops::Deref<Target = Backend> + 'a, anyhow::Error> {
-        match BACKEND.get() {
-            Some(backend) => Ok(backend.lock().await),
-            None => Err(anyhow!("Backend not initialized")),
-        }
-    }
+use crate::constants::{self, TAG, VERSION};
+use crate::error::{AppError, AppResult};
+use crate::groups;
+use crate::logging::android_log;
+use crate::repos;
+use crate::{log_debug, log_error, log_info};
+use actix_web::{delete, get, patch, post, put};
+use actix_web::{web, App, Error as ActixError, HttpResponse, HttpServer, Responder};
+use anyhow::{anyhow, Context, Result};
+use base64_url;
+use futures::{future, lock};
+use num_cpus;
+use once_cell::sync::OnceCell;
+use save_dweb_backend::backend::Backend;
+use serde::{Deserialize, Serialize};
+use serde_json::json;
+use std::cmp;
+use std::fs;
+use std::net::Ipv4Addr;
+use std::time::{Duration, Instant};
+use std::path::Path;
+use std::sync::Arc;
+use std::{env, panic};
+use thiserror::Error;
+use tokio::sync::Mutex as TokioMutex;
+use veilid_core::{
+    vld0_generate_keypair, CryptoKey, TypedKey, VeilidUpdate, CRYPTO_KIND_VLD0,
+    VALID_CRYPTO_KINDS,
+};
+use crate::actix_route_dumper::RouteDumper;
+use crate::models::SnowbirdGroup;
+
+#[derive(Error, Debug)]
+pub enum BackendError {
+    #[error("Backend not initialized")]
+    NotInitialized,
+
+    #[error("Failed to initialize backend: {0}")]
+    InitializationError(#[from] std::io::Error),
+}
 
-    pub fn init_backend(backend_path: &Path) -> Arc<TokioMutex<Backend>> {
-        Arc::new(TokioMutex::new(
-            Backend::new(backend_path).expect("Failed to create Backend."),
-        ))
-    }
+pub static BACKEND: OnceCell<Arc<TokioMutex<Backend>>> = OnceCell::new();
 
-    #[get("/status")]
-    async fn status() -> impl Responder {
-        HttpResponse::Ok().json(serde_json::json!({
-            "status": "running",
-            "version": *VERSION
-        }))
+pub async fn get_backend<'a>(
+) -> Result<impl std::ops::Deref<Target = Backend> + 'a, anyhow::Error> {
+    match BACKEND.get() {
+        Some(backend) => Ok(backend.lock().await),
+        None => Err(anyhow!("Backend not initialized")),
     }
+}
 
-    #[get("/health")]
-    async fn health() -> impl Responder {
-        HttpResponse::Ok().json(serde_json::json!({
-            "status": "OK"
-        }))
-    }
+pub fn init_backend(backend_path: &Path) -> Arc<TokioMutex<Backend>> {
+    Arc::new(TokioMutex::new(
+        Backend::new(backend_path).expect("Failed to create Backend."),
+    ))
+}
 
-    #[derive(Deserialize)]
-    struct JoinGroupRequest {
-        uri: String
-    }
+#[get("/status")]
+async fn status() -> impl Responder {
+    HttpResponse::Ok().json(serde_json::json!({
+        "status": "running",
+        "version": *VERSION
+    }))
+}
 
-    #[post("memberships")]
-    async fn join_group(body: web::Json<JoinGroupRequest>) -> AppResult<impl Responder> {
-        let join_request_data = body.into_inner();
-        let backend = get_backend().await?;
-        let boxed_group = backend.join_from_url(&join_request_data.uri).await?;
-        let snowbird_group: SnowbirdGroup = boxed_group.as_ref().into();
+#[get("/health")]
+async fn health() -> impl Responder {
+    HttpResponse::Ok().json(serde_json::json!({
+        "status": "OK"
+    }))
+}
 
-        Ok(HttpResponse::Ok().json(json!({ "group" : snowbird_group })))
-    }
+#[derive(Deserialize)]
+struct JoinGroupRequest {
+    uri: String
+}
 
-    fn actix_log(message: &str) {
-        log_debug!(TAG, "Actix log: {}", message);
-    }
+#[post("memberships")]
+async fn join_group(body: web::Json<JoinGroupRequest>) -> AppResult<impl Responder> {
+    let join_request_data = body.into_inner();
+    let backend = get_backend().await?;
+    let boxed_group = backend.join_from_url(&join_request_data.uri).await?;
+    let snowbird_group: SnowbirdGroup = boxed_group.as_ref().into();
 
-    fn log_perf(message: &str, duration: Duration) {
-        let total_ms = duration.as_millis();
-        let rounded_tenths = (total_ms as f64 / 100.0).round() / 10.0;
-        log_info!(TAG, "{} after {:.1} s", message, rounded_tenths);
-    }
+    Ok(HttpResponse::Ok().json(json!({ "group" : snowbird_group })))
+}
 
-    fn get_optimal_worker_count() -> usize {
-        let cpu_count = num_cpus::get();
-        //let worker_count = cmp::max(1, cmp::min(cpu_count / 2, 4));
-
-        log_debug!(TAG, "Detected {} CPUs", cpu_count);
+fn actix_log(message: &str) {
+    log_debug!(TAG, "Actix log: {}", message);
+}
 
-        // This whole thing was an attempt at optimization, but since
-        // we're only ever handling one request at a time let's keep
-        // things lightweight for now.
-        1
-    }
+fn log_perf(message: &str, duration: Duration) {
+    let total_ms = duration.as_millis();
+    let rounded_tenths = (total_ms as f64 / 100.0).round() / 10.0;
+    log_info!(TAG, "{} after {:.1} s", message, rounded_tenths);
+}
 
-    pub async fn start(backend_base_directory: &str, server_socket_path: &str) -> anyhow::Result<()> {
-        log_debug!(TAG, "start_server: Using socket path: {:?}", server_socket_path);
+fn get_optimal_worker_count() -> usize {
+    let cpu_count = num_cpus::get();
+    //let worker_count = cmp::max(1, cmp::min(cpu_count / 2, 4));
+
+    log_debug!(TAG, "Detected {} CPUs", cpu_count);
 
-        let worker_count = get_optimal_worker_count();
+    // This whole thing was an attempt at optimization, but since
+    // we're only ever handling one request at a time let's keep
+    // things lightweight for now.
+    1
+}
 
-        let start_instant = Instant::now();
-        log_info!(TAG, "Starting server initialization...");
+pub async fn start(backend_base_directory: &str, server_socket_path: &str) -> anyhow::Result<()> {
+    log_debug!(TAG, "start_server: Using socket path: {:?}", server_socket_path);
 
-        let lan_address = Ipv4Addr::UNSPECIFIED; // 0.0.0.0
-        let lan_port = 8080;
+    let worker_count = get_optimal_worker_count();
 
-        panic::set_hook(Box::new(|panic_info| {
-            log_error!(TAG, "Panic occurred: {:?}", panic_info);
-        }));
+    let start_instant = Instant::now();
+    log_info!(TAG, "Starting server initialization...");
 
-        if env::var("HOME").is_err() {
-            env::set_var("HOME", backend_base_directory);
-        }
+    let lan_address = Ipv4Addr::UNSPECIFIED; // 0.0.0.0
+    let lan_port = 8080;
 
-        let backend_path = Path::new(backend_base_directory);
+    panic::set_hook(Box::new(|panic_info| {
+        log_error!(TAG, "Panic occurred: {:?}", panic_info);
+    }));
 
-        BACKEND.get_or_init(|| init_backend(backend_path));
+    if env::var("HOME").is_err() {
+        env::set_var("HOME", backend_base_directory);
+    }
 
-        {
-            let mut backend = get_backend().await?;
+    let backend_path = Path::new(backend_base_directory);
 
-            backend.start().await.context("Backend failed to start");
-        }
+    BACKEND.get_or_init(|| init_backend(backend_path));
 
-        log_perf("Backend started", start_instant.elapsed());
+    {
+        let mut backend = get_backend().await?;
 
-        let web_server = HttpServer::new(move || {
-            let app_start = Instant::now();
-            let app = App::new()
-                .wrap(RouteDumper::new(actix_log))
-                .service(status)
-                .service(health)
-                .service(
-                    web::scope("/api")
-                        .service(join_group)
-                        .service(groups::scope())
-                );
-            log_perf("Web server app created", app_start.elapsed());
-            app
-        })
-        .bind_uds(server_socket_path)?
-        .bind((lan_address, lan_port))?
-        .disable_signals()
-        .workers(worker_count);
+        backend.start().await.context("Backend failed to start")?;
+    }
 
-        log_perf("Web server initialized", start_instant.elapsed());
-        log_info!(TAG, "Starting web server...");
+    log_perf("Backend started", start_instant.elapsed());
+
+    let web_server = HttpServer::new(move || {
+        let app_start = Instant::now();
+        let app = App::new()
+            .wrap(RouteDumper::new(actix_log))
+            .service(status)
+            .service(health)
+            .service(
+                web::scope("/api")
+                    .service(join_group)
+                    .service(groups::scope())
+            );
+        log_perf("Web server app created", app_start.elapsed());
+        app
+    })
+    .bind_uds(server_socket_path)?
+    .bind((lan_address, lan_port))?
+    .disable_signals()
+    .workers(worker_count);
+
+    log_perf("Web server initialized", start_instant.elapsed());
+    log_info!(TAG, "Starting web server...");
 
-        let server_future = web_server.run();
-        log_perf("Web server started", start_instant.elapsed());
-
-        server_future.await.context("Failed to start server")
-    }
+    let server_future = web_server.run();
+    log_perf("Web server started", start_instant.elapsed());
 
-    pub async fn stop() -> anyhow::Result<()> {
-        let mut backend = get_backend().await?;
+    server_future.await.context("Failed to start server")
+}
 
-        match backend.stop().await {
-            Ok(_) => log_debug!(TAG, "Backend shut down successfully."),
-            Err(e) => log_error!(TAG, "Failed to shut down backend: {:?}", e),
-        }
+pub async fn stop() -> anyhow::Result<()> {
+    let mut backend = get_backend().await?;
 
-        Ok(())
+    match backend.stop().await {
+        Ok(_) => log_debug!(TAG, "Backend shut down successfully."),
+        Err(e) => log_error!(TAG, "Failed to shut down backend: {:?}", e),
     }
+
+    Ok(())
 }