diff --git a/kaas-example/README.md b/kaas-example/README.md index 7d5b882..437e241 100644 --- a/kaas-example/README.md +++ b/kaas-example/README.md @@ -21,8 +21,19 @@ This example deploys a virtual IoT device management system as managed K8s conta ## Deployment -**Deploy via KaaS:** -```sh +1. **Build Docker images:** Clone the repository on the device and run the following commands +```bash +# Build Rust application +cd dummy-device-app +docker build -t k8s-at-the-edge-example-dummy-device-app:latest . + +# Build React application +cd ../web-portal +docker build -t web-portal:latest . +``` + +2. **Deploy to Kubernetes:** +```bash cd k8s # Render K8s definitions for your node @@ -70,3 +81,15 @@ docker compose up -d 3. Pull up the web-portal on local machine at http://localhost:3000 ![web-portal](assets/device-mgmt-portal.png) + +3. **Deploy via docker compose** + +```sh +docker compose up -d +``` + +If local port forwarding then run on your local machine + +```sh +ssh -L 8085:localhost:8085 -L 3000:localhost:3000 +``` \ No newline at end of file diff --git a/kaas-example/docker-compose.yml b/kaas-example/docker-compose.yml index 7471120..1264e71 100644 --- a/kaas-example/docker-compose.yml +++ b/kaas-example/docker-compose.yml @@ -16,6 +16,11 @@ services: interval: 10s timeout: 5s retries: 5 + logging: + driver: "json-file" + options: + max-size: "50m" + max-file: "2" dummy-device-app: build: @@ -23,13 +28,21 @@ services: dockerfile: Dockerfile environment: DATABASE_URL: postgres://postgres:password@postgres:5432/device_db + RUST_LOG: debug PORT: 3000 + volumes: + - /tmp:/tmp ports: - "3000:3000" depends_on: postgres: condition: service_healthy restart: unless-stopped + logging: + driver: "json-file" + options: + max-size: "50m" + max-file: "2" web-portal: build: @@ -38,10 +51,15 @@ services: environment: REACT_APP_API_URL: http://localhost:3000 ports: - - "8080:80" + - "8085:80" depends_on: - dummy-device-app restart: unless-stopped + logging: + driver: "json-file" + options: + max-size: "50m" + max-file: "2" nginx: image: nginx:alpine @@ -53,6 +71,11 @@ services: - web-portal - dummy-device-app restart: unless-stopped + logging: + driver: "json-file" + options: + max-size: "50m" + max-file: "2" volumes: postgres_data: \ No newline at end of file diff --git a/kaas-example/dummy-device-app/.env b/kaas-example/dummy-device-app/.env index 6c4ae02..970b95d 100644 --- a/kaas-example/dummy-device-app/.env +++ b/kaas-example/dummy-device-app/.env @@ -1,3 +1,3 @@ DATABASE_URL=postgres://postgres:password@localhost:5432/device_db -RUST_LOG=info +RUST_LOG=debug PORT=3000 diff --git a/kaas-example/dummy-device-app/Cargo.toml b/kaas-example/dummy-device-app/Cargo.toml index 73e7067..32b377b 100644 --- a/kaas-example/dummy-device-app/Cargo.toml +++ b/kaas-example/dummy-device-app/Cargo.toml @@ -15,4 +15,7 @@ uuid = { version = "1.0", features = ["v4", "serde"] } tracing = "0.1" tracing-subscriber = "0.3" anyhow = "1.0" -thiserror = "1.0" \ No newline at end of file +thiserror = "1.0" +tokio-tungstenite = "0.20" +futures-util = "0.3" +base64 = "0.21" \ No newline at end of file diff --git a/kaas-example/dummy-device-app/src/main.rs b/kaas-example/dummy-device-app/src/main.rs index cceaa61..ad9904b 100644 --- a/kaas-example/dummy-device-app/src/main.rs +++ b/kaas-example/dummy-device-app/src/main.rs @@ -2,7 +2,7 @@ use axum::{ extract::{Path, State}, http::StatusCode, response::Json, - routing::{get, post, put}, + routing::{get, post, put, delete}, Router, http::Request, middleware::{self, Next}, @@ -15,6 +15,22 @@ use std::sync::Arc; use tower_http::cors::{Any, CorsLayer}; use tracing::{info, warn, debug, error}; use uuid::Uuid; +// WebSocket imports for future Protocol Translator implementation +use tokio_tungstenite::{client_async, tungstenite::Message}; +use futures_util::{SinkExt, StreamExt}; +use tokio::net::UnixStream; +use std::collections::HashMap; +use std::sync::Mutex; +use serde_json::json; +use tokio_tungstenite::tungstenite::handshake::client::generate_key; +use base64::{Engine as _, engine::general_purpose}; +use tokio::sync::mpsc; + +// LwM2M/Edge Core operation bit flags +const OP_READ: u8 = 0x01; +const OP_WRITE: u8 = 0x02; +const OP_EXECUTE: u8 = 0x04; +const OP_DELETE: u8 = 0x08; #[derive(Debug, Serialize, Deserialize, Clone)] pub enum DeviceType { @@ -77,8 +93,424 @@ pub struct ApiResponse { pub error: Option, } +// LwM2M Object and Resource IDs based on IPSO Alliance specifications +#[derive(Debug, Clone, Copy)] +pub enum LwM2MObjectType { + DigitalOutput = 3201, + TemperatureSensor = 3303, + HumiditySensor = 3304, + SetPoint = 3308, + LightControl = 3311, +} + +#[derive(Debug, Clone, Copy)] +pub enum LwM2MResource { + DigitalInputState = 5500, + SensorValue = 5700, + SensorUnits = 5701, + OnOffValue = 5850, + SetPointValue = 5900, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct LwM2MResourceData { + pub resource_id: u16, + pub operations: u8, + #[serde(rename = "type")] + pub resource_type: String, + pub value: String, // Base64 encoded +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct LwM2MObjectInstance { + pub object_instance_id: u16, + pub resources: Vec, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct LwM2MObjectData { + pub object_id: u16, + pub object_instances: Vec, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct DeviceRegistrationParams { + pub device_id: String, + pub objects: Vec, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct JsonRpcRequest { + pub jsonrpc: String, + pub method: String, + pub params: serde_json::Value, + pub id: u64, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct JsonRpcResponse { + pub jsonrpc: String, + pub result: Option, + pub error: Option, + pub id: u64, +} + +#[derive(Clone, Debug)] +struct WriteEvent { + device_id: String, + object_id: u16, + object_instance_id: u16, + resource_id: u16, + value_base64: String, + operation: u8, +} + +pub struct ProtocolTranslator { + name: String, + socket_path: String, + api_path: String, + request_id: u64, + write: Option, Message>>>>, + message_id: Arc>, + pending_requests: Arc>>>, + write_events: Option>, +} + +impl ProtocolTranslator { + pub fn new(name: String) -> Self { + Self { + name, + socket_path: "/tmp/edge.sock".to_string(), + api_path: "/1/pt".to_string(), + request_id: 1, + write: None, + message_id: Arc::new(Mutex::new(1)), + pending_requests: Arc::new(Mutex::new(HashMap::new())), + write_events: None, + } + } + + pub fn set_write_events_sender(&mut self, sender: mpsc::Sender) { + self.write_events = Some(sender); + } + + pub async fn connect_and_register(&mut self) -> Result<(), Box> { + let connect_path = self.socket_path.clone(); + let request_uri = format!("ws://localhost{}", self.api_path); + debug!("Connecting to Protocol Translator over UDS: {} -> {}", connect_path, request_uri); + + // Connect to Unix Domain Socket + let stream = UnixStream::connect(connect_path).await?; + + // Perform WebSocket client handshake over the UnixStream + let request = axum::http::Request::builder() + .method("GET") + .uri(&request_uri) + .header("Host", "localhost") + .header("Connection", "Upgrade") + .header("Upgrade", "websocket") + .header("Sec-WebSocket-Version", "13") + .header("Sec-WebSocket-Key", generate_key()) + .body(())?; + + let (ws_stream, _response) = client_async(request, stream).await?; + let (write, mut read) = ws_stream.split(); + let write_arc = Arc::new(tokio::sync::Mutex::new(write)); + self.write = Some(write_arc.clone()); + + // Spawn reader task to handle incoming messages and route responses + let pending_map = self.pending_requests.clone(); + let write_events_tx = self.write_events.clone(); + tokio::spawn(async move { + let write_for_calls = write_arc.clone(); + loop { + match read.next().await { + Some(Ok(Message::Text(text))) => { + // { + // "id":"2", + // "jsonrpc":"2.0", + // "method":"write", + // "params":{ + // "operation":2, + // "uri":{ + // "deviceId":"Kitchen_Switch", + // "objectId":3311, + // "objectInstanceId":0, + // "resourceId":5850 + // }, + // "value":"AQ==" + // } + // } + debug!("PT received: {}", text); + if let Ok(value) = serde_json::from_str::(&text) { + if let Some(method) = value.get("method").and_then(|m| m.as_str()) { + debug!("Incoming call: {}", method); + if method == "write" { + // respond with { result: "ok" } using the same id type (string/number) + if let Some(id_field) = value.get("id") { + let response = json!({ + "jsonrpc": "2.0", + "id": id_field.clone(), + "result": "ok", + }); + debug!("Responding to write with ok; id={}", id_field); + let mut writer = write_for_calls.lock().await; + let _ = writer.send(Message::Text(response.to_string())).await; + } else { + debug!("Incoming write missing id; skipping response"); + } + + // Forward event to application for local state update + if let (Some(params), Some(tx)) = (value.get("params"), write_events_tx.as_ref()) { + let uri = params.get("uri"); + let value_b64 = params.get("value"); + let op = params.get("operation"); + if let (Some(uri), Some(value_b64), Some(op)) = (uri, value_b64, op) { + let event = WriteEvent { + device_id: uri.get("deviceId").and_then(|v| v.as_str()).unwrap_or("").to_string(), + object_id: uri.get("objectId").and_then(|v| v.as_u64()).unwrap_or(0) as u16, + object_instance_id: uri.get("objectInstanceId").and_then(|v| v.as_u64()).unwrap_or(0) as u16, + resource_id: uri.get("resourceId").and_then(|v| v.as_u64()).unwrap_or(0) as u16, + value_base64: value_b64.as_str().unwrap_or("").to_string(), + operation: op.as_u64().unwrap_or(0) as u8, + }; + let _ = tx.send(event).await; + } + } + } + } else if let Some(id) = value.get("id").and_then(|v| v.as_u64()) { + let tx_opt = { + let mut pending = pending_map.lock().unwrap(); + pending.remove(&id) + }; + if let Some(tx) = tx_opt { + let _ = tx.send(value); + } else { + debug!("No pending requester for id {}", id); + } + } + } + } + Some(Ok(other)) => { + debug!("PT received non-text message: {:?}", other); + } + Some(Err(e)) => { + error!("PT socket read error: {}", e); + // Break and let caller recreate connection + break; + } + None => { + debug!("PT socket closed by peer"); + break; + } + } + } + // Optionally we could signal a reconnect here using a channel + }); + + // Register the protocol translator identity + let params = json!({ "name": self.name }); + let _ = self.send_request("protocol_translator_register", params).await?; + + info!("Protocol Translator '{}' connected and registered with Edge Core", self.name); + Ok(()) + } + + pub async fn register_device(&mut self, device: &Device) -> Result<(), Box> { + let device_params = self.create_device_params(device); + + debug!("Registering device with Edge Core (JSON-RPC)"); + let params_value = serde_json::to_value(&device_params)?; + let _resp = self.send_request("device_register", params_value).await?; + info!("Device '{}' ({:?}) registered with Edge Core", device.name, device.device_type); + Ok(()) + } + + pub async fn write_device_state(&mut self, device: &Device) -> Result<(), Box> { + // Reuse the same payload structure (deviceId + objects) + let params = self.create_device_params(device); + let params_value = serde_json::to_value(¶ms)?; + // Log the exact JSON we will send + if let Ok(pretty) = serde_json::to_string_pretty(¶ms_value) { + debug!("Sending device state update to Edge Core (write): {}", pretty); + } else { + debug!("Sending device state update to Edge Core (write)"); + } + let _ = self.send_request("write", params_value).await?; + Ok(()) + } + + pub async fn unregister_device(&mut self, device: &Device) -> Result<(), Box> { + let device_id: String = device.name.chars().map(|c| if c.is_alphanumeric() { c } else { '_' }).collect(); + let params = json!({ + "deviceId": device_id, + }); + debug!("Unregistering device from Edge Core (device_unregister): {}", device.name); + let _ = self.send_request("device_unregister", params).await?; + Ok(()) + } + + fn create_device_params(&self, device: &Device) -> DeviceRegistrationParams { + let (object_id, resources) = self.map_device_type_to_lwm2m(&device.device_type, &device.state); + + let resource_data = resources.into_iter().map(|(resource_id, operations, resource_type, value)| { + LwM2MResourceData { + resource_id, + operations, + resource_type, + value: self.encode_value(&value), + } + }).collect(); + + let object_instance = LwM2MObjectInstance { + object_instance_id: 0, + resources: resource_data, + }; + + let lwm2m_object = LwM2MObjectData { + object_id, + object_instances: vec![object_instance], + }; + + let params = DeviceRegistrationParams { + device_id: device.name.chars().map(|c| if c.is_alphanumeric() { c } else { '_' }).collect::(), + objects: vec![lwm2m_object], + }; + + // Extra debug: print the derived on/off value if present + if let Some(obj) = params.objects.first() { + if obj.object_id == LwM2MObjectType::LightControl as u16 { + if let Some(res) = obj.object_instances.first().and_then(|oi| oi.resources.iter().find(|r| r.resource_id == LwM2MResource::OnOffValue as u16)) { + debug!("Derived /{}/0/{} value (base64): {}", obj.object_id, res.resource_id, res.value); + } + } + } + + params + } + + fn map_device_type_to_lwm2m(&self, device_type: &DeviceType, state: &serde_json::Value) -> (u16, Vec<(u16, u8, String, serde_json::Value)>) { + match device_type { + DeviceType::LightBulb => { + let on_off_value = state.get("on").unwrap_or(&serde_json::Value::Bool(false)); + (LwM2MObjectType::LightControl as u16, vec![ + // Register On/Off with READ | WRITE. Some stacks expect "boolean" not "bool". + (LwM2MResource::OnOffValue as u16, OP_READ | OP_WRITE, "boolean".to_string(), on_off_value.clone()), + ]) + }, + DeviceType::Switch => { + let on_off_value = state.get("on").unwrap_or(&serde_json::Value::Bool(false)); + (LwM2MObjectType::LightControl as u16, vec![ + (LwM2MResource::OnOffValue as u16, OP_READ | OP_WRITE, "boolean".to_string(), on_off_value.clone()), + ]) + }, + DeviceType::TemperatureSensor => { + let default_temp = serde_json::Value::Number(serde_json::Number::from_f64(22.0).unwrap()); + let temp_value = state.get("temperature").unwrap_or(&default_temp); + (LwM2MObjectType::TemperatureSensor as u16, vec![ + (LwM2MResource::SensorValue as u16, OP_READ, "float".to_string(), temp_value.clone()), + (LwM2MResource::SensorUnits as u16, OP_READ, "string".to_string(), serde_json::Value::String("Celsius".to_string())), + ]) + }, + DeviceType::HumiditySensor => { + let default_humidity = serde_json::Value::Number(serde_json::Number::from_f64(50.0).unwrap()); + let humidity_value = state.get("humidity").unwrap_or(&default_humidity); + (LwM2MObjectType::HumiditySensor as u16, vec![ + (LwM2MResource::SensorValue as u16, OP_READ, "float".to_string(), humidity_value.clone()), + (LwM2MResource::SensorUnits as u16, OP_READ, "string".to_string(), serde_json::Value::String("%".to_string())), + ]) + }, + } + } + + fn encode_value(&self, value: &serde_json::Value) -> String { + match value { + serde_json::Value::Bool(b) => { + // Some stacks expect boolean as CBOR-like single byte, others as 8-byte int. We keep 1-byte for write/update. + let bytes = if *b { vec![1u8] } else { vec![0u8] }; + general_purpose::STANDARD.encode(&bytes) + }, + serde_json::Value::Number(n) => { + if let Some(f) = n.as_f64() { + let mut bytes = vec![0u8; 8]; + bytes.copy_from_slice(&f.to_be_bytes()); + general_purpose::STANDARD.encode(&bytes) + } else if let Some(i) = n.as_i64() { + let mut bytes = vec![0u8; 8]; + bytes.copy_from_slice(&i.to_be_bytes()); + general_purpose::STANDARD.encode(&bytes) + } else { + general_purpose::STANDARD.encode(value.to_string().as_bytes()) + } + }, + serde_json::Value::String(s) => { + general_purpose::STANDARD.encode(s.as_bytes()) + }, + _ => { + general_purpose::STANDARD.encode(value.to_string().as_bytes()) + } + } + } + + async fn send_request(&mut self, method: &str, params: serde_json::Value) -> Result> { + let id = { + let mut guard = self.message_id.lock().unwrap(); + *guard += 1; + *guard + }; + + let request = json!({ + "jsonrpc": "2.0", + "id": id, + "method": method, + "params": params, + }); + + // Log the outgoing JSON-RPC request + if let Ok(pretty) = serde_json::to_string_pretty(&request) { + debug!("PT sending: {}", pretty); + } + + let (tx, rx) = tokio::sync::oneshot::channel(); + { + let mut pending = self.pending_requests.lock().unwrap(); + pending.insert(id, tx); + } + + if let Some(write_arc) = &self.write { + let mut writer = write_arc.lock().await; + writer.send(Message::Text(request.to_string())).await?; + } else { + return Err("WebSocket not connected".into()); + } + + let response = rx.await?; + Ok(response) + } +} + +// Simple reconnect loop that can be used by callers +async fn ensure_pt_connected(pt: &mut ProtocolTranslator) { + let mut backoff_ms: u64 = 500; + loop { + match pt.connect_and_register().await { + Ok(_) => return, + Err(e) => { + error!("Failed to connect/register PT: {}. Retrying in {}ms", e, backoff_ms); + tokio::time::sleep(std::time::Duration::from_millis(backoff_ms)).await; + backoff_ms = (backoff_ms * 2).min(10_000); + } + } + } +} + struct AppState { db: PgPool, + protocol_translator: Arc>, } #[tokio::main] @@ -105,11 +537,71 @@ async fn main() -> Result<(), Box> { sqlx::migrate!("./migrations").run(&pool).await?; info!("Database migrations completed successfully"); + // Initialize Protocol Translator + debug!("Initializing Protocol Translator"); + let mut protocol_translator = ProtocolTranslator::new("dummy-device-app".to_string()); + // Channel for write events from PT -> app + let (write_tx, mut write_rx) = mpsc::channel::(32); + protocol_translator.set_write_events_sender(write_tx); + // Use the new ensure_pt_connected function to handle reconnection + ensure_pt_connected(&mut protocol_translator).await; + info!("Protocol Translator initialized and connected to Edge Core"); + // Initialize with some default devices if none exist debug!("Checking for existing devices"); - initialize_default_devices(&pool).await?; + initialize_default_devices(&pool, &mut protocol_translator).await?; + + let state = Arc::new(AppState { + db: pool, + protocol_translator: Arc::new(tokio::sync::Mutex::new(protocol_translator)), + }); - let state = Arc::new(AppState { db: pool }); + // Spawn task to consume write events and update local DB state + { + let state = state.clone(); + tokio::spawn(async move { + while let Some(event) = write_rx.recv().await { + // event -> { + // device_id: "Kitchen_Switch", + // object_id: 3311, + // object_instance_id: 0, + // resource_id: 5850, + // value_base64: "AQ==", + // operation: 2 + // } + debug!("Applying PT write to local state: {:?}", event); + if event.object_id == LwM2MObjectType::LightControl as u16 && event.resource_id == LwM2MResource::OnOffValue as u16 { + // Decode base64 1-byte boolean + if let Ok(bytes) = general_purpose::STANDARD.decode(&event.value_base64) { + let new_on = bytes.get(0).copied().unwrap_or(0) != 0; + // Update by device name (deviceId). Fallback to unsanitized (underscores -> spaces) if no match + let name_exact = event.device_id.clone(); + let name_spaces = name_exact.replace('_', " "); + match sqlx::query( + r#"UPDATE devices SET state = $1, updated_at = $2 WHERE name = $3 OR name = $4"# + ) + .bind(serde_json::json!({"on": new_on})) + .bind(Utc::now()) + .bind(&name_exact) + .bind(&name_spaces) + .execute(&state.db) + .await { + Ok(res) => { + if res.rows_affected() == 0 { + warn!("PT write applied to 0 rows for deviceId='{}' (also tried '{}')", name_exact, name_spaces); + } else { + info!("Updated local device '{}' on={} from PT write (rows={})", name_exact, new_on, res.rows_affected()); + } + } + Err(e) => { + error!("Failed to apply PT write to DB for {}: {}", name_exact, e); + } + } + } + } + } + }); + } // CORS configuration debug!("Configuring CORS"); @@ -126,6 +618,7 @@ async fn main() -> Result<(), Box> { .route("/devices", post(create_device)) .route("/devices/:id", get(get_device)) .route("/devices/:id/state", put(update_device_state)) + .route("/devices/:id", delete(delete_device)) .layer(cors) .layer(middleware::from_fn(log_requests)) .with_state(state); @@ -254,7 +747,7 @@ async fn get_device( let id: Uuid = row.try_get("id").map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; let name: String = row.try_get("name").map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; let device_type_str: String = row.try_get("device_type").map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - let state: serde_json::Value = row.try_get("state").map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + let device_state: serde_json::Value = row.try_get("state").map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; let created_at: DateTime = row.try_get("created_at").map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; let updated_at: DateTime = row.try_get("updated_at").map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; @@ -265,7 +758,7 @@ async fn get_device( id, name, device_type, - state, + state: device_state, created_at, updated_at, }; @@ -338,7 +831,7 @@ async fn create_device( error!("Failed to get device_type from row: {}", e); StatusCode::INTERNAL_SERVER_ERROR })?; - let state: serde_json::Value = row.try_get("state").map_err(|e| { + let device_state: serde_json::Value = row.try_get("state").map_err(|e| { error!("Failed to get state from row: {}", e); StatusCode::INTERNAL_SERVER_ERROR })?; @@ -363,7 +856,7 @@ async fn create_device( id, name, device_type, - state, + state: device_state, created_at, updated_at, }; @@ -373,6 +866,16 @@ async fn create_device( info!("Successfully created device: {} ({:?}) with id: {}", device.name, device.device_type, device.id); debug!("Device details: created_at={}, updated_at={}, state={:?}", device.created_at, device.updated_at, device.state); + // Register device with Protocol Translator (Edge Core) + debug!("Registering device with Protocol Translator"); + { + let mut pt = state.protocol_translator.lock().await; + if let Err(e) = pt.register_device(&device).await { + error!("Failed to register device with Protocol Translator: {}", e); + // Continue anyway - device is created in database + } + } + let response = ApiResponse { success: true, data: Some(device), @@ -394,6 +897,67 @@ async fn create_device( } } +async fn delete_device( + State(state): State>, + Path(id): Path, +) -> Result>, StatusCode> { + debug!("DELETE /devices/{} - Deleting device", id); + + // Fetch device for PT unregister (needs name and type) + let row = sqlx::query( + r#" + SELECT + id, name, device_type::text as device_type, state, created_at, updated_at + FROM devices + WHERE id = $1 + "# + ) + .bind(id) + .fetch_optional(&state.db) + .await + .map_err(|e| { + error!("Failed to fetch device {} before delete: {}", id, e); + StatusCode::INTERNAL_SERVER_ERROR + })? + .ok_or(StatusCode::NOT_FOUND)?; + + let name: String = row.try_get("name").map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + let device_type_str: String = row.try_get("device_type").map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + let device_state: serde_json::Value = row.try_get("state").map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + let created_at: DateTime = row.try_get("created_at").map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + let updated_at: DateTime = row.try_get("updated_at").map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + + let device_type = DeviceType::from_string(&device_type_str).ok_or(StatusCode::INTERNAL_SERVER_ERROR)?; + let device = Device { id, name: name.clone(), device_type, state: device_state, created_at, updated_at }; + + // Unregister from PT first (best effort) + { + let mut pt = state.protocol_translator.lock().await; + if let Err(e) = pt.unregister_device(&device).await { + warn!("Failed to unregister device '{}' from PT: {}", device.name, e); + } else { + debug!("Unregistered device '{}' from PT", device.name); + } + } + + // Delete from database + let res = sqlx::query("DELETE FROM devices WHERE id = $1") + .bind(id) + .execute(&state.db) + .await + .map_err(|e| { + error!("Failed to delete device {}: {}", id, e); + StatusCode::INTERNAL_SERVER_ERROR + })?; + + if res.rows_affected() == 0 { + return Err(StatusCode::NOT_FOUND); + } + + info!("Deleted device '{}' ({})", name, id); + Ok(Json(ApiResponse { success: true, data: Some("deleted".to_string()), error: None })) +} + async fn update_device_state( State(state): State>, Path(id): Path, @@ -432,7 +996,7 @@ async fn update_device_state( let id: Uuid = row.try_get("id").map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; let name: String = row.try_get("name").map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; let device_type_str: String = row.try_get("device_type").map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - let state: serde_json::Value = row.try_get("state").map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + let device_state: serde_json::Value = row.try_get("state").map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; let created_at: DateTime = row.try_get("created_at").map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; let updated_at: DateTime = row.try_get("updated_at").map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; @@ -443,7 +1007,7 @@ async fn update_device_state( id, name, device_type, - state, + state: device_state, created_at, updated_at, }; @@ -451,6 +1015,16 @@ async fn update_device_state( info!("Successfully updated device state: {} ({:?}) -> {:?}", device.name, device.device_type, device.state); debug!("Device update details: updated_at={}, previous_state_unknown", device.updated_at); + // Send the state update to Edge Core via Protocol Translator + { + let mut pt = state.protocol_translator.lock().await; + if let Err(e) = pt.write_device_state(&device).await { + error!("Failed to send device state update to Edge Core: {}", e); + } else { + debug!("Device state update forwarded to Edge Core via PT"); + } + } + Ok(Json(ApiResponse { success: true, data: Some(device), @@ -458,7 +1032,7 @@ async fn update_device_state( })) } -async fn initialize_default_devices(pool: &PgPool) -> Result<(), sqlx::Error> { +async fn initialize_default_devices(pool: &PgPool, protocol_translator: &mut ProtocolTranslator) -> Result<(), Box> { debug!("Checking device count in database"); let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM devices") @@ -471,10 +1045,10 @@ async fn initialize_default_devices(pool: &PgPool) -> Result<(), sqlx::Error> { info!("No devices found, initializing default devices"); let devices = vec![ - ("Living Room Light", DeviceType::LightBulb, serde_json::json!({ "on": false })), - ("Kitchen Switch", DeviceType::Switch, serde_json::json!({ "on": true })), - ("Bedroom Temperature", DeviceType::TemperatureSensor, serde_json::json!({ "temperature": 22.5 })), - ("Office Humidity", DeviceType::HumiditySensor, serde_json::json!({ "humidity": 45.2 })), + ("Canopy Light", DeviceType::LightBulb, serde_json::json!({ "on": false })), + ("Wall Pack Light", DeviceType::LightBulb, serde_json::json!({ "on": true })), + ("Walk-in Cooler Temperature", DeviceType::TemperatureSensor, serde_json::json!({ "temperature": 22.5 })), + ("Ambient Humidity Sensor", DeviceType::HumiditySensor, serde_json::json!({ "humidity": 45.2 })), ]; let device_count = devices.len(); @@ -492,18 +1066,80 @@ async fn initialize_default_devices(pool: &PgPool) -> Result<(), sqlx::Error> { .bind(id) .bind(name) .bind(device_type.to_string()) - .bind(state) + .bind(state.clone()) .bind(now) .bind(now) .execute(pool) .await?; debug!("Successfully created default device: {} ({:?})", name, device_type); + + // Create a temporary device struct for Protocol Translator registration + let temp_device = Device { + id, + name: name.to_string(), + device_type: device_type.clone(), + state: state.clone(), + created_at: now, + updated_at: now, + }; + + // Register with Protocol Translator + if let Err(e) = protocol_translator.register_device(&temp_device).await { + error!("Failed to register default device '{}' with Protocol Translator: {}", name, e); + } else { + debug!("Successfully registered default device '{}' with Protocol Translator", name); + } } info!("Successfully initialized {} default devices", device_count); } else { - debug!("Skipping default device initialization - {} devices already exist", count); + debug!("{} devices already exist - re-registering with Protocol Translator and publishing last known state", count); + + let rows = sqlx::query( + r#" + SELECT + id, name, device_type::text as device_type, state, created_at, updated_at + FROM devices + ORDER BY created_at ASC + "# + ) + .fetch_all(pool) + .await?; + + for row in rows { + let id: Uuid = row.try_get("id")?; + let name: String = row.try_get("name")?; + let device_type_str: String = row.try_get("device_type")?; + let device_state: serde_json::Value = row.try_get("state")?; + let created_at: DateTime = row.try_get("created_at")?; + let updated_at: DateTime = row.try_get("updated_at")?; + + if let Some(device_type) = DeviceType::from_string(&device_type_str) { + let device = Device { + id, + name: name.clone(), + device_type, + state: device_state, + created_at, + updated_at, + }; + + // Register and then publish last known state + if let Err(e) = protocol_translator.register_device(&device).await { + error!("Re-register device '{}' with PT failed: {}", device.name, e); + } else { + debug!("Re-registered '{}' with PT", device.name); + if let Err(e) = protocol_translator.write_device_state(&device).await { + error!("Publish last state for '{}' failed: {}", device.name, e); + } else { + debug!("Published last known state for '{}' via PT", device.name); + } + } + } else { + warn!("Skipping device '{}' with unknown device_type '{}'", name, device_type_str); + } + } } Ok(()) diff --git a/kaas-example/k8s/render.sh b/kaas-example/k8s/render.sh index 33e67cd..086e766 100755 --- a/kaas-example/k8s/render.sh +++ b/kaas-example/k8s/render.sh @@ -1,7 +1,71 @@ #!/bin/bash +# Function to display help +show_help() { + cat << EOF +Usage: $0 [OPTIONS] [node_name2] ... + +Render Kubernetes templates for specified nodes. + +ARGUMENTS: + node_name Node name(s) in UUID format (32 hex characters without hyphens) + Example: 550e8400e29b41d4a716446655440000 + +OPTIONS: + -h, --help Show this help message and exit + +EXAMPLES: + $0 550e8400e29b41d4a716446655440000 + $0 550e8400e29b41d4a716446655440000 6ba7b8109dad11d180b400c04fd430c8 + +DESCRIPTION: + This script renders Kubernetes template files for the specified node(s). + Each node name must be a valid UUID format (32 hex characters without hyphens). + Templates are processed from the templates/ directory and rendered to + the rendered// directory. + +EOF +} + +# Function to validate UUID format +validate_uuid() { + local uuid="$1" + # UUID regex pattern: 32 hexadecimal digits without hyphens + local uuid_pattern='^[0-9a-fA-F]{32}$' + + if [[ $uuid =~ $uuid_pattern ]]; then + return 0 + else + return 1 + fi +} + +# Check for help option +if [[ "$1" == "-h" || "$1" == "--help" ]]; then + show_help + exit 0 +fi + +# Check if at least one argument is provided +if [[ $# -eq 0 ]]; then + echo "Error: No node names provided." + echo "Use '$0 --help' for usage information." + exit 1 +fi + +# Validate all provided node names are UUIDs +for node in "$@"; do + if ! validate_uuid "$node"; then + echo "Error: '$node' is not a valid UUID format." + echo "Node names must be in UUID format (32 hex characters without hyphens)" + echo "Example: 550e8400e29b41d4a716446655440000" + echo "Use '$0 --help' for usage information." + exit 1 + fi +done + # Array of node names -NODE_NAMES=($1) +NODE_NAMES=("$@") # Create rendered directory if it doesn't exist mkdir -p rendered diff --git a/kaas-example/k8s/templates/dummy-device-app-pod.yaml b/kaas-example/k8s/templates/dummy-device-app-pod.yaml index b5025fc..1b969a9 100644 --- a/kaas-example/k8s/templates/dummy-device-app-pod.yaml +++ b/kaas-example/k8s/templates/dummy-device-app-pod.yaml @@ -8,10 +8,11 @@ spec: nodeName: ${NODE_NAME} containers: - name: dummy-device-app - image: ghcr.io/izumanetworks/kaas-example/dummy-device-app:1.0.1 + image: ghcr.io/izumanetworks/kaas-example/dummy-device-app:1.0.2 imagePullPolicy: IfNotPresent ports: - containerPort: 3000 + hostPort: 3000 env: - name: DATABASE_URL value: "postgres://postgres:password@postgres:5432/device_db" @@ -35,4 +36,11 @@ spec: path: /health port: 3000 initialDelaySeconds: 5 - periodSeconds: 5 \ No newline at end of file + periodSeconds: 5 + volumeMounts: + - name: tmp-volume + mountPath: /tmp + volumes: + - name: tmp-volume + hostPath: + path: /tmp \ No newline at end of file diff --git a/kaas-example/k8s/templates/web-portal-pod.yml b/kaas-example/k8s/templates/web-portal-pod.yml index 8221cb8..b074829 100644 --- a/kaas-example/k8s/templates/web-portal-pod.yml +++ b/kaas-example/k8s/templates/web-portal-pod.yml @@ -8,10 +8,11 @@ spec: nodeName: ${NODE_NAME} containers: - name: web-portal - image: ghcr.io/izumanetworks/kaas-example/web-portal:1.0.5 + image: ghcr.io/izumanetworks/kaas-example/web-portal:1.0.6 imagePullPolicy: IfNotPresent ports: - containerPort: 80 + hostPort: 8084 env: - name: REACT_APP_API_URL value: "http://dummy-device-app:3000" diff --git a/kaas-example/web-portal/src/App.js b/kaas-example/web-portal/src/App.js index 3aa9f33..a78d20b 100644 --- a/kaas-example/web-portal/src/App.js +++ b/kaas-example/web-portal/src/App.js @@ -7,7 +7,8 @@ import { Droplets, Plus, RefreshCw, - Settings + Settings, + Trash2 } from 'lucide-react'; const API_BASE_URL = process.env.REACT_APP_API_URL || 'http://localhost:3000'; @@ -68,6 +69,34 @@ function App() { } }; + const deleteDevice = async (deviceId) => { + try { + const response = await axios.delete(`${API_BASE_URL}/devices/${deviceId}`); + if (response.data.success) { + setDevices((prev) => prev.filter((device) => device.id !== deviceId)); + } + } catch (err) { + setError('Failed to delete device'); + console.error('Error deleting device:', err); + } + }; + + const confirmAndDelete = (deviceId) => { + if (window.confirm('Are you sure you want to delete this device?')) { + deleteDevice(deviceId); + } + }; + + const getRandomFloat = () => parseFloat((Math.random() * 100).toFixed(1)); + + const randomizeDeviceValue = (device) => { + if (device.device_type === 'TemperatureSensor') { + updateDeviceState(device.id, { temperature: getRandomFloat() }); + } else if (device.device_type === 'HumiditySensor') { + updateDeviceState(device.id, { humidity: getRandomFloat() }); + } + }; + const getDeviceIcon = (deviceType) => { switch (deviceType) { case 'LightBulb': @@ -103,15 +132,37 @@ function App() { case 'TemperatureSensor': return ( -
- {device.state.temperature}°C +
+
+ {device.state.temperature}°C +
+
+ +
); case 'HumiditySensor': return ( -
- {device.state.humidity}% +
+
+ {device.state.humidity}% +
+
+ +
); @@ -137,10 +188,10 @@ function App() { {/* Header */}

- Device Management Portal + Local Device Management Portal

- Manage your virtual IoT devices + Manage your virtual Gas Station devices

@@ -240,6 +291,16 @@ function App() {

+
+ +
@@ -247,7 +308,7 @@ function App() {
- Last updated: {new Date(device.updated_at).toLocaleString()} + Last updated: {device.updated_at ? new Date(device.updated_at * 1000).toLocaleString() : 'Unknown'}
))}