diff --git a/.mise.toml b/.mise.toml index c8ebc5d..71e6a90 100644 --- a/.mise.toml +++ b/.mise.toml @@ -94,6 +94,11 @@ run = "wasm-pack build . --target web -- -Z build-std=std,panic_abort" RUSTFLAGS = "-C target-cpu=mvp -C target-feature=+mutable-globals,+sign-ext,+nontrapping-fptoint" RUSTUP_TOOLCHAIN = "nightly" +[tasks.build-ws-data1-module] +description = "Build the data1 workflow WASM module" +dir = "services/ws-modules/data1" +run = "wasm-pack build . --target web" + [tasks.build-ws-har1-module] description = "Build the har1 workflow WASM module" dir = "services/ws-modules/har1" @@ -110,7 +115,13 @@ dir = "services/ws-modules/comm1" run = "wasm-pack build . --target web" [tasks.build-wasm] -depends = ["build-ws-comm1-module", "build-ws-face-detection-module", "build-ws-har1-module", "build-ws-wasm-agent"] +depends = [ + "build-ws-comm1-module", + "build-ws-data1-module", + "build-ws-face-detection-module", + "build-ws-har1-module", + "build-ws-wasm-agent", +] description = "Build all WebAssembly modules" [tasks.test-ws-wasm-agent-firefox] diff --git a/Cargo.toml b/Cargo.toml index c7da4bc..b3b0520 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ rust-version = "1.87.0" members = [ "libs/edge-toolkit", "services/ws-modules/comm1", + "services/ws-modules/data1", "services/ws-modules/face-detection", "services/ws-modules/har1", "services/ws-server", diff --git a/README.md b/README.md index 3428d7c..1422a31 100644 --- a/README.md +++ b/README.md @@ -37,14 +37,11 @@ mise run build-wasm mise run ws-server ``` -The WASM build disables WebAssembly reference types, so it can still load on older browsers such as Chrome 95. +Scan the QR-Code with a smart-phone camera and open the URL. -Find the IP address of your laptop in the local network, -which will normally be something like 192.168.1.x. +Select the module to run in the drop-down, then click "Run module" button. -Then on your phone, open Chrome and type in https://192.168.1.x:8433/ - -Select the module to run in the drop down, then click "Run module" button. +Note: The WASM build disables WebAssembly reference types, so it can still load on older browsers such as Chrome 95. ## Grant diff --git a/libs/edge-toolkit/src/ws.rs b/libs/edge-toolkit/src/ws.rs index e3e1973..bb26973 100644 --- a/libs/edge-toolkit/src/ws.rs +++ b/libs/edge-toolkit/src/ws.rs @@ -85,6 +85,13 @@ pub enum WsMessage { action: String, details: serde_json::Value, }, + StoreFile { + filename: String, + }, + FetchFile { + agent_id: String, + filename: String, + }, Response { message: String, }, diff --git a/services/ws-modules/data1/Cargo.toml b/services/ws-modules/data1/Cargo.toml new file mode 100644 index 0000000..f44a5e1 --- /dev/null +++ b/services/ws-modules/data1/Cargo.toml @@ -0,0 +1,34 @@ +[package] +name = "et-ws-data1" +version = "0.1.0" +edition.workspace = true +license.workspace = true +repository.workspace = true + +[lib] +crate-type = ["cdylib", "rlib"] + +[dependencies] +edge-toolkit = { path = "../../../libs/edge-toolkit" } +et-ws-wasm-agent = { path = "../../ws-wasm-agent" } +js-sys = "0.3" +serde.workspace = true +serde_json.workspace = true +tracing.workspace = true +tracing-wasm = "0.2" +wasm-bindgen = "0.2" +wasm-bindgen-futures = "0.4" +web-sys = { version = "0.3", features = [ + "Document", + "Element", + "HtmlTextAreaElement", + "Request", + "RequestInit", + "RequestMode", + "Response", + "Window", + "console", +] } + +[dev-dependencies] +wasm-bindgen-test = "0.3" diff --git a/services/ws-modules/data1/src/lib.rs b/services/ws-modules/data1/src/lib.rs new file mode 100644 index 0000000..c96086b --- /dev/null +++ b/services/ws-modules/data1/src/lib.rs @@ -0,0 +1,212 @@ +use std::cell::RefCell; +use std::rc::Rc; + +use edge_toolkit::ws::WsMessage; +use et_ws_wasm_agent::{WsClient, WsClientConfig, append_to_textarea}; +use js_sys::{Promise, Reflect}; +use tracing::info; +use wasm_bindgen::prelude::*; +use wasm_bindgen_futures::JsFuture; +use web_sys::{Request, RequestInit, RequestMode, Response}; + +#[wasm_bindgen(start)] +pub fn init() { + tracing_wasm::set_as_global_default(); + info!("data1 workflow module initialized"); +} + +#[wasm_bindgen] +pub async fn run() -> Result<(), JsValue> { + let msg = "data1: entered run()"; + log(msg)?; + set_module_status(msg)?; + + let ws_url = websocket_url()?; + let mut client = WsClient::new(WsClientConfig::new(ws_url)); + + let last_response = Rc::new(RefCell::new(None)); + let on_message = Closure::wrap(Box::new({ + let last_response = last_response.clone(); + move |value: JsValue| { + let Some(data) = value.as_string() else { + return; + }; + let Ok(message) = serde_json::from_str::(&data) else { + return; + }; + if let WsMessage::Response { message } = message { + *last_response.borrow_mut() = Some(message); + } + } + }) as Box); + client.set_on_message(on_message.as_ref().clone()); + + client.connect()?; + wait_for_connected(&client).await?; + let agent_id = wait_for_agent_id(&client).await?; + let msg = format!("data1: connected as {agent_id}"); + log(&msg)?; + set_module_status(&msg)?; + + let filename = "test_data.txt"; + let test_content = format!("Hello from data1 at {}!", js_sys::Date::new_0().to_iso_string()); + + // 1. Request Store URL + log("data1: requesting store URL")?; + client.send( + &serde_json::to_string(&WsMessage::StoreFile { + filename: filename.to_string(), + }) + .unwrap(), + )?; + let store_url = wait_for_response(&last_response, "PUT to ") + .await? + .replace("PUT to ", ""); + + // 2. Perform PUT + let msg = format!("data1: storing data to {store_url}"); + log(&msg)?; + set_module_status(&msg)?; + put_file(&store_url, &test_content).await?; + + // 3. Request Fetch URL + log("data1: requesting fetch URL")?; + client.send( + &serde_json::to_string(&WsMessage::FetchFile { + agent_id: agent_id.clone(), + filename: filename.to_string(), + }) + .unwrap(), + )?; + let fetch_url = wait_for_response(&last_response, "GET from ") + .await? + .replace("GET from ", ""); + + // 4. Perform GET and Verify + let msg = format!("data1: fetching data from {fetch_url}"); + log(&msg)?; + set_module_status(&msg)?; + let retrieved_content = get_file(&fetch_url).await?; + + if retrieved_content == test_content { + let msg = "data1: VERIFICATION SUCCESS - data matches!"; + log(msg)?; + set_module_status(msg)?; + } else { + let msg = format!( + "data1: VERIFICATION FAILURE - data mismatch!\nSent: {}\nGot: {}", + test_content, retrieved_content + ); + log(&msg)?; + set_module_status(&msg)?; + return Err(JsValue::from_str("Data mismatch")); + } + + sleep_ms(2000).await?; + client.disconnect(); + let msg = "data1: workflow complete"; + log(msg)?; + set_module_status(msg)?; + Ok(()) +} + +async fn put_file(url: &str, content: &str) -> Result<(), JsValue> { + let opts = RequestInit::new(); + opts.set_method("PUT"); + opts.set_mode(RequestMode::Cors); + opts.set_body(&JsValue::from_str(content)); + + let request = Request::new_with_str_and_init(url, &opts)?; + let window = web_sys::window().unwrap(); + let resp_value = JsFuture::from(window.fetch_with_request(&request)).await?; + let resp: Response = resp_value.dyn_into().unwrap(); + + if resp.status() == 200 { + Ok(()) + } else { + Err(JsValue::from_str(&format!("PUT failed with status {}", resp.status()))) + } +} + +async fn get_file(url: &str) -> Result { + let window = web_sys::window().unwrap(); + let resp_value = JsFuture::from(window.fetch_with_str(url)).await?; + let resp: Response = resp_value.dyn_into().unwrap(); + + if resp.status() != 200 { + return Err(JsValue::from_str(&format!("GET failed with status {}", resp.status()))); + } + + let text_promise = resp.text()?; + let text = JsFuture::from(text_promise).await?; + Ok(text.as_string().unwrap_or_default()) +} + +async fn wait_for_response(cell: &Rc>>, prefix: &str) -> Result { + for _ in 0..50 { + let val = cell.borrow().clone(); + if let Some(s) = val + && s.starts_with(prefix) + { + *cell.borrow_mut() = None; + return Ok(s); + } + sleep_ms(100).await?; + } + Err(JsValue::from_str("Timeout waiting for server response")) +} + +fn log(message: &str) -> Result<(), JsValue> { + let line = format!("[data1] {message}"); + web_sys::console::log_1(&JsValue::from_str(&line)); + Ok(()) +} + +fn set_module_status(message: &str) -> Result<(), JsValue> { + append_to_textarea("module-output", message) +} + +async fn wait_for_connected(client: &WsClient) -> Result<(), JsValue> { + for _ in 0..100 { + if client.get_state() == "connected" { + return Ok(()); + } + sleep_ms(100).await?; + } + Err(JsValue::from_str("Timed out waiting for websocket connection")) +} + +async fn wait_for_agent_id(client: &WsClient) -> Result { + for _ in 0..100 { + let agent_id = client.get_client_id(); + if !agent_id.is_empty() { + return Ok(agent_id); + } + sleep_ms(100).await?; + } + Err(JsValue::from_str("Timed out waiting for assigned agent_id")) +} + +async fn sleep_ms(duration_ms: i32) -> Result<(), JsValue> { + let window = web_sys::window().ok_or_else(|| JsValue::from_str("No window available"))?; + let promise = Promise::new(&mut |resolve, _reject| { + let callback = Closure::once_into_js(move || { + let _ = resolve.call0(&JsValue::NULL); + }); + let _ = window.set_timeout_with_callback_and_timeout_and_arguments_0(callback.unchecked_ref(), duration_ms); + }); + JsFuture::from(promise).await.map(|_| ()) +} + +fn websocket_url() -> Result { + let window = web_sys::window().ok_or_else(|| JsValue::from_str("No window available"))?; + let location = Reflect::get(window.as_ref(), &JsValue::from_str("location"))?; + let protocol = Reflect::get(&location, &JsValue::from_str("protocol"))? + .as_string() + .unwrap(); + let host = Reflect::get(&location, &JsValue::from_str("host"))? + .as_string() + .unwrap(); + let ws_protocol = if protocol == "https:" { "wss:" } else { "ws:" }; + Ok(format!("{ws_protocol}//{host}/ws")) +} diff --git a/services/ws-server/.gitignore b/services/ws-server/.gitignore new file mode 100644 index 0000000..3142720 --- /dev/null +++ b/services/ws-server/.gitignore @@ -0,0 +1 @@ +registry.yaml diff --git a/services/ws-server/Cargo.toml b/services/ws-server/Cargo.toml index f8f5d03..af753c5 100644 --- a/services/ws-server/Cargo.toml +++ b/services/ws-server/Cargo.toml @@ -19,11 +19,16 @@ rcgen = "0.14" rustls = "0.23" serde.workspace = true serde_json.workspace = true +serde_yaml = "0.9" tokio = { version = "1", features = ["full"] } tracing.workspace = true tracing-subscriber = { version = "0.3", features = ["env-filter"] } #opentelemetry-actix-web = "0.10" chrono.workspace = true +clap = { version = "4.4", features = ["derive"] } +futures-util = "0.3" +local-ip-address = "0.6" +qr2term = "0.3" uuid.workspace = true [dependencies.actix-rt] diff --git a/services/ws-server/src/main.rs b/services/ws-server/src/main.rs index 1400fbf..0342ab7 100644 --- a/services/ws-server/src/main.rs +++ b/services/ws-server/src/main.rs @@ -42,7 +42,9 @@ struct ServerEnvelope { message: WsMessage, } -#[derive(Debug, Clone)] +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Serialize, Deserialize)] struct PendingDirectMessage { message_id: String, from_agent_id: String, @@ -50,11 +52,13 @@ struct PendingDirectMessage { message: serde_json::Value, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize, Deserialize)] struct AgentRecord { state: AgentConnectionState, last_known_ip: Option, + #[serde(skip)] session: Option>, + #[serde(default)] pending_direct_messages: BTreeMap, } @@ -86,6 +90,27 @@ enum AckResult { } impl AgentRegistry { + fn save(&self, path: &Path) -> std::io::Result<()> { + let agents = self.agents.lock().expect("agent registry lock poisoned"); + let yaml = serde_yaml::to_string(&*agents).map_err(std::io::Error::other)?; + std::fs::write(path, yaml)?; + info!("Agent registry saved to {:?}", path); + Ok(()) + } + + fn load(path: &Path) -> std::io::Result { + if !path.exists() { + warn!("Registry file {:?} does not exist, starting with empty registry", path); + return Ok(Self::default()); + } + let yaml = std::fs::read_to_string(path)?; + let agents: BTreeMap = serde_yaml::from_str(&yaml).map_err(std::io::Error::other)?; + info!("Loaded {} agents from registry {:?}", agents.len(), path); + Ok(Self { + agents: Arc::new(Mutex::new(agents)), + }) + } + fn connect_agent( &self, requested_id: Option, @@ -639,6 +664,36 @@ impl StreamHandler> for WebSocketActor { details ); } + WsMessage::StoreFile { filename } => { + let Some(agent_id) = self.assigned_agent_id() else { + Self::send_invalid(ctx, None, "agent must connect before storing files"); + span.end(); + return; + }; + let url = format!("/storage/{}/{}", agent_id, filename); + info!("Agent {} requested storage URL for {}: {}", agent_id, filename, url); + Self::send_json( + ctx, + &WsMessage::Response { + message: format!("PUT to {}", url), + }, + ); + } + WsMessage::FetchFile { agent_id, filename } => { + let url = format!("/storage/{}/{}", agent_id, filename); + info!( + "Agent {} requested fetch URL for {}/{}", + self.current_agent_id(), + agent_id, + filename + ); + Self::send_json( + ctx, + &WsMessage::Response { + message: format!("GET from {}", url), + }, + ); + } WsMessage::ConnectAck { .. } | WsMessage::ListAgentsResponse { .. } | WsMessage::AgentMessage { .. } @@ -802,27 +857,58 @@ fn tls_config() -> std::io::Result { .map_err(|e| std::io::Error::other(format!("failed to configure TLS: {e}"))) } +use clap::Parser; + +#[derive(Parser, Debug)] +#[command(author, version, about, long_about = None)] +struct Args { + /// Path to agent registry YAML file + #[arg(short, long, default_value = "registry.yaml")] + agent_registry: PathBuf, +} + +use actix_web::middleware::Logger; + #[actix_web::main] async fn main() -> std::io::Result<()> { + let args = Args::parse(); let _provider = init_tracing(); tracing_subscriber::registry() - .with(tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| "info,ws_server=debug".into())) + .with( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| "info,ws_server=debug,actix_web=info".into()), + ) .with(tracing_subscriber::fmt::layer()) .init(); let tls_config = tls_config()?; - info!("Starting WebSocket server on http://0.0.0.0:8080"); - info!("Starting WebSocket server on https://localhost:8443"); + let network_ip = local_ip_address::local_ip() + .map(|ip| ip.to_string()) + .unwrap_or_else(|_| "127.0.0.1".to_string()); + let https_url = format!("https://{}:8443", network_ip); + info!("Starting WebSocket server on http://{}:8080", network_ip); + info!("Starting WebSocket server on {}", https_url); + info!("Scan this QR code to open the browser interface:"); + if let Err(e) = qr2term::print_qr(&https_url) { + error!("Failed to generate QR code: {}", e); + } info!("Serving browser assets from {:?}", browser_static_dir()); info!("Serving wasm package from {:?}", wasm_pkg_dir()); info!("Serving wasm modules from {:?}", wasm_modules_dir()); info!("HTTPS uses an in-memory self-signed localhost certificate for development"); - let agent_registry = web::Data::new(AgentRegistry::default()); - HttpServer::new(move || { + let agent_registry = web::Data::new(AgentRegistry::load(&args.agent_registry)?); + let registry_clone = agent_registry.clone(); + let registry_path = args.agent_registry.clone(); + + let storage_dir = workspace_root().join("services/ws-server/storage"); + std::fs::create_dir_all(&storage_dir)?; + + let server = HttpServer::new(move || { App::new() + .wrap(Logger::default()) .app_data(agent_registry.clone()) .route("/", web::get().to(browser_index)) .route("/index.html", web::get().to(browser_index)) @@ -830,12 +916,71 @@ async fn main() -> std::io::Result<()> { .route("/health", web::get().to(health)) .route("/ws", web::get().to(ws_handler)) .route("/files/{filename}", web::get().to(file_handler)) + .route("/storage/{agent_id}/{filename}", web::put().to(agent_put_file)) + .service( + Files::new("/storage", &storage_dir) + .show_files_listing() + .use_etag(true) + .use_last_modified(true), + ) .service(Files::new("/modules", wasm_modules_dir()).prefer_utf8(true)) .service(Files::new("/pkg", wasm_pkg_dir()).prefer_utf8(true)) .service(Files::new("/static", browser_static_dir()).prefer_utf8(true)) }) .bind(("0.0.0.0", 8080))? .bind_rustls_0_23(("0.0.0.0", 8443), tls_config)? - .run() - .await + .run(); + + let handle = server.handle(); + tokio::spawn(async move { + tokio::signal::ctrl_c().await.unwrap(); + info!("Shutdown signal received, saving registry..."); + if let Err(e) = registry_clone.save(®istry_path) { + error!("Failed to save registry on shutdown: {}", e); + } + handle.stop(true).await; + }); + + server.await +} + +async fn agent_put_file( + req: HttpRequest, + mut payload: web::Payload, + registry: web::Data, +) -> Result { + let agent_id: String = req.match_info().query("agent_id").parse().unwrap(); + let filename: PathBuf = req + .match_info() + .query("filename") + .parse() + .map_err(|_| actix_web::error::ErrorBadRequest("invalid filename"))?; + + // Validate agent exists + { + let agents = registry.agents.lock().expect("lock poisoned"); + if !agents.contains_key(&agent_id) { + return Err(actix_web::error::ErrorNotFound("agent not found")); + } + } + + if filename.components().count() != 1 { + return Err(actix_web::error::ErrorBadRequest("invalid filename")); + } + + let storage_dir = workspace_root().join("services/ws-server/storage"); + let agent_dir = storage_dir.join(&agent_id); + std::fs::create_dir_all(&agent_dir)?; + + let path = agent_dir.join(&filename); + info!("Agent {} storing file: {:?}", agent_id, path); + + use futures_util::StreamExt; + let mut file = tokio::fs::File::create(path).await?; + while let Some(chunk) = payload.next().await { + let chunk = chunk?; + tokio::io::copy(&mut &chunk[..], &mut file).await?; + } + + Ok(HttpResponse::Ok().finish()) } diff --git a/services/ws-server/static/app.js b/services/ws-server/static/app.js index b1ff851..12625a8 100644 --- a/services/ws-server/static/app.js +++ b/services/ws-server/static/app.js @@ -87,6 +87,11 @@ const WORKFLOW_MODULES = { moduleUrl: "/modules/comm1/pkg/et_ws_comm1.js", wasmUrl: "/modules/comm1/pkg/et_ws_comm1_bg.wasm", }, + data1: { + label: "data1", + moduleUrl: "/modules/data1/pkg/et_ws_data1.js", + wasmUrl: "/modules/data1/pkg/et_ws_data1_bg.wasm", + }, }; const updateAgentCard = (status, agentId = currentAgentId) => { diff --git a/services/ws-server/static/index.html b/services/ws-server/static/index.html index a6765c7..fbfa4c1 100644 --- a/services/ws-server/static/index.html +++ b/services/ws-server/static/index.html @@ -123,9 +123,10 @@

WASM web agent

diff --git a/services/ws-server/storage/.gitignore b/services/ws-server/storage/.gitignore new file mode 100644 index 0000000..72e8ffc --- /dev/null +++ b/services/ws-server/storage/.gitignore @@ -0,0 +1 @@ +* diff --git a/services/ws-wasm-agent/src/lib.rs b/services/ws-wasm-agent/src/lib.rs index e491704..d27c108 100644 --- a/services/ws-wasm-agent/src/lib.rs +++ b/services/ws-wasm-agent/src/lib.rs @@ -1608,6 +1608,9 @@ impl WsClient { WsMessage::Connect { .. } => { warn!("Unexpected connect message from server"); } + WsMessage::StoreFile { .. } | WsMessage::FetchFile { .. } => { + warn!("Unexpected file storage request from server"); + } } } // Notify callback if set