diff --git a/Cargo.lock b/Cargo.lock index 51c63bbb69..984ae53bf3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4744,6 +4744,7 @@ dependencies = [ "serde", "serde_bare", "utoipa", + "uuid", "vbare", "vbare-compiler", ] diff --git a/engine/packages/pegboard-gateway/src/lib.rs b/engine/packages/pegboard-gateway/src/lib.rs index 3a1a4476da..0aa62fb010 100644 --- a/engine/packages/pegboard-gateway/src/lib.rs +++ b/engine/packages/pegboard-gateway/src/lib.rs @@ -33,7 +33,7 @@ mod tunnel_to_ws_task; mod ws_to_tunnel_task; const WEBSOCKET_OPEN_TIMEOUT: Duration = Duration::from_secs(15); -const TUNNEL_ACK_TIMEOUT: Duration = Duration::from_secs(5); +const RESPONSE_START_TIMEOUT: Duration = Duration::from_secs(15); const UPDATE_PING_INTERVAL: Duration = Duration::from_secs(3); #[derive(RivetError, Serialize, Deserialize)] @@ -231,10 +231,10 @@ impl CustomServeTrait for PegboardGateway { Err(ServiceUnavailable.build()) }; - let response_start = tokio::time::timeout(WEBSOCKET_OPEN_TIMEOUT, fut) + let response_start = tokio::time::timeout(RESPONSE_START_TIMEOUT, fut) .await .map_err(|_| { - tracing::warn!("timed out waiting for tunnel ack"); + tracing::warn!("timed out waiting for response start from runner"); ServiceUnavailable.build() })??; @@ -373,10 +373,10 @@ impl CustomServeTrait for PegboardGateway { Err(WebSocketServiceUnavailable.build()) }; - let open_msg = tokio::time::timeout(TUNNEL_ACK_TIMEOUT, fut) + let open_msg = tokio::time::timeout(WEBSOCKET_OPEN_TIMEOUT, fut) .await .map_err(|_| { - tracing::warn!("timed out waiting for tunnel ack"); + tracing::warn!("timed out waiting for websocket open from runner"); WebSocketServiceUnavailable.build() })??; diff --git a/engine/packages/pegboard-gateway/src/shared_state.rs b/engine/packages/pegboard-gateway/src/shared_state.rs index cd5158b117..344889abcf 100644 --- a/engine/packages/pegboard-gateway/src/shared_state.rs +++ b/engine/packages/pegboard-gateway/src/shared_state.rs @@ -72,6 +72,7 @@ pub struct SharedState(Arc); impl SharedState { pub fn new(config: &rivet_config::Config, ups: PubSub) -> Self { let gateway_id = protocol::util::generate_gateway_id(); + tracing::info!(gateway_id = %protocol::util::id_to_string(&gateway_id), "setting up shared state for gateway"); let receiver_subject = pegboard::pubsub_subjects::GatewayReceiverSubject::new(gateway_id).to_string(); diff --git a/engine/sdks/rust/runner-protocol/Cargo.toml b/engine/sdks/rust/runner-protocol/Cargo.toml index c137ca9833..5a8481ac4d 100644 --- a/engine/sdks/rust/runner-protocol/Cargo.toml +++ b/engine/sdks/rust/runner-protocol/Cargo.toml @@ -18,3 +18,6 @@ vbare.workspace = true [build-dependencies] vbare-compiler.workspace = true + +[dev-dependencies] +uuid.workspace = true diff --git a/engine/sdks/rust/runner-protocol/src/lib.rs b/engine/sdks/rust/runner-protocol/src/lib.rs index 370d763d36..2924477d49 100644 --- a/engine/sdks/rust/runner-protocol/src/lib.rs +++ b/engine/sdks/rust/runner-protocol/src/lib.rs @@ -1,6 +1,7 @@ pub mod compat; pub mod generated; pub mod util; +pub mod uuid_compat; pub mod versioned; // Re-export latest diff --git a/engine/sdks/rust/runner-protocol/src/uuid_compat.rs b/engine/sdks/rust/runner-protocol/src/uuid_compat.rs new file mode 100644 index 0000000000..83f74d7bc8 --- /dev/null +++ b/engine/sdks/rust/runner-protocol/src/uuid_compat.rs @@ -0,0 +1,209 @@ +/// Encode custom bytes into a UUID v4 format +/// +/// This function takes up to 14 bytes of custom data and encodes them into a UUID v4 +/// format, completely skipping the version and variant bytes to avoid any data loss: +/// - Bytes 0-5: data[0..6] +/// - Byte 6: Version byte (0x40 = version 4), NOT used for custom data +/// - Byte 7: data[6] +/// - Byte 8: Variant byte (0x80 = variant), NOT used for custom data +/// - Bytes 9-15: data[7..14] +/// +/// # Arguments +/// * `data` - Slice of bytes to encode (max 14 bytes). If less than 14 bytes, remaining bytes are zeroed. +/// +/// # Returns +/// A 16-byte array representing a UUID v4 +/// +/// # Panics +/// Panics if data length exceeds 14 bytes +pub fn encode_bytes_to_uuid(data: &[u8]) -> [u8; 16] { + assert!(data.len() <= 14, "data must be at most 14 bytes"); + + let mut uuid = [0u8; 16]; + + // Bytes 0-5: First 6 bytes of custom data + let copy_len = data.len().min(6); + uuid[..copy_len].copy_from_slice(&data[..copy_len]); + + // Byte 6: Version byte (0x40 = version 4) - NO custom data + uuid[6] = 0x40; + + // Byte 7: Next byte of custom data (data[6]) + if data.len() > 6 { + uuid[7] = data[6]; + } + + // Byte 8: Variant byte (0x80 = variant) - NO custom data + uuid[8] = 0x80; + + // Bytes 9-15: Remaining custom data (data[7..14]) + if data.len() > 7 { + let remaining_len = (data.len() - 7).min(7); + uuid[9..9 + remaining_len].copy_from_slice(&data[7..7 + remaining_len]); + } + + uuid +} + +/// Decode custom bytes from a UUID v4 format +/// +/// This function extracts the custom data bytes from a UUID v4, completely +/// skipping the version and variant bytes: +/// - Bytes 0-5: uuid[0..6] +/// - Byte 6: UUID version byte - SKIPPED +/// - Byte 7: uuid[7] -> data[6] +/// - Byte 8: UUID variant byte - SKIPPED +/// - Bytes 9-15: uuid[9..16] -> data[7..14] +/// +/// # Arguments +/// * `uuid` - 16-byte UUID array +/// +/// # Returns +/// A 14-byte array containing the extracted custom data +pub fn decode_bytes_from_uuid(uuid: &[u8; 16]) -> [u8; 14] { + let mut data = [0u8; 14]; + + // Bytes 0-5: First 6 bytes of custom data + data[..6].copy_from_slice(&uuid[..6]); + + // Byte 6: Skip UUID version byte, take uuid[7] + data[6] = uuid[7]; + + // Bytes 7-13: Take uuid[9..16] (skip variant byte at uuid[8]) + data[7..].copy_from_slice(&uuid[9..16]); + + data +} + +#[cfg(test)] +mod tests { + use super::*; + use uuid::Uuid; + + #[test] + fn test_encode_decode_roundtrip() { + // Test with 14 bytes of custom data + let original = [ + 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0xFF, 0x08, 0xFF, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, + ]; + + let uuid_bytes = encode_bytes_to_uuid(&original); + let decoded = decode_bytes_from_uuid(&uuid_bytes); + + // All bytes should match exactly (no masking/loss) + assert_eq!(decoded, original); + } + + #[test] + fn test_uuid_version_bits() { + let data = [0xFF; 14]; + let uuid_bytes = encode_bytes_to_uuid(&data); + let uuid = Uuid::from_bytes(uuid_bytes); + + // Validate UUID version is 4 + assert_eq!(uuid.get_version_num(), 4); + + // Byte 6 should be exactly 0x40 (version 4, no custom data) + assert_eq!(uuid_bytes[6], 0x40); + } + + #[test] + fn test_uuid_variant_bits() { + let data = [0xFF; 14]; + let uuid_bytes = encode_bytes_to_uuid(&data); + let uuid = Uuid::from_bytes(uuid_bytes); + + // Validate UUID variant is RFC4122 + assert_eq!(uuid.get_variant(), uuid::Variant::RFC4122); + + // Byte 8 should be exactly 0x80 (variant, no custom data) + assert_eq!(uuid_bytes[8], 0x80); + } + + #[test] + fn test_encode_partial_data() { + // Test with less than 14 bytes + let data = [0xAA, 0xBB, 0xCC]; + let uuid_bytes = encode_bytes_to_uuid(&data); + let uuid = Uuid::from_bytes(uuid_bytes); + + assert_eq!(uuid_bytes[0], 0xAA); + assert_eq!(uuid_bytes[1], 0xBB); + assert_eq!(uuid_bytes[2], 0xCC); + assert_eq!(uuid_bytes[3], 0x00); // Zeroed + + // Validate UUID version and variant + assert_eq!(uuid.get_version_num(), 4); + assert_eq!(uuid.get_variant(), uuid::Variant::RFC4122); + } + + #[test] + fn test_encode_empty_data() { + let data = []; + let uuid_bytes = encode_bytes_to_uuid(&data); + let uuid = Uuid::from_bytes(uuid_bytes); + + // Validate UUID version and variant + assert_eq!(uuid.get_version_num(), 4); + assert_eq!(uuid.get_variant(), uuid::Variant::RFC4122); + + // All other bytes should be zero + assert_eq!(uuid_bytes[0..6], [0; 6]); + assert_eq!(uuid_bytes[9..15], [0; 6]); + assert_eq!(uuid_bytes[15], 0x00); // Last byte unused + } + + #[test] + fn test_decode_skips_version_variant_bytes() { + // Create a UUID with specific values + let mut uuid = [0u8; 16]; + uuid[0] = 0x01; + uuid[6] = 0x4F; // Version byte - should be IGNORED + uuid[7] = 0xAA; // This should become data[6] + uuid[8] = 0xBF; // Variant byte - should be IGNORED + uuid[9] = 0xBB; // This should become data[7] + + let decoded = decode_bytes_from_uuid(&uuid); + + assert_eq!(decoded[0], 0x01); + assert_eq!(decoded[6], 0xAA); // From uuid[7] + assert_eq!(decoded[7], 0xBB); // From uuid[9] + } + + #[test] + #[should_panic(expected = "data must be at most 14 bytes")] + fn test_encode_too_much_data() { + let data = [0xFF; 15]; + encode_bytes_to_uuid(&data); + } + + #[test] + fn test_all_zeros() { + let data = [0x00; 14]; + let uuid_bytes = encode_bytes_to_uuid(&data); + let uuid = Uuid::from_bytes(uuid_bytes); + let decoded = decode_bytes_from_uuid(&uuid_bytes); + + // All custom data should be zero + assert_eq!(decoded, [0; 14]); + + // Validate UUID version and variant + assert_eq!(uuid.get_version_num(), 4); + assert_eq!(uuid.get_variant(), uuid::Variant::RFC4122); + } + + #[test] + fn test_all_ones() { + let data = [0xFF; 14]; + let uuid_bytes = encode_bytes_to_uuid(&data); + let uuid = Uuid::from_bytes(uuid_bytes); + let decoded = decode_bytes_from_uuid(&uuid_bytes); + + // All custom data should be preserved exactly + assert_eq!(decoded, [0xFF; 14]); + + // Validate UUID version and variant + assert_eq!(uuid.get_version_num(), 4); + assert_eq!(uuid.get_variant(), uuid::Variant::RFC4122); + } +} diff --git a/engine/sdks/rust/runner-protocol/src/versioned.rs b/engine/sdks/rust/runner-protocol/src/versioned.rs index c57349c38a..a72ea6913c 100644 --- a/engine/sdks/rust/runner-protocol/src/versioned.rs +++ b/engine/sdks/rust/runner-protocol/src/versioned.rs @@ -2,6 +2,7 @@ use anyhow::{Ok, Result, bail}; use vbare::OwnedVersionedData; use crate::generated::{v1, v2, v3}; +use crate::uuid_compat::{decode_bytes_from_uuid, encode_bytes_to_uuid}; pub enum ToClient { V1(v1::ToClient), @@ -174,19 +175,21 @@ impl ToClient { }) } v2::ToClient::ToClientTunnelMessage(msg) => { - // Extract v3 message_id from v2's message_id - // v3: gateway_id (4) + request_id (4) + message_index (2) = 10 bytes - // v2.message_id contains: entire v3 message_id (10 bytes) + padding (6 bytes) + // Extract v3 message_id from v2's UUIDs + // v2.message_id (UUID) contains: gateway_id (4) + request_id (4) + message_index (2) = 10 bytes + let decoded = decode_bytes_from_uuid(&msg.request_id); + let mut gateway_id = [0u8; 4]; - gateway_id.copy_from_slice(&msg.message_id[..4]); + gateway_id.copy_from_slice(&decoded[..4]); let mut request_id = [0u8; 4]; - request_id.copy_from_slice(&msg.request_id[..4]); + request_id.copy_from_slice(&decoded[4..8]); + let message_index = u16::from_le_bytes([decoded[8], decoded[9]]); v3::ToClient::ToClientTunnelMessage(v3::ToClientTunnelMessage { message_id: v3::MessageId { gateway_id, request_id, - message_index: 0, + message_index, }, message_kind: convert_to_client_tunnel_message_kind_v2_to_v3( msg.message_kind, @@ -252,16 +255,20 @@ impl ToClient { }) } v3::ToClient::ToClientTunnelMessage(msg) => { - // Split v3 message_id into v2's request_id and message_id + // Encode v3 message_id into v2's UUIDs // v3: gateway_id (4) + request_id (4) + message_index (2) = 10 bytes - // v2.request_id = gateway_id (4) + request_id (4) + padding (8 zeros) - // v2.message_id = entire v3 message_id (10 bytes) + padding (4 zeros) - let mut request_id = [0u8; 16]; - let mut message_id = [0u8; 16]; - request_id[..4].copy_from_slice(&msg.message_id.gateway_id); - request_id[4..8].copy_from_slice(&msg.message_id.request_id); - message_id[..8].copy_from_slice(&request_id[0..8]); - request_id[8..10].copy_from_slice(&msg.message_id.message_index.to_le_bytes()); + let mut data = [0u8; 10]; + data[..4].copy_from_slice(&msg.message_id.gateway_id); + data[4..8].copy_from_slice(&msg.message_id.request_id); + data[8..10].copy_from_slice(&msg.message_id.message_index.to_le_bytes()); + + let message_id = encode_bytes_to_uuid(&data); + + // request_id contains gateway_id + request_id for backwards compatibility + let mut request_id_data = [0u8; 8]; + request_id_data[..4].copy_from_slice(&msg.message_id.gateway_id); + request_id_data[4..8].copy_from_slice(&msg.message_id.request_id); + let request_id = encode_bytes_to_uuid(&request_id_data); v2::ToClient::ToClientTunnelMessage(v2::ToClientTunnelMessage { request_id, @@ -510,19 +517,21 @@ impl ToServer { }) } v2::ToServer::ToServerTunnelMessage(msg) => { - // Extract v3 message_id from v2's message_id - // v3: gateway_id (4) + request_id (4) + message_index (2) = 10 bytes - // v2.message_id contains: entire v3 message_id (10 bytes) + padding (6 bytes) + // Extract v3 message_id from v2's UUIDs + // v2.message_id (UUID) contains: gateway_id (4) + request_id (4) + message_index (2) = 10 bytes + let decoded = decode_bytes_from_uuid(&msg.request_id); + let mut gateway_id = [0u8; 4]; - gateway_id.copy_from_slice(&msg.message_id[..4]); + gateway_id.copy_from_slice(&decoded[..4]); let mut request_id = [0u8; 4]; - request_id.copy_from_slice(&msg.request_id[..4]); + request_id.copy_from_slice(&decoded[4..8]); + let message_index = u16::from_le_bytes([decoded[8], decoded[9]]); v3::ToServer::ToServerTunnelMessage(v3::ToServerTunnelMessage { message_id: v3::MessageId { gateway_id, request_id, - message_index: 0, + message_index, }, message_kind: convert_to_server_tunnel_message_kind_v2_to_v3( msg.message_kind, @@ -585,16 +594,20 @@ impl ToServer { }) } v3::ToServer::ToServerTunnelMessage(msg) => { - // Split v3 message_id into v2's request_id and message_id + // Encode v3 message_id into v2's UUIDs // v3: gateway_id (4) + request_id (4) + message_index (2) = 10 bytes - // v2.request_id = gateway_id (4) + request_id (4) + padding (8 zeros) - // v2.message_id = entire v3 message_id (10 bytes) + padding (4 zeros) - let mut request_id = [0u8; 16]; - let mut message_id = [0u8; 16]; - request_id[..4].copy_from_slice(&msg.message_id.gateway_id); - request_id[4..8].copy_from_slice(&msg.message_id.request_id); - message_id[..8].copy_from_slice(&request_id[0..8]); - request_id[8..10].copy_from_slice(&msg.message_id.message_index.to_le_bytes()); + let mut data = [0u8; 10]; + data[..4].copy_from_slice(&msg.message_id.gateway_id); + data[4..8].copy_from_slice(&msg.message_id.request_id); + data[8..10].copy_from_slice(&msg.message_id.message_index.to_le_bytes()); + + let message_id = encode_bytes_to_uuid(&data); + + // request_id contains gateway_id + request_id for backwards compatibility + let mut request_id_data = [0u8; 8]; + request_id_data[..4].copy_from_slice(&msg.message_id.gateway_id); + request_id_data[4..8].copy_from_slice(&msg.message_id.request_id); + let request_id = encode_bytes_to_uuid(&request_id_data); v2::ToServer::ToServerTunnelMessage(v2::ToServerTunnelMessage { request_id,