diff --git a/Cargo.lock b/Cargo.lock index 435a28c2a1..d8269e6712 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1405,7 +1405,7 @@ checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d" [[package]] name = "epoxy" -version = "2.2.0" +version = "2.2.1" dependencies = [ "anyhow", "axum 0.8.4", @@ -1444,7 +1444,7 @@ dependencies = [ [[package]] name = "epoxy-protocol" -version = "2.2.0" +version = "2.2.1" dependencies = [ "anyhow", "rivet-util", @@ -1708,7 +1708,7 @@ dependencies = [ [[package]] name = "gasoline" -version = "2.2.0" +version = "2.2.1" dependencies = [ "anyhow", "async-stream", @@ -1757,7 +1757,7 @@ dependencies = [ [[package]] name = "gasoline-macros" -version = "2.2.0" +version = "2.2.1" dependencies = [ "proc-macro2", "quote", @@ -1766,7 +1766,7 @@ dependencies = [ [[package]] name = "gasoline-runtime" -version = "2.2.0" +version = "2.2.1" dependencies = [ "anyhow", "epoxy", @@ -2784,7 +2784,7 @@ dependencies = [ [[package]] name = "namespace" -version = "2.2.0" +version = "2.2.1" dependencies = [ "anyhow", "epoxy", @@ -3279,7 +3279,7 @@ checksum = "df94ce210e5bc13cb6651479fa48d14f601d9858cfe0467f43ae157023b938d3" [[package]] name = "pegboard" -version = "2.2.0" +version = "2.2.1" dependencies = [ "anyhow", "base64 0.22.1", @@ -3326,7 +3326,7 @@ dependencies = [ [[package]] name = "pegboard-envoy" -version = "2.1.11-rc.1" +version = "2.2.1" dependencies = [ "anyhow", "async-trait", @@ -3364,7 +3364,7 @@ dependencies = [ [[package]] name = "pegboard-gateway" -version = "2.2.0" +version = "2.2.1" dependencies = [ "anyhow", "async-trait", @@ -3398,7 +3398,7 @@ dependencies = [ [[package]] name = "pegboard-gateway2" -version = "2.1.11-rc.1" +version = "2.2.1" dependencies = [ "anyhow", "async-trait", @@ -3432,7 +3432,7 @@ dependencies = [ [[package]] name = "pegboard-outbound" -version = "2.1.11-rc.1" +version = "2.2.1" dependencies = [ "anyhow", "epoxy", @@ -3455,7 +3455,7 @@ dependencies = [ [[package]] name = "pegboard-runner" -version = "2.2.0" +version = "2.2.1" dependencies = [ "anyhow", "async-trait", @@ -4196,7 +4196,7 @@ dependencies = [ [[package]] name = "rivet-api-builder" -version = "2.2.0" +version = "2.2.1" dependencies = [ "anyhow", "axum 0.8.4", @@ -4239,7 +4239,7 @@ dependencies = [ [[package]] name = "rivet-api-peer" -version = "2.2.0" +version = "2.2.1" dependencies = [ "anyhow", "axum 0.8.4", @@ -4269,7 +4269,7 @@ dependencies = [ [[package]] name = "rivet-api-public" -version = "2.2.0" +version = "2.2.1" dependencies = [ "anyhow", "axum 0.8.4", @@ -4303,7 +4303,7 @@ dependencies = [ [[package]] name = "rivet-api-public-openapi-gen" -version = "2.2.0" +version = "2.2.1" dependencies = [ "rivet-api-public", "serde_json", @@ -4312,7 +4312,7 @@ dependencies = [ [[package]] name = "rivet-api-types" -version = "2.2.0" +version = "2.2.1" dependencies = [ "anyhow", "gasoline", @@ -4327,7 +4327,7 @@ dependencies = [ [[package]] name = "rivet-api-util" -version = "2.2.0" +version = "2.2.1" dependencies = [ "anyhow", "axum 0.8.4", @@ -4347,7 +4347,7 @@ dependencies = [ [[package]] name = "rivet-bootstrap" -version = "2.2.0" +version = "2.2.1" dependencies = [ "epoxy", "gasoline", @@ -4367,7 +4367,7 @@ dependencies = [ [[package]] name = "rivet-cache" -version = "2.2.0" +version = "2.2.1" dependencies = [ "anyhow", "futures-util", @@ -4410,14 +4410,14 @@ dependencies = [ [[package]] name = "rivet-cache-result" -version = "2.2.0" +version = "2.2.1" dependencies = [ "rivet-util", ] [[package]] name = "rivet-config" -version = "2.2.0" +version = "2.2.1" dependencies = [ "anyhow", "config", @@ -4436,7 +4436,7 @@ dependencies = [ [[package]] name = "rivet-config-schema-gen" -version = "2.2.0" +version = "2.2.1" dependencies = [ "rivet-config", "schemars 0.8.22", @@ -4445,7 +4445,7 @@ dependencies = [ [[package]] name = "rivet-data" -version = "2.2.0" +version = "2.2.1" dependencies = [ "anyhow", "gasoline", @@ -4459,7 +4459,7 @@ dependencies = [ [[package]] name = "rivet-engine" -version = "2.2.0" +version = "2.2.1" dependencies = [ "anyhow", "async-trait", @@ -4531,7 +4531,7 @@ dependencies = [ [[package]] name = "rivet-engine-runner" -version = "2.2.0" +version = "2.2.1" dependencies = [ "anyhow", "async-trait", @@ -4551,7 +4551,7 @@ dependencies = [ [[package]] name = "rivet-env" -version = "2.2.0" +version = "2.2.1" dependencies = [ "lazy_static", "uuid", @@ -4559,7 +4559,7 @@ dependencies = [ [[package]] name = "rivet-envoy-client" -version = "2.1.11-rc.1" +version = "2.2.1" dependencies = [ "anyhow", "async-trait", @@ -4580,7 +4580,7 @@ dependencies = [ [[package]] name = "rivet-envoy-protocol" -version = "2.1.11-rc.1" +version = "2.2.1" dependencies = [ "anyhow", "gasoline", @@ -4597,7 +4597,7 @@ dependencies = [ [[package]] name = "rivet-error" -version = "2.2.0" +version = "2.2.1" dependencies = [ "anyhow", "indoc", @@ -4609,7 +4609,7 @@ dependencies = [ [[package]] name = "rivet-error-macros" -version = "2.2.0" +version = "2.2.1" dependencies = [ "indoc", "proc-macro2", @@ -4620,7 +4620,7 @@ dependencies = [ [[package]] name = "rivet-guard" -version = "2.2.0" +version = "2.2.1" dependencies = [ "anyhow", "axum 0.8.4", @@ -4668,7 +4668,7 @@ dependencies = [ [[package]] name = "rivet-guard-core" -version = "2.2.0" +version = "2.2.1" dependencies = [ "anyhow", "async-trait", @@ -4712,7 +4712,7 @@ dependencies = [ [[package]] name = "rivet-logs" -version = "2.2.0" +version = "2.2.1" dependencies = [ "anyhow", "chrono", @@ -4726,7 +4726,7 @@ dependencies = [ [[package]] name = "rivet-metrics" -version = "2.2.0" +version = "2.2.1" dependencies = [ "anyhow", "console-subscriber", @@ -4745,7 +4745,7 @@ dependencies = [ [[package]] name = "rivet-pools" -version = "2.2.0" +version = "2.2.1" dependencies = [ "anyhow", "async-nats", @@ -4777,7 +4777,7 @@ dependencies = [ [[package]] name = "rivet-postgres-util" -version = "2.2.0" +version = "2.2.1" dependencies = [ "anyhow", "rustls", @@ -4788,7 +4788,7 @@ dependencies = [ [[package]] name = "rivet-runner-protocol" -version = "2.2.0" +version = "2.2.1" dependencies = [ "anyhow", "gasoline", @@ -4805,7 +4805,7 @@ dependencies = [ [[package]] name = "rivet-runtime" -version = "2.2.0" +version = "2.2.1" dependencies = [ "anyhow", "console-subscriber", @@ -4832,7 +4832,7 @@ dependencies = [ [[package]] name = "rivet-service-manager" -version = "2.2.0" +version = "2.2.1" dependencies = [ "anyhow", "chrono", @@ -4849,7 +4849,7 @@ dependencies = [ [[package]] name = "rivet-telemetry" -version = "2.2.0" +version = "2.2.1" dependencies = [ "anyhow", "rivet-config", @@ -4873,7 +4873,7 @@ dependencies = [ [[package]] name = "rivet-test-deps" -version = "2.2.0" +version = "2.2.1" dependencies = [ "anyhow", "futures-util", @@ -4891,7 +4891,7 @@ dependencies = [ [[package]] name = "rivet-test-deps-docker" -version = "2.2.0" +version = "2.2.1" dependencies = [ "anyhow", "portpicker", @@ -4922,7 +4922,7 @@ dependencies = [ [[package]] name = "rivet-tracing-utils" -version = "2.2.0" +version = "2.2.1" dependencies = [ "futures-util", "lazy_static", @@ -4932,7 +4932,7 @@ dependencies = [ [[package]] name = "rivet-types" -version = "2.2.0" +version = "2.2.1" dependencies = [ "anyhow", "gasoline", @@ -4949,7 +4949,7 @@ dependencies = [ [[package]] name = "rivet-ups-protocol" -version = "2.2.0" +version = "2.2.1" dependencies = [ "anyhow", "base64 0.22.1", @@ -4962,7 +4962,7 @@ dependencies = [ [[package]] name = "rivet-util" -version = "2.2.0" +version = "2.2.1" dependencies = [ "anyhow", "async-trait", @@ -4994,7 +4994,7 @@ dependencies = [ [[package]] name = "rivet-util-id" -version = "2.2.0" +version = "2.2.1" dependencies = [ "serde", "thiserror 1.0.69", @@ -5005,7 +5005,7 @@ dependencies = [ [[package]] name = "rivet-workflow-worker" -version = "2.2.0" +version = "2.2.1" dependencies = [ "anyhow", "epoxy", @@ -6660,7 +6660,7 @@ checksum = "4a1a07cc7db3810833284e8d372ccdc6da29741639ecc70c9ec107df0fa6154c" [[package]] name = "universaldb" -version = "2.2.0" +version = "2.2.1" dependencies = [ "anyhow", "async-trait", @@ -6692,7 +6692,7 @@ dependencies = [ [[package]] name = "universalpubsub" -version = "2.2.0" +version = "2.2.1" dependencies = [ "anyhow", "async-nats", diff --git a/engine/artifacts/openapi.json b/engine/artifacts/openapi.json index 8e9065d2d0..bfc639b46f 100644 --- a/engine/artifacts/openapi.json +++ b/engine/artifacts/openapi.json @@ -337,6 +337,84 @@ ] } }, + "/actors2": { + "put": { + "tags": [ + "actors::get_or_create" + ], + "operationId": "actors2_get_or_create", + "parameters": [ + { + "name": "namespace", + "in": "query", + "required": true, + "schema": { + "type": "string" + } + } + ], + "requestBody": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ActorsGetOrCreateRequest" + } + } + }, + "required": true + }, + "responses": { + "200": { + "description": "", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ActorsGetOrCreateResponse" + } + } + } + } + } + }, + "post": { + "tags": [ + "actors::create" + ], + "operationId": "actors2_create", + "parameters": [ + { + "name": "namespace", + "in": "query", + "required": true, + "schema": { + "type": "string" + } + } + ], + "requestBody": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ActorsCreateRequest" + } + } + }, + "required": true + }, + "responses": { + "200": { + "description": "", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ActorsCreateResponse" + } + } + } + } + } + } + }, "/datacenters": { "get": { "tags": [ diff --git a/engine/packages/api-peer/src/actors/create.rs b/engine/packages/api-peer/src/actors/create.rs index 5f632ba7d3..17413e4104 100644 --- a/engine/packages/api-peer/src/actors/create.rs +++ b/engine/packages/api-peer/src/actors/create.rs @@ -38,3 +38,39 @@ pub async fn create( Ok(CreateResponse { actor: res.actor }) } + +#[tracing::instrument(skip_all)] +pub async fn create2( + ctx: ApiCtx, + _path: (), + query: CreateQuery, + body: CreateRequest, +) -> Result { + let namespace = ctx + .op(namespace::ops::resolve_for_name_global::Input { + name: query.namespace.clone(), + }) + .await? + .ok_or_else(|| namespace::errors::Namespace::NotFound.build())?; + + let actor_id = Id::new_v1(ctx.config().dc_label()); + + let res = ctx + .op(pegboard::ops::actor::create::Input2 { + actor_id, + namespace_id: namespace.namespace_id, + name: body.name.clone(), + key: body.key, + pool_name: body.runner_name_selector, + input: body.input.clone(), + crash_policy: body.crash_policy, + // NOTE: This can forward if the user attempts to create an actor with a target dc and this dc + // ends up forwarding to another. + forward_request: true, + // api-peer is always creating in its own datacenter + datacenter_name: None, + }) + .await?; + + Ok(CreateResponse { actor: res.actor }) +} diff --git a/engine/packages/api-peer/src/actors/get_or_create.rs b/engine/packages/api-peer/src/actors/get_or_create.rs index 8b7332c064..364d6d146c 100644 --- a/engine/packages/api-peer/src/actors/get_or_create.rs +++ b/engine/packages/api-peer/src/actors/get_or_create.rs @@ -97,6 +97,97 @@ pub async fn get_or_create( } } +#[tracing::instrument(skip_all)] +pub async fn get_or_create2( + ctx: ApiCtx, + _path: (), + query: GetOrCreateQuery, + body: GetOrCreateRequest, +) -> Result { + let namespace = ctx + .op(namespace::ops::resolve_for_name_global::Input { + name: query.namespace.clone(), + }) + .await? + .ok_or_else(|| namespace::errors::Namespace::NotFound.build())?; + + // Check if actor already exists for the key + let existing = ctx + .op(pegboard::ops::actor::get_for_key::Input { + namespace_id: namespace.namespace_id, + name: body.name.clone(), + key: body.key.clone(), + fetch_error: true, + }) + .await?; + + if let Some(actor) = existing.actor { + // Actor exists, return it + return Ok(GetOrCreateResponse { + actor, + created: false, + }); + } + + // Actor doesn't exist, create it + let actor_id = Id::new_v1(ctx.config().dc_label()); + + match ctx + .op(pegboard::ops::actor::create::Input2 { + actor_id, + namespace_id: namespace.namespace_id, + name: body.name.clone(), + key: Some(body.key.clone()), + pool_name: body.runner_name_selector, + input: body.input.clone(), + crash_policy: body.crash_policy, + // NOTE: This can forward if the user attempts to create an actor with a target dc and this dc + // ends up forwarding to another. + forward_request: true, + // api-peer is always creating in its own datacenter + datacenter_name: None, + }) + .await + { + Ok(res) => Ok(GetOrCreateResponse { + actor: res.actor, + created: true, + }), + Err(err) => { + // Check if this is a DuplicateKey error and extract the existing actor ID + if let Some(existing_actor_id) = extract_duplicate_key_error(&err) { + tracing::info!( + ?existing_actor_id, + "received duplicate key error, fetching existing actor" + ); + + // Fetch the existing actor - it should be in this datacenter since + // the duplicate key error came from this datacenter + let res = ctx + .op(pegboard::ops::actor::get::Input { + actor_ids: vec![existing_actor_id], + fetch_error: true, + }) + .await?; + + let actor = res + .actors + .into_iter() + .next() + .ok_or_else(|| pegboard::errors::Actor::NotFound.build())?; + + return Ok(GetOrCreateResponse { + actor, + created: false, + }); + } + + // Re-throw the original error if it's not a DuplicateKey + Err(err) + } + } +} + /// Helper function to extract the existing actor ID from a duplicate key error /// /// Returns Some(actor_id) if the error is a duplicate key error with metadata, None otherwise diff --git a/engine/packages/api-peer/src/router.rs b/engine/packages/api-peer/src/router.rs index 5c868c88da..a8c0a14f86 100644 --- a/engine/packages/api-peer/src/router.rs +++ b/engine/packages/api-peer/src/router.rs @@ -23,7 +23,9 @@ pub async fn router( // MARK: Actors .route("/actors", get(actors::list::list)) .route("/actors", post(actors::create::create)) + .route("/actors2", post(actors::create::create2)) .route("/actors", put(actors::get_or_create::get_or_create)) + .route("/actors2", put(actors::get_or_create::get_or_create2)) .route("/actors/{actor_id}", delete(actors::delete::delete)) .route("/actors/names", get(actors::list_names::list_names)) .route( diff --git a/engine/packages/api-public/src/actors/create.rs b/engine/packages/api-public/src/actors/create.rs index c86a8e9647..109a0c65d7 100644 --- a/engine/packages/api-public/src/actors/create.rs +++ b/engine/packages/api-public/src/actors/create.rs @@ -88,3 +88,67 @@ async fn create_inner( .await } } + +#[utoipa::path( + post, + operation_id = "actors2_create", + path = "/actors2", + params(CreateQuery), + request_body(content = CreateRequest, content_type = "application/json"), + responses( + (status = 200, body = CreateResponse), + ), +)] +pub async fn create2( + Extension(ctx): Extension, + Query(query): Query, + Json(body): Json, +) -> Response { + match create2_inner(ctx, query, body).await { + Ok(response) => Json(response).into_response(), + Err(err) => ApiError::from(err).into_response(), + } +} + +#[tracing::instrument(skip_all)] +async fn create2_inner( + ctx: ApiCtx, + query: CreateQuery, + body: CreateRequest, +) -> Result { + ctx.skip_auth(); + + let namespace = ctx + .op(namespace::ops::resolve_for_name_global::Input { + name: query.namespace.clone(), + }) + .await? + .ok_or_else(|| namespace::errors::Namespace::NotFound.build())?; + + let target_dc_label = super::utils::find_dc_for_actor_creation( + &ctx, + namespace.namespace_id, + &query.namespace, + &body.runner_name_selector, + body.datacenter.as_ref().map(String::as_str), + ) + .await?; + + let query = rivet_api_types::actors::create::CreateQuery { + namespace: query.namespace, + }; + + if target_dc_label == ctx.config().dc_label() { + rivet_api_peer::actors::create::create2(ctx.into(), (), query, body).await + } else { + request_remote_datacenter::( + ctx.config(), + target_dc_label, + "/actors2", + axum::http::Method::POST, + Some(&query), + Some(&body), + ) + .await + } +} diff --git a/engine/packages/api-public/src/actors/get_or_create.rs b/engine/packages/api-public/src/actors/get_or_create.rs index 73cf084077..b48e21bb89 100644 --- a/engine/packages/api-public/src/actors/get_or_create.rs +++ b/engine/packages/api-public/src/actors/get_or_create.rs @@ -98,3 +98,67 @@ async fn get_or_create_inner( .await } } + +#[utoipa::path( + put, + operation_id = "actors2_get_or_create", + path = "/actors2", + params(GetOrCreateQuery), + request_body(content = GetOrCreateRequest, content_type = "application/json"), + responses( + (status = 200, body = GetOrCreateResponse), + ), +)] +pub async fn get_or_create2( + Extension(ctx): Extension, + Query(query): Query, + Json(body): Json, +) -> Response { + match get_or_create_inner2(ctx, query, body).await { + Ok(response) => Json(response).into_response(), + Err(err) => ApiError::from(err).into_response(), + } +} + +#[tracing::instrument(skip_all)] +async fn get_or_create_inner2( + ctx: ApiCtx, + query: GetOrCreateQuery, + body: GetOrCreateRequest, +) -> Result { + ctx.skip_auth(); + + let namespace = ctx + .op(namespace::ops::resolve_for_name_global::Input { + name: query.namespace.clone(), + }) + .await? + .ok_or_else(|| namespace::errors::Namespace::NotFound.build())?; + + let target_dc_label = super::utils::find_dc_for_actor_creation( + &ctx, + namespace.namespace_id, + &query.namespace, + &body.runner_name_selector, + body.datacenter.as_ref().map(String::as_str), + ) + .await?; + + let query = GetOrCreateQuery { + namespace: query.namespace, + }; + + if target_dc_label == ctx.config().dc_label() { + rivet_api_peer::actors::get_or_create::get_or_create2(ctx.into(), (), query, body).await + } else { + request_remote_datacenter::( + ctx.config(), + target_dc_label, + "/actors2", + axum::http::Method::PUT, + Some(&query), + Some(&body), + ) + .await + } +} diff --git a/engine/packages/api-public/src/router.rs b/engine/packages/api-public/src/router.rs index 2c8c458d71..debb8856a2 100644 --- a/engine/packages/api-public/src/router.rs +++ b/engine/packages/api-public/src/router.rs @@ -15,9 +15,11 @@ use crate::{actors, ctx, datacenters, health, metadata, namespaces, runner_confi paths( actors::list::list, actors::create::create, + actors::create::create2, actors::delete::delete, actors::list_names::list_names, actors::get_or_create::get_or_create, + actors::get_or_create::get_or_create2, actors::kv_get::kv_get, runners::list, runners::list_names, @@ -79,10 +81,15 @@ pub async fn router( // MARK: Actors .route("/actors", axum::routing::get(actors::list::list)) .route("/actors", axum::routing::post(actors::create::create)) + .route("/actors2", axum::routing::post(actors::create::create2)) .route( "/actors", axum::routing::put(actors::get_or_create::get_or_create), ) + .route( + "/actors2", + axum::routing::put(actors::get_or_create::get_or_create2), + ) .route( "/actors/{actor_id}", axum::routing::delete(actors::delete::delete), diff --git a/engine/packages/gasoline/src/error.rs b/engine/packages/gasoline/src/error.rs index 30edb2f7f7..2fa7c686b9 100644 --- a/engine/packages/gasoline/src/error.rs +++ b/engine/packages/gasoline/src/error.rs @@ -214,6 +214,8 @@ impl WorkflowError { Some(deadline_ts) } WorkflowError::Sleep(ts) | WorkflowError::NoSignalFoundAndSleep(_, ts) => Some(*ts), + // Always retry udb errors, they're usually caused by ephemeral outages and can be retried + WorkflowError::Udb(_) => Some(rivet_util::timestamp::now() + 30_000), _ => None, } } diff --git a/engine/packages/guard/src/routing/envoy.rs b/engine/packages/guard/src/routing/envoy.rs index c6f83e6136..9c0d263988 100644 --- a/engine/packages/guard/src/routing/envoy.rs +++ b/engine/packages/guard/src/routing/envoy.rs @@ -1,10 +1,9 @@ -use anyhow::*; +use anyhow::Result; use gas::prelude::*; use rivet_guard_core::{RoutingOutput, request_context::RequestContext}; use std::sync::Arc; -use super::{SEC_WEBSOCKET_PROTOCOL, X_RIVET_TOKEN}; -pub(crate) const WS_PROTOCOL_TOKEN: &str = "rivet_token."; +use super::{SEC_WEBSOCKET_PROTOCOL, WS_PROTOCOL_TOKEN, X_RIVET_TOKEN}; /// Route requests to the envoy service using header-based routing #[tracing::instrument(skip_all)] diff --git a/engine/packages/guard/src/routing/pegboard_gateway.rs b/engine/packages/guard/src/routing/pegboard_gateway.rs index 404845d9ee..15fa5ed2ab 100644 --- a/engine/packages/guard/src/routing/pegboard_gateway.rs +++ b/engine/packages/guard/src/routing/pegboard_gateway.rs @@ -143,7 +143,9 @@ pub async fn route_request( // Find actor to route to let actor_id = Id::parse(&actor_id_str).context("invalid x-rivet-actor header")?; - route_request_inner(ctx, shared_state, req_ctx, actor_id, req_ctx.path(), token).await + route_request_inner(ctx, shared_state, req_ctx, actor_id, req_ctx.path(), token) + .await + .map(Some) } async fn route_request_inner( @@ -153,7 +155,7 @@ async fn route_request_inner( actor_id: Id, stripped_path: &str, _token: Option<&str>, -) -> Result> { +) -> Result { // NOTE: Token validation implemented in EE // Route to peer dc where the actor lives @@ -213,32 +215,34 @@ async fn route_request_inner( } match actor.version { - 2 => handle_actor_v2( - ctx, - shared_state, - actor_id, - actor, - stripped_path, - ready_sub2, - stopped_sub2, - fail_sub2, - destroy_sub2, - ) - .await - .map(Some), - 1 => handle_actor_v1( - ctx, - shared_state, - actor_id, - actor, - stripped_path, - ready_sub, - stopped_sub, - fail_sub, - destroy_sub, - ) - .await - .map(Some), + 2 => { + handle_actor_v2( + ctx, + shared_state, + actor_id, + actor, + stripped_path, + ready_sub2, + stopped_sub2, + fail_sub2, + destroy_sub2, + ) + .await + } + 1 => { + handle_actor_v1( + ctx, + shared_state, + actor_id, + actor, + stripped_path, + ready_sub, + stopped_sub, + fail_sub, + destroy_sub, + ) + .await + } _ => bail!("unknown actor version"), } } diff --git a/engine/packages/guard/src/routing/runner.rs b/engine/packages/guard/src/routing/runner.rs index 4dfdb01f10..820acc6777 100644 --- a/engine/packages/guard/src/routing/runner.rs +++ b/engine/packages/guard/src/routing/runner.rs @@ -1,4 +1,4 @@ -use anyhow::*; +use anyhow::Result; use gas::prelude::*; use rivet_guard_core::{RoutingOutput, request_context::RequestContext}; use std::sync::Arc; diff --git a/engine/packages/pegboard-envoy/src/metrics.rs b/engine/packages/pegboard-envoy/src/metrics.rs index 760bc6d0da..90bb5c2a71 100644 --- a/engine/packages/pegboard-envoy/src/metrics.rs +++ b/engine/packages/pegboard-envoy/src/metrics.rs @@ -31,13 +31,13 @@ lazy_static::lazy_static! { ).unwrap(); pub static ref EVENT_MULTIPLEXER_COUNT: IntGauge = register_int_gauge_with_registry!( - "pegboard_event_multiplexer_count", + "pegboard_envoy_event_multiplexer_count", "Number of active actor event multiplexers.", *REGISTRY ).unwrap(); pub static ref INGESTED_EVENTS_TOTAL: IntCounter = register_int_counter_with_registry!( - "pegboard_ingested_events_total", + "pegboard_envoy_ingested_events_total", "Count of actor events.", *REGISTRY ).unwrap(); diff --git a/engine/packages/pegboard-envoy/src/ws_to_tunnel_task.rs b/engine/packages/pegboard-envoy/src/ws_to_tunnel_task.rs index a636ddbe3b..5f1938a75a 100644 --- a/engine/packages/pegboard-envoy/src/ws_to_tunnel_task.rs +++ b/engine/packages/pegboard-envoy/src/ws_to_tunnel_task.rs @@ -161,7 +161,7 @@ async fn handle_message( }; let actor_res = ctx - .op(pegboard::ops::actor::get_for_envoy::Input { actor_id }) + .op(pegboard::ops::actor::get_for_kv::Input { actor_id }) .await .with_context(|| format!("failed to get envoy for actor: {}", actor_id))?; @@ -170,10 +170,9 @@ async fn handle_message( return Ok(()); }; - // Verify actor belongs to this envoy - if actor.namespace_id != conn.namespace_id || actor.envoy_key != conn.envoy_key { - send_actor_kv_error(&conn, req.request_id, "actor does not belong to envoy") - .await?; + // Verify actor belongs to this namespace + if actor.namespace_id != conn.namespace_id { + send_actor_kv_error(&conn, req.request_id, "actor does not exist").await?; return Ok(()); } diff --git a/engine/packages/pegboard-runner/src/metrics.rs b/engine/packages/pegboard-runner/src/metrics.rs index 931d0c9077..c5edab2c14 100644 --- a/engine/packages/pegboard-runner/src/metrics.rs +++ b/engine/packages/pegboard-runner/src/metrics.rs @@ -31,13 +31,13 @@ lazy_static::lazy_static! { ).unwrap(); pub static ref EVENT_MULTIPLEXER_COUNT: IntGauge = register_int_gauge_with_registry!( - "pegboard_event_multiplexer_count", + "pegboard_runner_event_multiplexer_count", "Number of active actor event multiplexers.", *REGISTRY ).unwrap(); pub static ref INGESTED_EVENTS_TOTAL: IntCounter = register_int_counter_with_registry!( - "pegboard_ingested_events_total", + "pegboard_runner_ingested_events_total", "Count of actor events.", *REGISTRY ).unwrap(); diff --git a/engine/packages/pegboard/src/ops/actor/create.rs b/engine/packages/pegboard/src/ops/actor/create.rs index d3a1cae7c9..604975e813 100644 --- a/engine/packages/pegboard/src/ops/actor/create.rs +++ b/engine/packages/pegboard/src/ops/actor/create.rs @@ -29,6 +29,107 @@ pub struct Output { #[operation] pub async fn pegboard_actor_create(ctx: &OperationCtx, input: &Input) -> Result { + // Set up subscriptions before dispatching workflow + let mut create_sub = ctx + .subscribe::(("actor_id", input.actor_id)) + .await?; + let mut fail_sub = ctx + .subscribe::(("actor_id", input.actor_id)) + .await?; + let mut destroy_sub = ctx + .subscribe::(("actor_id", input.actor_id)) + .await?; + + // TODO: check rivetkit version before choosing actor version + + // Dispatch actor workflow + ctx.workflow(crate::workflows::actor::Input { + actor_id: input.actor_id, + name: input.name.clone(), + runner_name_selector: input.runner_name_selector.clone(), + key: input.key.clone(), + namespace_id: input.namespace_id, + crash_policy: input.crash_policy, + input: input.input.clone(), + }) + .tag("actor_id", input.actor_id) + .dispatch() + .await?; + + // Wait for actor creation to complete, fail, or be destroyed + tokio::select! { + res = create_sub.next() => { res?; }, + res = fail_sub.next() => { + let msg = res?; + let error = msg.into_body().error; + + // Check if this request needs to be forwarded + // + // We cannot forward if `datacenter_name` is specified because this actor is being + // restricted to the given datacenter. + if input.forward_request && input.datacenter_name.is_none() { + if let crate::errors::Actor::KeyReservedInDifferentDatacenter { datacenter_label } = &error { + // Forward the request to the correct datacenter + return forward_to_datacenter( + ctx, + *datacenter_label, + input.namespace_id, + input.name.clone(), + input.key.clone(), + input.runner_name_selector.clone(), + input.input.clone(), + input.crash_policy + ).await; + } + } + + // Otherwise, return the error as-is + return Err(error.build()); + } + res = destroy_sub.next() => { + res?; + return Err(crate::errors::Actor::DestroyedDuringCreation.build()); + } + } + + // Fetch the created actor + let actors_res = ctx + .op(crate::ops::actor::get::Input { + actor_ids: vec![input.actor_id], + fetch_error: false, + }) + .await?; + + let actor = actors_res + .actors + .into_iter() + .next() + .ok_or_else(|| crate::errors::Actor::NotFound.build())?; + + Ok(Output { actor }) +} + +#[derive(Debug)] +pub struct Input2 { + pub actor_id: Id, + pub namespace_id: Id, + pub name: String, + pub key: Option, + pub pool_name: String, + pub crash_policy: CrashPolicy, + pub input: Option, + /// If true, will handle ForwardToDatacenter errors by forwarding the request to the correct datacenter. + /// Used by api-public. api-peer should set this to false. + pub forward_request: bool, + /// Datacenter to create the actor in + /// + /// Providing this value will cause an error if attempting to create an actor where the key is + /// reserved in a different datacenter. + pub datacenter_name: Option, +} + +#[operation(Operation2)] +pub async fn pegboard_actor_create2(ctx: &OperationCtx, input: &Input2) -> Result { // Set up subscriptions before dispatching workflow let mut create_sub = ctx .subscribe::(("actor_id", input.actor_id)) @@ -46,7 +147,7 @@ pub async fn pegboard_actor_create(ctx: &OperationCtx, input: &Input) -> Result< ctx.workflow(crate::workflows::actor2::Input { actor_id: input.actor_id, name: input.name.clone(), - pool_name: input.runner_name_selector.clone(), + pool_name: input.pool_name.clone(), key: input.key.clone(), namespace_id: input.namespace_id, crash_policy: input.crash_policy, @@ -76,7 +177,7 @@ pub async fn pegboard_actor_create(ctx: &OperationCtx, input: &Input) -> Result< input.namespace_id, input.name.clone(), input.key.clone(), - input.runner_name_selector.clone(), + input.pool_name.clone(), input.input.clone(), input.crash_policy ).await; diff --git a/engine/packages/pegboard/src/ops/actor/get_for_envoy.rs b/engine/packages/pegboard/src/ops/actor/get_for_kv.rs similarity index 50% rename from engine/packages/pegboard/src/ops/actor/get_for_envoy.rs rename to engine/packages/pegboard/src/ops/actor/get_for_kv.rs index 73b87f5504..4593710290 100644 --- a/engine/packages/pegboard/src/ops/actor/get_for_envoy.rs +++ b/engine/packages/pegboard/src/ops/actor/get_for_kv.rs @@ -12,13 +12,11 @@ pub struct Input { pub struct Output { pub name: String, pub namespace_id: Id, - pub envoy_key: String, - pub is_connectable: bool, } // TODO: Add cache (remember to purge cache when runner changes) #[operation] -pub async fn pegboard_actor_get_for_runner( +pub async fn pegboard_actor_get_for_kv( ctx: &OperationCtx, input: &Input, ) -> Result> { @@ -28,29 +26,18 @@ pub async fn pegboard_actor_get_for_runner( let name_key = keys::actor::NameKey::new(input.actor_id); let namespace_id_key = keys::actor::NamespaceIdKey::new(input.actor_id); - let envoy_key_key = keys::actor::EnvoyKeyKey::new(input.actor_id); - let connectable_key = keys::actor::ConnectableKey::new(input.actor_id); - let (name_entry, namespace_id_entry, envoy_key_entry, is_connectable) = tokio::try_join!( + let (name_entry, namespace_id_entry) = tokio::try_join!( tx.read_opt(&name_key, Serializable), tx.read_opt(&namespace_id_key, Serializable), - tx.read_opt(&envoy_key_key, Serializable), - tx.exists(&connectable_key, Serializable), )?; - let (Some(name), Some(namespace_id), Some(envoy_key)) = - (name_entry, namespace_id_entry, envoy_key_entry) - else { + let (Some(name), Some(namespace_id)) = (name_entry, namespace_id_entry) else { return Ok(None); }; - Ok(Some(Output { - name, - namespace_id, - envoy_key, - is_connectable, - })) + Ok(Some(Output { name, namespace_id })) }) - .custom_instrument(tracing::info_span!("actor_get_for_runner_tx")) + .custom_instrument(tracing::info_span!("actor_get_for_kv_tx")) .await } diff --git a/engine/packages/pegboard/src/ops/actor/mod.rs b/engine/packages/pegboard/src/ops/actor/mod.rs index 9672546429..3f17bb49e5 100644 --- a/engine/packages/pegboard/src/ops/actor/mod.rs +++ b/engine/packages/pegboard/src/ops/actor/mod.rs @@ -1,8 +1,8 @@ pub mod create; pub mod get; -pub mod get_for_envoy; pub mod get_for_gateway; pub mod get_for_key; +pub mod get_for_kv; pub mod get_for_runner; pub mod get_reservation_for_key; pub mod hibernating_request; diff --git a/engine/packages/pegboard/src/workflows/actor/metrics.rs b/engine/packages/pegboard/src/workflows/actor/metrics.rs index e85a2fff79..d434ef0333 100644 --- a/engine/packages/pegboard/src/workflows/actor/metrics.rs +++ b/engine/packages/pegboard/src/workflows/actor/metrics.rs @@ -42,7 +42,7 @@ impl LifecycleState { } #[workflow] -pub(crate) async fn pegboard_actor2_metrics(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> { +pub(crate) async fn pegboard_actor_metrics(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> { let start_paused = ctx .activity(InitStateInput { actor_id: input.actor_id, diff --git a/engine/packages/pegboard/src/workflows/actor2/metrics.rs b/engine/packages/pegboard/src/workflows/actor2/metrics.rs index 33085f48ed..f188bb97c7 100644 --- a/engine/packages/pegboard/src/workflows/actor2/metrics.rs +++ b/engine/packages/pegboard/src/workflows/actor2/metrics.rs @@ -42,7 +42,7 @@ impl LifecycleState { } #[workflow] -pub(crate) async fn pegboard_actor_metrics(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> { +pub(crate) async fn pegboard_actor2_metrics(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> { ctx.activity(InitStateInput { actor_id: input.actor_id, namespace_id: input.namespace_id, diff --git a/engine/packages/pegboard/src/workflows/actor2/runtime.rs b/engine/packages/pegboard/src/workflows/actor2/runtime.rs index 3fed8e80b0..4340840458 100644 --- a/engine/packages/pegboard/src/workflows/actor2/runtime.rs +++ b/engine/packages/pegboard/src/workflows/actor2/runtime.rs @@ -453,7 +453,7 @@ pub async fn handle_stopped( metrics_workflow_id: Id, variant: StoppedVariant, ) -> Result { - tracing::debug!(?variant, "actor stopped"); + tracing::debug!(?variant, ?state.transition, "actor stopped"); // Save error to state match &variant { @@ -544,19 +544,19 @@ pub async fn handle_stopped( if let Some(allocation) = allocate_res.allocation { state.generation += 1; - // Transition to allocating - state.transition = Transition::Allocating { - destroy_after_start: false, - lost_timeout_ts: util::timestamp::now() - + ctx.config().pegboard().actor_allocation_threshold(), - }; - ctx.activity(SendOutboundInput { generation: state.generation, input: input.input.clone(), allocation, }) .await?; + + // Transition to allocating + state.transition = Transition::Allocating { + destroy_after_start: false, + lost_timeout_ts: util::timestamp::now() + + ctx.config().pegboard().actor_allocation_threshold(), + }; } else { // Transition to retry backoff state.transition = Transition::Sleeping { @@ -602,6 +602,11 @@ pub async fn handle_stopped( } } } else { + // Transition to sleeping + state.transition = Transition::Sleeping { + attempting_reallocation: false, + }; + StoppedResult::Continue }; diff --git a/engine/sdks/typescript/envoy-client/src/config.ts b/engine/sdks/typescript/envoy-client/src/config.ts index eb668573c5..576de298d6 100644 --- a/engine/sdks/typescript/envoy-client/src/config.ts +++ b/engine/sdks/typescript/envoy-client/src/config.ts @@ -156,4 +156,5 @@ export interface EnvoyConfig { generation: number, reason: protocol.StopActorReason, ) => Promise; + onShutdown: () => void; } diff --git a/engine/sdks/typescript/envoy-client/src/handle.ts b/engine/sdks/typescript/envoy-client/src/handle.ts index f3d51a6ddb..c09e575ff2 100644 --- a/engine/sdks/typescript/envoy-client/src/handle.ts +++ b/engine/sdks/typescript/envoy-client/src/handle.ts @@ -1,3 +1,6 @@ +import * as protocol from "@rivetkit/engine-envoy-protocol"; +import { ActorEntry } from "./tasks/envoy"; + export interface KvListOptions { reverse?: boolean; limit?: number; @@ -5,7 +8,13 @@ export interface KvListOptions { export interface EnvoyHandle { /** Starts the shutdown procedure for this envoy. */ - shutdown(): void; + shutdown(immediate: boolean): void; + + getProtocolMetadata(): protocol.ProtocolMetadata | undefined; + + getEnvoyKey(): string; + + getActor(actorId: string, generation?: number): ActorEntry | undefined; /** Send sleep intent for an actor. */ sleepActor(actorId: string, generation?: number): void; diff --git a/engine/sdks/typescript/envoy-client/src/index.ts b/engine/sdks/typescript/envoy-client/src/index.ts index 1728e1b5bf..9ed8e96299 100644 --- a/engine/sdks/typescript/envoy-client/src/index.ts +++ b/engine/sdks/typescript/envoy-client/src/index.ts @@ -6,4 +6,7 @@ export { type ToEnvoyMessage, type ToEnvoyFromConnMessage, startEnvoy, + startEnvoySync, } from "./tasks/envoy/index.js"; +export * as protocol from "@rivetkit/engine-envoy-protocol"; +export * as utils from './utils.js'; \ No newline at end of file diff --git a/engine/sdks/typescript/envoy-client/src/stringify.ts b/engine/sdks/typescript/envoy-client/src/stringify.ts index 7f5904bed1..34bd356641 100644 --- a/engine/sdks/typescript/envoy-client/src/stringify.ts +++ b/engine/sdks/typescript/envoy-client/src/stringify.ts @@ -158,14 +158,14 @@ export function stringifyEventWrapper(wrapper: protocol.EventWrapper): string { export function stringifyToRivet(message: protocol.ToRivet): string { switch (message.tag) { case "ToRivetInit": { - const { envoyKey, name, version, prepopulateActorNames, metadata } = + const { envoyKey, version, prepopulateActorNames, metadata } = message.val; const prepopulateActorNamesStr = prepopulateActorNames === null ? "null" : `Map(${prepopulateActorNames.size})`; const metadataStr = metadata === null ? "null" : `"${metadata}"`; - return `ToRivetInit{envoyKey: "${envoyKey}", name: "${name}", version: ${version}, prepopulateActorNames: ${prepopulateActorNamesStr}, metadata: ${metadataStr}}`; + return `ToRivetInit{envoyKey: "${envoyKey}", version: ${version}, prepopulateActorNames: ${prepopulateActorNamesStr}, metadata: ${metadataStr}}`; } case "ToRivetEvents": { const events = message.val; @@ -201,7 +201,7 @@ export function stringifyToEnvoy(message: protocol.ToEnvoy): string { switch (message.tag) { case "ToEnvoyInit": { const { metadata } = message.val; - const metadataStr = `{envoyLostThreshold: ${stringifyBigInt(metadata.envoyLostThreshold)}, actorStopThreshold: ${stringifyBigInt(metadata.actorStopThreshold)}, serverlessDrainGracePeriod: ${metadata.serverlessDrainGracePeriod === null ? "null" : stringifyBigInt(metadata.serverlessDrainGracePeriod)}}`; + const metadataStr = `{envoyLostThreshold: ${stringifyBigInt(metadata.envoyLostThreshold)}, actorStopThreshold: ${stringifyBigInt(metadata.actorStopThreshold)}, serverlessDrainGracePeriod: ${metadata.serverlessDrainGracePeriod === null ? "null" : stringifyBigInt(metadata.serverlessDrainGracePeriod)}, maxResponsePayloadSize: ${stringifyBigInt(metadata.maxResponsePayloadSize)}}`; return `ToEnvoyInit{metadata: ${metadataStr}}`; } case "ToEnvoyCommands": { diff --git a/engine/sdks/typescript/envoy-client/src/tasks/actor.ts b/engine/sdks/typescript/envoy-client/src/tasks/actor.ts index 7c3d4c2c8b..e0a5a816a3 100644 --- a/engine/sdks/typescript/envoy-client/src/tasks/actor.ts +++ b/engine/sdks/typescript/envoy-client/src/tasks/actor.ts @@ -10,7 +10,6 @@ import { logger } from "../log.js"; import { unreachable } from "antiox/panic"; import { stringifyError } from "../utils.js"; import { sendResponse } from "./envoy/tunnel.js"; -import { EnvoyContext } from "./envoy/index.js"; export interface CreateActorOpts { commandIdx: bigint; @@ -73,10 +72,10 @@ interface ActorContext { [protocol.GatewayId, protocol.RequestId], PendingRequest >; - // webSockets: Map< - // [protocol.GatewayId, protocol.RequestId], - // WebSocketTunnelAdapter - // >; + webSockets: Map< + [protocol.GatewayId, protocol.RequestId], + WebSocketTunnelAdapter + >; } export function createActor( @@ -124,7 +123,18 @@ async function actorInner( stopMessage = error instanceof Error ? error.message : "actor start failed"; - sendStoppedEvent(ctx, stopCode, stopMessage); + sendEvent(ctx, { + tag: "EventActorStateUpdate", + val: { + state: { + tag: "ActorStateStopped", + val: { + code: stopCode, + message: stopMessage + }, + }, + }, + }); return; } @@ -161,7 +171,18 @@ async function actorInner( : "actor stop failed"; } - sendStoppedEvent(ctx, stopCode, stopMessage); + sendEvent(ctx, { + tag: "EventActorStateUpdate", + val: { + state: { + tag: "ActorStateStopped", + val: { + code: stopCode, + message: stopMessage + }, + }, + }, + }); return; } else if (msg.type === "set-alarm") { sendEvent(ctx, { @@ -300,22 +321,6 @@ function sendEvent(ctx: ActorContext, inner: protocol.Event) { }); } -function sendStoppedEvent( - ctx: ActorContext, - code: protocol.StopCode, - message: string | null, -) { - const checkpoint = incrementCheckpoint(ctx); - ctx.shared.envoyTx.send({ - type: "command-stop-actor-complete", - checkpointIndex: checkpoint.index, - actorId: ctx.actorId, - generation: ctx.generation, - code, - message, - }); -} - function incrementCheckpoint(ctx: ActorContext): protocol.ActorCheckpoint { const index = ctx.eventIndex; ctx.eventIndex++; diff --git a/engine/sdks/typescript/envoy-client/src/tasks/connection.ts b/engine/sdks/typescript/envoy-client/src/tasks/connection.ts index 84809ead00..8b3bdd8d76 100644 --- a/engine/sdks/typescript/envoy-client/src/tasks/connection.ts +++ b/engine/sdks/typescript/envoy-client/src/tasks/connection.ts @@ -152,6 +152,8 @@ function forwardToEnvoy(ctx: SharedContext, message: protocol.ToEnvoy) { val: { ts: message.val.ts }, }); } else { + if (ctx.envoyTx.isClosed()) console.error("envoy tx should not be closed"); + ctx.envoyTx.send({ type: "conn-message", message }); } } diff --git a/engine/sdks/typescript/envoy-client/src/tasks/envoy/commands.ts b/engine/sdks/typescript/envoy-client/src/tasks/envoy/commands.ts index 354671d766..3b4c90f338 100644 --- a/engine/sdks/typescript/envoy-client/src/tasks/envoy/commands.ts +++ b/engine/sdks/typescript/envoy-client/src/tasks/envoy/commands.ts @@ -36,6 +36,7 @@ export async function handleCommands( } generations.set(checkpoint.generation, { handle, + name: val.config.name, eventHistory: [], lastCommandIdx: checkpoint.index, }); diff --git a/engine/sdks/typescript/envoy-client/src/tasks/envoy/events.ts b/engine/sdks/typescript/envoy-client/src/tasks/envoy/events.ts index d0281d10bd..17247dbdf1 100644 --- a/engine/sdks/typescript/envoy-client/src/tasks/envoy/events.ts +++ b/engine/sdks/typescript/envoy-client/src/tasks/envoy/events.ts @@ -16,6 +16,14 @@ export function handleSendEvents( ); if (entry) { entry.eventHistory.push(event); + + // Close the actor channel but keep event history for ack/resend. + // The entry is cleaned up when all events are acked. + if (event.inner.tag === "EventActorStateUpdate") { + if (event.inner.val.state.tag === "ActorStateStopped") { + entry.handle.close(); + } + } } } @@ -26,40 +34,6 @@ export function handleSendEvents( }); } -export function handleCommandStopActorComplete( - ctx: EnvoyContext, - msg: Extract, -) { - const event: protocol.EventWrapper = { - checkpoint: { - actorId: msg.actorId, - generation: msg.generation, - index: msg.checkpointIndex, - }, - inner: { - tag: "EventActorStateUpdate", - val: { - state: { - tag: "ActorStateStopped", - val: { - code: msg.code, - message: msg.message, - }, - }, - }, - }, - }; - - handleSendEvents(ctx, [event]); - - // Close the actor channel but keep event history for ack/resend. - // The entry is cleaned up when all events are acked. - const entry = getActorEntry(ctx, msg.actorId, msg.generation); - if (entry) { - entry.handle.close(); - } -} - export function handleAckEvents( ctx: EnvoyContext, ack: protocol.ToEnvoyAckEvents, diff --git a/engine/sdks/typescript/envoy-client/src/tasks/envoy/index.ts b/engine/sdks/typescript/envoy-client/src/tasks/envoy/index.ts index 123aa819ab..59e7ed3718 100644 --- a/engine/sdks/typescript/envoy-client/src/tasks/envoy/index.ts +++ b/engine/sdks/typescript/envoy-client/src/tasks/envoy/index.ts @@ -16,7 +16,6 @@ import { } from "./commands.js"; import { handleAckEvents, - handleCommandStopActorComplete, handleSendEvents, resendUnacknowledgedEvents, } from "./events.js"; @@ -29,7 +28,7 @@ import { handleKvResponse, processUnsentKvRequests, } from "./kv.js"; -import { spawn, watch, WatchSender } from "antiox"; +import { spawn, watch, WatchReceiver, WatchSender } from "antiox"; export interface EnvoyContext { shared: SharedContext; @@ -42,6 +41,7 @@ export interface EnvoyContext { export interface ActorEntry { handle: UnboundedSender; + name: string; eventHistory: protocol.EventWrapper[]; lastCommandIdx: bigint; } @@ -64,14 +64,6 @@ export type ToEnvoyMessage = type: "send-events"; events: protocol.EventWrapper[]; } - | { - type: "command-stop-actor-complete"; - actorId: string; - generation: number; - checkpointIndex: bigint; - code: protocol.StopCode; - message: string | null; - } | { type: "kv-request"; actorId: string; @@ -83,16 +75,26 @@ export type ToEnvoyMessage = | { type: "evict" }; export async function startEnvoy(config: EnvoyConfig): Promise { + const [handle, startRx] = startEnvoySync(config); + + // Wait for envoy start + await startRx.changed(); + + return handle; +} + +// Must manually wait for envoy to start. +export function startEnvoySync(config: EnvoyConfig): [EnvoyHandle, WatchReceiver] { const [envoyTx, envoyRx] = unboundedChannel(); const [startTx, startRx] = watch(void 0); const actors: Map> = new Map(); - const handle = createHandle(actors, envoyTx); const shared: SharedContext = { config, envoyKey: uuidv4(), envoyTx, - handle, + // Start undefined + handle: null as any, }; startConnection(shared); @@ -105,6 +107,10 @@ export async function startEnvoy(config: EnvoyConfig): Promise { requestToActor: new Map(), }; + // Set shared handle + const handle = createHandle(ctx); + shared.handle = handle; + log(ctx.shared)?.info({ msg: "starting envoy" }); spawn(async () => { @@ -121,8 +127,6 @@ export async function startEnvoy(config: EnvoyConfig): Promise { await handleConnMessage(ctx, startTx, msg.message); } else if (msg.type === "send-events") { handleSendEvents(ctx, msg.events); - } else if (msg.type === "command-stop-actor-complete") { - handleCommandStopActorComplete(ctx, msg); } else if (msg.type === "kv-request") { handleKvRequest(ctx, msg); } else if (msg.type === "shutdown") { @@ -158,10 +162,7 @@ export async function startEnvoy(config: EnvoyConfig): Promise { ctx.actors.clear(); }); - // Wait for envoy start - await startRx.changed(); - - return handle; + return [handle, startRx]; } async function handleConnMessage( @@ -218,94 +219,29 @@ export function getActorEntry( // MARK: Handle function createHandle( - actors: Map>, - envoyTx: UnboundedSender, + ctx: EnvoyContext, ): EnvoyHandle { - // NOTE: Because of the weird order we have to create the handle/shared context/envoy context, these - // functions are defined here instead of globally - function findActor( - actorId: string, - generation?: number, - ): ActorEntry | undefined { - const gens = actors.get(actorId); - if (!gens || gens.size === 0) return undefined; - - if (generation !== undefined) { - return gens.get(generation); - } - - // Return first non-closed (active) entry - for (const entry of gens.values()) { - if (!entry.handle.isClosed()) { - return entry; - } - } - return undefined; - } - - function sendActorIntent( - actorId: string, - intent: protocol.ActorIntent, - generation?: number, - ): void { - const entry = findActor(actorId, generation); - if (!entry) return; - entry.handle.send({ - type: "actor-intent", - commandIdx: 0n, - intent, - }); - } - - function sendKvRequest( - actorId: string, - data: protocol.KvRequestData, - ): Promise { - return new Promise((resolve, reject) => { - envoyTx.send({ - type: "kv-request", - actorId, - data, - resolve, - reject, - }); - }); - } + return { + shutdown(immediate: boolean) { + ctx.shared.envoyTx.send({ type: "shutdown" }); + ctx.shared.config.onShutdown(); + }, - function toBuffer(arr: Uint8Array): ArrayBuffer { - return arr.buffer.slice( - arr.byteOffset, - arr.byteOffset + arr.byteLength, - ) as ArrayBuffer; - } + getProtocolMetadata(): protocol.ProtocolMetadata | undefined { + return ctx.shared.protocolMetadata; + }, - function parseListResponse( - response: protocol.KvResponseData, - ): [Uint8Array, Uint8Array][] { - const val = ( - response as { - tag: "KvListResponse"; - val: protocol.KvListResponse; - } - ).val; - const result: [Uint8Array, Uint8Array][] = []; - for (let i = 0; i < val.keys.length; i++) { - const key = val.keys[i]; - const value = val.values[i]; - if (key && value) { - result.push([new Uint8Array(key), new Uint8Array(value)]); - } - } - return result; - } + getEnvoyKey(): string { + return ctx.shared.envoyKey; + }, - return { - shutdown() { - envoyTx.send({ type: "shutdown" }); + getActor(actorId: string, generation?: number): ActorEntry | undefined { + return getActor(ctx, actorId, generation); }, sleepActor(actorId: string, generation?: number): void { sendActorIntent( + ctx, actorId, { tag: "ActorIntentSleep", val: null }, generation, @@ -314,6 +250,7 @@ function createHandle( stopActor(actorId: string, generation?: number): void { sendActorIntent( + ctx, actorId, { tag: "ActorIntentStop", val: null }, generation, @@ -322,6 +259,7 @@ function createHandle( destroyActor(actorId: string, generation?: number): void { sendActorIntent( + ctx, actorId, { tag: "ActorIntentStop", val: null }, generation, @@ -333,7 +271,7 @@ function createHandle( alarmTs: number | null, generation?: number, ): void { - const entry = findActor(actorId, generation); + const entry = getActor(ctx, actorId, generation); if (!entry) return; entry.handle.send({ type: "set-alarm", @@ -346,7 +284,7 @@ function createHandle( keys: Uint8Array[], ): Promise<(Uint8Array | null)[]> { const kvKeys = keys.map(toBuffer); - const response = await sendKvRequest(actorId, { + const response = await sendKvRequest(ctx, actorId, { tag: "KvGetRequest", val: { keys: kvKeys }, }); @@ -385,7 +323,7 @@ function createHandle( actorId: string, options?: KvListOptions, ): Promise<[Uint8Array, Uint8Array][]> { - const response = await sendKvRequest(actorId, { + const response = await sendKvRequest(ctx, actorId, { tag: "KvListRequest", val: { query: { tag: "KvListAllQuery", val: null }, @@ -406,7 +344,7 @@ function createHandle( exclusive?: boolean, options?: KvListOptions, ): Promise<[Uint8Array, Uint8Array][]> { - const response = await sendKvRequest(actorId, { + const response = await sendKvRequest(ctx, actorId, { tag: "KvListRequest", val: { query: { @@ -432,7 +370,7 @@ function createHandle( prefix: Uint8Array, options?: KvListOptions, ): Promise<[Uint8Array, Uint8Array][]> { - const response = await sendKvRequest(actorId, { + const response = await sendKvRequest(ctx, actorId, { tag: "KvListRequest", val: { query: { @@ -455,7 +393,7 @@ function createHandle( ): Promise { const keys = entries.map(([k]) => toBuffer(k)); const values = entries.map(([, v]) => toBuffer(v)); - await sendKvRequest(actorId, { + await sendKvRequest(ctx, actorId, { tag: "KvPutRequest", val: { keys, values }, }); @@ -465,7 +403,7 @@ function createHandle( actorId: string, keys: Uint8Array[], ): Promise { - await sendKvRequest(actorId, { + await sendKvRequest(ctx, actorId, { tag: "KvDeleteRequest", val: { keys: keys.map(toBuffer) }, }); @@ -476,14 +414,14 @@ function createHandle( start: Uint8Array, end: Uint8Array, ): Promise { - await sendKvRequest(actorId, { + await sendKvRequest(ctx, actorId, { tag: "KvDeleteRangeRequest", val: { start: toBuffer(start), end: toBuffer(end) }, }); }, async kvDrop(actorId: string): Promise { - await sendKvRequest(actorId, { + await sendKvRequest(ctx, actorId, { tag: "KvDropRequest", val: null, }); @@ -491,7 +429,65 @@ function createHandle( }; } -export function findActor( +function sendActorIntent( + ctx: EnvoyContext, + actorId: string, + intent: protocol.ActorIntent, + generation?: number, +): void { + const entry = getActor(ctx, actorId, generation); + if (!entry) return; + entry.handle.send({ + type: "actor-intent", + commandIdx: 0n, + intent, + }); +} + +function sendKvRequest( + ctx: EnvoyContext, + actorId: string, + data: protocol.KvRequestData, +): Promise { + return new Promise((resolve, reject) => { + ctx.shared.envoyTx.send({ + type: "kv-request", + actorId, + data, + resolve, + reject, + }); + }); +} + +function toBuffer(arr: Uint8Array): ArrayBuffer { + return arr.buffer.slice( + arr.byteOffset, + arr.byteOffset + arr.byteLength, + ) as ArrayBuffer; +} + +function parseListResponse( + response: protocol.KvResponseData, +): [Uint8Array, Uint8Array][] { + const val = ( + response as { + tag: "KvListResponse"; + val: protocol.KvListResponse; + } + ).val; + const result: [Uint8Array, Uint8Array][] = []; + for (let i = 0; i < val.keys.length; i++) { + const key = val.keys[i]; + const value = val.values[i]; + if (key && value) { + result.push([new Uint8Array(key), new Uint8Array(value)]); + } + } + return result; +} + +export function getActor( ctx: EnvoyContext, actorId: string, generation?: number, @@ -503,8 +499,8 @@ export function findActor( return gens.get(generation); } - // Return first non-closed (active) entry - for (const entry of gens.values()) { + // Return highest generation non-closed (active) entry + for (const entry of Array.from(gens.values()).reverse()) { if (!entry.handle.isClosed()) { return entry; } diff --git a/engine/sdks/typescript/envoy-client/src/tasks/envoy/tunnel.ts b/engine/sdks/typescript/envoy-client/src/tasks/envoy/tunnel.ts index 3f07269c69..6f9d5fbcdf 100644 --- a/engine/sdks/typescript/envoy-client/src/tasks/envoy/tunnel.ts +++ b/engine/sdks/typescript/envoy-client/src/tasks/envoy/tunnel.ts @@ -1,23 +1,34 @@ import * as protocol from "@rivetkit/engine-envoy-protocol"; -import { EnvoyContext, findActor, log } from "./index.js"; +import { EnvoyContext, getActor, log } from "./index.js"; import { SharedContext } from "@/context.js"; import { unreachable } from "antiox"; import { wsSend } from "../connection.js"; export function handleTunnelMessage(ctx: EnvoyContext, msg: protocol.ToEnvoyTunnelMessage) { - if (msg.messageKind.tag === "ToEnvoyRequestStart") { - handleRequestStart(ctx, msg.messageId, msg.messageKind.val); - } else if (msg.messageKind.tag === "ToEnvoyRequestChunk") { - handleRequestChunk(ctx, msg.messageId, msg.messageKind.val); - } else if (msg.messageKind.tag === "ToEnvoyRequestAbort") { - handleRequestAbort(ctx, msg.messageId); + const { + messageId, + messageKind: { tag, val }, + } = msg; + + if (tag === "ToEnvoyRequestStart") { + handleRequestStart(ctx, messageId, val); + } else if (tag === "ToEnvoyRequestChunk") { + handleRequestChunk(ctx, messageId, val); + } else if (tag === "ToEnvoyRequestAbort") { + handleRequestAbort(ctx, messageId); + } else if (tag === "ToEnvoyWebSocketOpen") { + handleWebSocketOpen(ctx, messageId, val); + } else if (tag === "ToEnvoyWebSocketMessage") { + handleWebSocketMessage(ctx, messageId, val); + } else if (tag === "ToEnvoyWebSocketClose") { + handleWebSocketClose(ctx, messageId); } else { - unreachable(msg.messageKind.tag); + unreachable(tag); } } function handleRequestStart(ctx: EnvoyContext, messageId: protocol.MessageId, req: protocol.ToEnvoyRequestStart) { - const actor = findActor(ctx, req.actorId); + const actor = getActor(ctx, req.actorId); if (!actor) { log(ctx.shared)?.warn({ @@ -47,7 +58,7 @@ function handleRequestStart(ctx: EnvoyContext, messageId: protocol.MessageId, re function handleRequestChunk(ctx: EnvoyContext, messageId: protocol.MessageId, chunk: protocol.ToEnvoyRequestChunk) { const actorId = ctx.requestToActor.get([messageId.gatewayId, messageId.requestId]); if (actorId) { - let actor = findActor(ctx, actorId); + let actor = getActor(ctx, actorId); if (actor) { actor.handle.send({ type: "request-chunk", messageId, chunk }); } @@ -61,7 +72,7 @@ function handleRequestChunk(ctx: EnvoyContext, messageId: protocol.MessageId, ch function handleRequestAbort(ctx: EnvoyContext, messageId: protocol.MessageId) { const actorId = ctx.requestToActor.get([messageId.gatewayId, messageId.requestId]); if (actorId) { - let actor = findActor(ctx, actorId); + let actor = getActor(ctx, actorId); if (actor) { actor.handle.send({ type: "request-abort", messageId }); } diff --git a/engine/sdks/typescript/envoy-client/src/utils.ts b/engine/sdks/typescript/envoy-client/src/utils.ts index 7255ae4089..1cb4965a86 100644 --- a/engine/sdks/typescript/envoy-client/src/utils.ts +++ b/engine/sdks/typescript/envoy-client/src/utils.ts @@ -1,8 +1,5 @@ import { logger } from "./log"; -// 20MiB. Keep in sync with runner_max_response_payload_body_size from engine/packages/config/src/config/pegboard.rs -export const MAX_PAYLOAD_SIZE = 20 * 1024 * 1024; - /** Resolves after the configured debug latency, or immediately if none. */ export function injectLatency(ms?: number): Promise { if (!ms) return Promise.resolve(); diff --git a/engine/sdks/typescript/envoy-client/src/websocket.ts b/engine/sdks/typescript/envoy-client/src/websocket.ts index 2470fb24e6..cd74066fda 100644 --- a/engine/sdks/typescript/envoy-client/src/websocket.ts +++ b/engine/sdks/typescript/envoy-client/src/websocket.ts @@ -4,6 +4,12 @@ import { spawn } from "antiox/task"; import type WsWebSocket from "ws"; import { latencyChannel } from "./latency-channel.js"; import { logger } from "./log.js"; +import { VirtualWebSocket, type UniversalWebSocket, type RivetMessageEvent } from "@rivetkit/virtual-websocket"; +import { wrappingAddU16, wrappingLteU16, wrappingSubU16 } from "./utils"; +import { SharedContext } from "./context.js"; +import { log } from "./tasks/envoy/index.js"; + +export const HIBERNATABLE_SYMBOL = Symbol("hibernatable"); export type WebSocketTxData = Parameters[0]; @@ -128,3 +134,205 @@ export async function webSocket( return [outboundTx, inboundRx]; } + +export class WebSocketTunnelAdapter { + #readyState: 0 | 1 | 2 | 3 = 0; + #binaryType: "nodebuffer" | "arraybuffer" | "blob" = "nodebuffer"; + #shared: SharedContext; + #ws: VirtualWebSocket; + #actorId: string; + #requestId: string; + #hibernatable: boolean; + #messageIndex: number; + #sendCallback: (data: ArrayBuffer | string, isBinary: boolean) => void; + #closeCallback: (code?: number, reason?: string) => void; + + get [HIBERNATABLE_SYMBOL](): boolean { + return this.#hibernatable; + } + + constructor( + ctx: SharedContext, + actorId: string, + requestId: string, + messageIndex: number, + hibernatable: boolean, + isRestoringHibernatable: boolean, + public readonly request: Request, + sendCallback: (data: ArrayBuffer | string, isBinary: boolean) => void, + closeCallback: (code?: number, reason?: string) => void, + ) { + this.#shared = ctx; + this.#actorId = actorId; + this.#requestId = requestId; + this.#hibernatable = hibernatable; + this.#messageIndex = messageIndex; + this.#sendCallback = sendCallback; + this.#closeCallback = closeCallback; + + this.#ws = new VirtualWebSocket({ + getReadyState: () => this.#readyState, + onSend: (data) => this.#handleSend(data), + onClose: (code, reason) => this.#close(code, reason, true), + onTerminate: () => this.#terminate(), + }); + + if (isRestoringHibernatable) { + log(this.#shared)?.debug({ + msg: "setting WebSocket to OPEN state for restored connection", + actorId: this.#actorId, + requestId: this.#requestId, + }); + this.#readyState = 1; + } + } + + get websocket(): UniversalWebSocket { + return this.#ws; + } + + #handleSend(data: string | ArrayBufferLike | Blob | ArrayBufferView): void { + let isBinary = false; + let messageData: string | ArrayBuffer; + + const maxPayloadSize = this.#shared.protocolMetadata?.maxResponsePayloadSize ?? Infinity; + + if (typeof data === "string") { + const encoder = new TextEncoder(); + if (encoder.encode(data).byteLength > maxPayloadSize) { + throw new Error("WebSocket message too large"); + } + + messageData = data; + } else if (data instanceof ArrayBuffer) { + if (data.byteLength > maxPayloadSize) throw new Error("WebSocket message too large"); + + isBinary = true; + messageData = data; + } else if (ArrayBuffer.isView(data)) { + if (data.byteLength > maxPayloadSize) throw new Error("WebSocket message too large"); + + isBinary = true; + const view = data; + const buffer = view.buffer instanceof SharedArrayBuffer + ? new Uint8Array(view.buffer, view.byteOffset, view.byteLength).slice().buffer + : view.buffer.slice(view.byteOffset, view.byteOffset + view.byteLength); + messageData = buffer as ArrayBuffer; + } else { + throw new Error("Unsupported data type"); + } + + this.#sendCallback(messageData, isBinary); + } + + // Called by Tunnel when WebSocket is opened + _handleOpen(requestId: ArrayBuffer): void { + if (this.#readyState !== 0) return; + this.#readyState = 1; + this.#ws.dispatchEvent({ type: "open", rivetRequestId: requestId, target: this.#ws }); + } + + // Called by Tunnel when message is received + _handleMessage( + requestId: ArrayBuffer, + data: string | Uint8Array, + messageIndex: number, + isBinary: boolean, + ): boolean { + if (this.#readyState !== 1) { + log(this.#shared)?.warn({ + msg: "WebSocket message ignored - not in OPEN state", + requestId: this.#requestId, + actorId: this.#actorId, + currentReadyState: this.#readyState, + }); + return true; + } + + // Validate message index for hibernatable websockets + if (this.#hibernatable) { + const previousIndex = this.#messageIndex; + + if (wrappingLteU16(messageIndex, previousIndex)) { + log(this.#shared)?.info({ + msg: "received duplicate hibernating websocket message", + requestId, + actorId: this.#actorId, + previousIndex, + receivedIndex: messageIndex, + }); + return true; + } + + const expectedIndex = wrappingAddU16(previousIndex, 1); + if (messageIndex !== expectedIndex) { + const closeReason = "ws.message_index_skip"; + log(this.#shared)?.warn({ + msg: "hibernatable websocket message index out of sequence, closing connection", + requestId, + actorId: this.#actorId, + previousIndex, + expectedIndex, + receivedIndex: messageIndex, + closeReason, + gap: wrappingSubU16(wrappingSubU16(messageIndex, previousIndex), 1), + }); + this.#close(1008, closeReason, true); + return true; + } + + this.#messageIndex = messageIndex; + } + + // Convert data based on binaryType + let messageData: any = data; + if (isBinary && data instanceof Uint8Array) { + if (this.#binaryType === "nodebuffer") { + messageData = Buffer.from(data); + } else if (this.#binaryType === "arraybuffer") { + messageData = data.buffer.slice(data.byteOffset, data.byteOffset + data.byteLength); + } + } + + this.#ws.dispatchEvent({ + type: "message", + data: messageData, + rivetRequestId: requestId, + rivetMessageIndex: messageIndex, + target: this.#ws, + } as RivetMessageEvent); + + return false; + } + + // Called by Tunnel when close is received + _handleClose(_requestId: ArrayBuffer, code?: number, reason?: string): void { + this.#close(code, reason, true); + } + + // Close without sending close message to tunnel + _closeWithoutCallback(code?: number, reason?: string): void { + this.#close(code, reason, false); + } + + // Public close method (used by tunnel.ts for stale websocket cleanup) + close(code?: number, reason?: string): void { + this.#close(code, reason, true); + } + + #close(code: number | undefined, reason: string | undefined, sendCallback: boolean): void { + if (this.#readyState >= 2) return; + + this.#readyState = 2; + if (sendCallback) this.#closeCallback(code, reason); + this.#readyState = 3; + this.#ws.triggerClose(code ?? 1000, reason ?? ""); + } + + #terminate(): void { + // Immediate close without close frame + this.#readyState = 3; + this.#closeCallback(1006, "Abnormal Closure"); + this.#ws.triggerClose(1006, "Abnormal Closure", false); + } +} diff --git a/engine/sdks/typescript/test-envoy/src/index.ts b/engine/sdks/typescript/test-envoy/src/index.ts index e5d5a6a2ad..7edbb3edcd 100644 --- a/engine/sdks/typescript/test-envoy/src/index.ts +++ b/engine/sdks/typescript/test-envoy/src/index.ts @@ -55,7 +55,7 @@ app.get("/health", (c) => { }); app.get("/shutdown", async (c) => { - await envoy?.shutdown(); + envoy?.shutdown(false); return c.text("ok"); }); @@ -160,6 +160,7 @@ if (AUTOSTART_ENVOY) { `Actor ${_actorId} stopped (generation ${_generation})`, ); }, + onShutdown() { }, // TODO: websocket: async ( envoy: EnvoyHandle, diff --git a/examples/hono/package.json b/examples/hono/package.json index 83ef26c0db..7b590878ac 100644 --- a/examples/hono/package.json +++ b/examples/hono/package.json @@ -5,8 +5,10 @@ "type": "module", "scripts": { "dev": "tsx --watch src/index.ts", + "dev:server": "tsx --watch src/server.ts", "check-types": "tsc --noEmit", - "start": "tsx src/index.ts" + "start": "tsx src/index.ts", + "start:server": "tsx src/server.ts" }, "devDependencies": { "@types/node": "^22.13.9", @@ -14,6 +16,7 @@ "typescript": "^5.5.2" }, "dependencies": { + "@hono/node-server": "^1.19.12", "hono": "^4.7.0", "rivetkit": "^2.2.1" }, diff --git a/examples/hono/src/server.ts b/examples/hono/src/server.ts index eb6d6a7a8f..7406f9ba51 100644 --- a/examples/hono/src/server.ts +++ b/examples/hono/src/server.ts @@ -18,3 +18,12 @@ app.post("/increment/:name", async (c) => { }); export default app; + +// Start server when run directly +if (import.meta.url === `file://${process.argv[1]}`) { + const { serve } = await import("@hono/node-server"); + const port = 3000; + serve({ fetch: app.fetch, port }, () => + console.log(`Server running at http://localhost:${port}`), + ); +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index c3061c41e1..19f88ac16b 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1509,6 +1509,9 @@ importers: examples/hono: dependencies: + '@hono/node-server': + specifier: ^1.19.12 + version: 1.19.12(hono@4.11.9) hono: specifier: ^4.7.0 version: 4.11.9 @@ -4141,6 +4144,9 @@ importers: '@rivetkit/bare-ts': specifier: ^0.6.2 version: 0.6.2 + '@rivetkit/engine-envoy-client': + specifier: workspace:* + version: link:../../../engine/sdks/typescript/envoy-client '@rivetkit/engine-runner': specifier: workspace:* version: link:../../../engine/sdks/typescript/runner @@ -7261,6 +7267,12 @@ packages: peerDependencies: hono: ^4 + '@hono/node-server@1.19.12': + resolution: {integrity: sha512-txsUW4SQ1iilgE0l9/e9VQWmELXifEFvmdA1j6WFh/aFPj99hIntrSsq/if0UWyGVkmrRPKA1wCeP+UCr1B9Uw==} + engines: {node: '>=18.14.1'} + peerDependencies: + hono: ^4 + '@hono/node-server@1.19.9': resolution: {integrity: sha512-vHL6w3ecZsky+8P5MD+eFfaGTyCeOHUIFYMGpQGbrBTSmNNoxv0if69rEZ5giu36weC5saFuznL411gRX7bJDw==} engines: {node: '>=18.14.1'} @@ -22209,6 +22221,10 @@ snapshots: dependencies: hono: 4.11.9 + '@hono/node-server@1.19.12(hono@4.11.9)': + dependencies: + hono: 4.11.9 + '@hono/node-server@1.19.9(hono@4.11.9)': dependencies: hono: 4.11.9 diff --git a/rivetkit-asyncapi/asyncapi.json b/rivetkit-asyncapi/asyncapi.json index c4b346769d..96c8ee0784 100644 --- a/rivetkit-asyncapi/asyncapi.json +++ b/rivetkit-asyncapi/asyncapi.json @@ -2,7 +2,7 @@ "asyncapi": "3.0.0", "info": { "title": "RivetKit WebSocket Protocol", - "version": "2.2.1", + "version": "2.1.11-rc.1", "description": "WebSocket protocol for bidirectional communication between RivetKit clients and actors" }, "channels": { diff --git a/rivetkit-openapi/openapi.json b/rivetkit-openapi/openapi.json index 7615ed98da..f1e1ec418d 100644 --- a/rivetkit-openapi/openapi.json +++ b/rivetkit-openapi/openapi.json @@ -1,7 +1,7 @@ { "openapi": "3.0.0", "info": { - "version": "2.2.1", + "version": "2.1.11-rc.1", "title": "RivetKit API" }, "components": { diff --git a/rivetkit-typescript/packages/next-js/src/mod.ts b/rivetkit-typescript/packages/next-js/src/mod.ts index d1e0cc91de..fbf568fcf0 100644 --- a/rivetkit-typescript/packages/next-js/src/mod.ts +++ b/rivetkit-typescript/packages/next-js/src/mod.ts @@ -33,18 +33,20 @@ export const toNextHandler = (registry: Registry) => { // Set these on the registry's config directly since the legacy inputConfig // isn't used by the serverless router registry.config.serverless.spawnEngine = true; - registry.config.serverless.configureRunnerPool = { + registry.config.serverless.configurePool = { url: `${publicUrl}/api/rivet`, + requestLifespan: 300, + maxConcurrentActors: 100_000, + metadata: { provider: "next-js" }, + minRunners: 0, maxRunners: 100_000, - requestLifespan: 300, slotsPerRunner: 1, - metadata: { provider: "next-js" }, }; // Set runner version to enable hot-reloading on code changes - registry.config.runner = { - ...registry.config.runner, + registry.config.envoy = { + ...registry.config.envoy, version: DEV_RUNNER_VERSION, }; } else { diff --git a/rivetkit-typescript/packages/rivetkit/package.json b/rivetkit-typescript/packages/rivetkit/package.json index a2815bf05f..8a89adc2be 100644 --- a/rivetkit-typescript/packages/rivetkit/package.json +++ b/rivetkit-typescript/packages/rivetkit/package.json @@ -335,6 +335,7 @@ "@hono/standard-validator": "^0.1.3", "@hono/zod-openapi": "^1.1.5", "@rivetkit/bare-ts": "^0.6.2", + "@rivetkit/engine-envoy-client": "workspace:*", "@rivetkit/engine-runner": "workspace:*", "@rivetkit/fast-json-patch": "^3.1.2", "@rivetkit/on-change": "^6.0.2-rc.1", @@ -429,4 +430,4 @@ } }, "stableVersion": "0.8.0" -} +} \ No newline at end of file diff --git a/rivetkit-typescript/packages/rivetkit/runtime/index.ts b/rivetkit-typescript/packages/rivetkit/runtime/index.ts index 7f6f21d58e..b0f43dabb9 100644 --- a/rivetkit-typescript/packages/rivetkit/runtime/index.ts +++ b/rivetkit-typescript/packages/rivetkit/runtime/index.ts @@ -5,7 +5,7 @@ import { chooseDefaultDriver } from "@/drivers/default"; import { ENGINE_PORT, ensureEngineProcess } from "@/engine-process/mod"; import { getInspectorUrl } from "@/inspector/utils"; import { buildManagerRouter } from "@/manager/router"; -import { configureServerlessRunner } from "@/serverless/configure"; +import { configureServerlessPool } from "@/serverless/configure"; import { detectRuntime, type GetUpgradeWebSocket } from "@/utils"; import pkg from "../package.json" with { type: "json" }; import { @@ -24,8 +24,8 @@ import { buildServerlessRouter } from "@/serverless/router"; import type { Registry } from "@/registry"; import { getNodeFsSync } from "@/utils/node"; -/** Tracks whether the runtime was started as serverless or runner. */ -export type StartKind = "serverless" | "runner"; +/** Tracks whether the runtime was started as serverless or serverful. */ +export type StartKind = "serverless" | "serverful"; function logLine(label: string, value: string): void { const padding = " ".repeat(Math.max(0, 13 - label.length)); @@ -38,9 +38,9 @@ function logLine(label: string, value: string): void { * Startup happens in two phases: * 1. `Runtime.create()` initializes shared infrastructure like the manager * server and engine process. This runs before we know the deployment mode. - * 2. `startServerless()` or `startRunner()` configures mode-specific behavior. + * 2. `startServerless()` or `startEnvoy()` configures mode-specific behavior. * These are idempotent and called lazily when the first request arrives - * or when explicitly starting a runner. + * or when explicitly starting a envoy. */ export class Runtime { #registry: Registry; @@ -110,7 +110,7 @@ export class Runtime { // 6420. This is a fallback for platforms that cannot use the manager // like Next.js. // - // We do this before startServerless or startRunner has been called + // We do this before startServerless or startEnvoy has been called // since the engine API needs to be available on port 6420 before // anything else happens. For example, serverless platforms use // `registry.handler(req)` so `startServerless` is called lazily. @@ -254,18 +254,18 @@ export class Runtime { this.#printWelcome(); - if (this.#config.serverless.configureRunnerPool) { + if (this.#config.serverless.configurePool) { // biome-ignore lint/nursery/noFloatingPromises: intentional - configureServerlessRunner(this.#config); + configureServerlessPool(this.#config); } } - startRunner(): void { - if (this.#startKind === "runner") return; + startEnvoy(): void { + if (this.#startKind === "serverful") return; invariant(!this.#startKind, "Runtime already started as serverless"); - this.#startKind = "runner"; + this.#startKind = "serverful"; - if (this.#config.runner && this.#driver.autoStartActorDriver) { + if (this.#config.envoy && this.#driver.autoStartActorDriver) { logger().debug("starting actor driver"); const inlineClient = createClientWithDriver>( this.#managerDriver, @@ -285,7 +285,7 @@ export class Runtime { console.log(); console.log( - ` RivetKit ${pkg.version} (${this.#driver.displayName} - ${this.#startKind === "serverless" ? "Serverless" : "Runner"})`, + ` RivetKit ${pkg.version} (${this.#driver.displayName} - ${this.#startKind === "serverless" ? "Serverless" : "Serverful"})`, ); // Show namespace diff --git a/rivetkit-typescript/packages/rivetkit/src/client/config.ts b/rivetkit-typescript/packages/rivetkit/src/client/config.ts index 8ee00a0b22..c0d2b3fa1f 100644 --- a/rivetkit-typescript/packages/rivetkit/src/client/config.ts +++ b/rivetkit-typescript/packages/rivetkit/src/client/config.ts @@ -6,7 +6,7 @@ import { getRivetEndpoint, getRivetToken, getRivetNamespace, - getRivetRunner, + getRivetPool, } from "@/utils/env-vars"; import type { RegistryConfig } from "@/registry/config"; import { tryParseEndpoint } from "@/utils/endpoint-parser"; @@ -39,9 +39,9 @@ export const ClientConfigSchemaBase = z.object({ hasWarnedMissingEndpoint = true; console.warn( `[rivetkit] No endpoint provided to client. Defaulting to ${DEFAULT_ENDPOINT}. ` + - `Starting in 2.2.0, an explicit endpoint will be required. ` + - `Pass an endpoint to createClient() or createRivetKit(), ` + - `or set the RIVET_ENDPOINT environment variable.`, + `Starting in 2.2.0, an explicit endpoint will be required. ` + + `Pass an endpoint to createClient() or createRivetKit(), ` + + `or set the RIVET_ENDPOINT environment variable.`, ); } return resolved ?? DEFAULT_ENDPOINT; @@ -59,8 +59,8 @@ export const ClientConfigSchemaBase = z.object({ .optional() .transform((val) => val ?? getRivetNamespace()), - /** Name of the runner. This is used to group together runners in to different pools. */ - runnerName: z.string().default(() => getRivetRunner() ?? "default"), + /** Name of the envoy pool. This is used to group together envoys in to different pools. */ + poolName: z.string().default(() => getRivetPool() ?? "default"), encoding: EncodingSchema.default("bare"), @@ -133,7 +133,7 @@ export function convertRegistryConfigToClientConfig( endpoint: config.endpoint, token: config.token, namespace: config.namespace, - runnerName: config.runner.runnerName, + poolName: config.envoy.poolName, headers: config.headers, encoding: "bare", getUpgradeWebSocket: undefined, diff --git a/rivetkit-typescript/packages/rivetkit/src/client/utils.ts b/rivetkit-typescript/packages/rivetkit/src/client/utils.ts index c4d61f02e4..52e501ba8a 100644 --- a/rivetkit-typescript/packages/rivetkit/src/client/utils.ts +++ b/rivetkit-typescript/packages/rivetkit/src/client/utils.ts @@ -90,8 +90,8 @@ export interface HttpRequestOpts< requestVersionedDataHandler: VersionedDataHandler | undefined; requestVersion: number | undefined; responseVersionedDataHandler: - | VersionedDataHandler - | undefined; + | VersionedDataHandler + | undefined; responseVersion: number | undefined; requestZodSchema: z.ZodType; responseZodSchema: z.ZodType; @@ -152,8 +152,8 @@ export async function sendHttpRequest< ...opts.headers, ...(contentType ? { - "Content-Type": contentType, - } + "Content-Type": contentType, + } : {}), "User-Agent": httpUserAgent(), }, diff --git a/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts b/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts index 409c583ecb..04584677eb 100644 --- a/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts +++ b/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts @@ -1,11 +1,7 @@ -import type { - ActorConfig as EngineActorConfig, - RunnerConfig as EngineRunnerConfig, - HibernatingWebSocketMetadata, -} from "@rivetkit/engine-runner"; +import type { EnvoyConfig } from "@rivetkit/engine-envoy-client"; import type { ISqliteVfs } from "@rivetkit/sqlite-vfs"; import { SqliteVfsPoolManager } from "@/driver-helpers/sqlite-pool"; -import { idToStr, Runner } from "@rivetkit/engine-runner"; +import { protocol, utils, EnvoyHandle, startEnvoySync } from "@rivetkit/engine-envoy-client"; import * as cbor from "cbor-x"; import type { Context as HonoContext } from "hono"; import { streamSSE } from "hono/streaming"; @@ -62,8 +58,8 @@ import { } from "@/utils"; import { logger } from "./log"; -const RUNNER_SSE_PING_INTERVAL = 1000; -const RUNNER_STOP_WAIT_MS = 15_000; +const ENVOY_SSE_PING_INTERVAL = 1000; +const ENVOY_STOP_WAIT_MS = 15_000; // Message ack deadline is 30s on the gateway, but we will ack more frequently // in order to minimize the message buffer size on the gateway and to give @@ -92,26 +88,26 @@ export class EngineActorDriver implements ActorDriver { #config: RegistryConfig; #managerDriver: ManagerDriver; #inlineClient: Client; - #runner: Runner; + #envoy: EnvoyHandle; #actors: Map = new Map(); #actorRouter: ActorRouter; #sqlitePool: SqliteVfsPoolManager; - #runnerStarted: PromiseWithResolvers = promiseWithResolvers( + #envoyStarted: PromiseWithResolvers = promiseWithResolvers( (reason) => logger().warn({ - msg: "unhandled runner started promise rejection", + msg: "unhandled envoy started promise rejection", reason, }), ); - #runnerStopped: PromiseWithResolvers = promiseWithResolvers( + #envoyStopped: PromiseWithResolvers = promiseWithResolvers( (reason) => logger().warn({ - msg: "unhandled runner stopped promise rejection", + msg: "unhandled envoy stopped promise rejection", reason, }), ); - #isRunnerStopped: boolean = false; + #isEnvoyStopped: boolean = false; // HACK: Track actor stop intent locally since the runner protocol doesn't // pass the stop reason to onActorStop. This will be fixed when the runner @@ -165,52 +161,53 @@ export class EngineActorDriver implements ActorDriver { config.test.enabled, ); - // Create runner configuration - const engineRunnerConfig: EngineRunnerConfig = { - version: config.runner.version, + // Create configuration + const envoyConfig: EnvoyConfig = { + version: config.envoy.version, endpoint: getEndpoint(config), token, namespace: config.namespace, - totalSlots: config.runner.totalSlots, - runnerName: config.runner.runnerName, + poolName: config.envoy.poolName, metadata: { rivetkit: { version: VERSION }, }, prepopulateActorNames: buildActorNames(config), - onConnected: () => { - this.#runnerStarted.resolve(undefined); - }, - onDisconnected: (_code, _reason) => {}, onShutdown: () => { - this.#runnerStopped.resolve(undefined); - this.#isRunnerStopped = true; + this.#envoyStopped.resolve(undefined); + this.#isEnvoyStopped = true; }, - fetch: this.#runnerFetch.bind(this), - websocket: this.#runnerWebSocket.bind(this), + fetch: this.#envoyFetch.bind(this), + websocket: this.#envoyWebSocket.bind(this), hibernatableWebSocket: { canHibernate: this.#hwsCanHibernate.bind(this), }, - onActorStart: this.#runnerOnActorStart.bind(this), - onActorStop: this.#runnerOnActorStop.bind(this), - logger: getLogger("engine-runner"), + onActorStart: this.#envoyOnActorStart.bind(this), + onActorStop: this.#envoyOnActorStop.bind(this), + logger: getLogger("envoy-client"), debugLatencyMs: process.env._RIVET_DEBUG_LATENCY_MS ? Number.parseInt(process.env._RIVET_DEBUG_LATENCY_MS, 10) : undefined, }; - // Create and start runner - this.#runner = new Runner(engineRunnerConfig); - this.#runner.start(); + // Create and start envoy + const [envoy, startRx] = startEnvoySync(envoyConfig); + + this.#envoy = envoy; + + startRx.changed().then(() => { + this.#envoyStarted.resolve(undefined); + }); + logger().debug({ - msg: "engine runner started", + msg: "envoy client started", endpoint: config.endpoint, namespace: config.namespace, - runnerName: config.runner.runnerName, + poolName: config.envoy.poolName, }); } getExtraActorLogParams(): Record { - return { runnerId: this.#runner.runnerId ?? "-" }; + return { envoyKey: this.#envoy.getEnvoyKey() ?? "-" }; } async #loadActorHandler(actorId: string): Promise { @@ -268,7 +265,7 @@ export class EngineActorDriver implements ActorDriver { // // onAlarm is automatically called on `ActorInstance.start` when waking // again. - this.#runner.setAlarm(actor.id, timestamp); + this.#envoy.setAlarm(actor.id, timestamp); } // No database overrides - will use KV-backed implementation from rivetkit/db @@ -278,18 +275,18 @@ export class EngineActorDriver implements ActorDriver { actorId: string, entries: [Uint8Array, Uint8Array][], ): Promise { - await this.#runner.kvPut(actorId, entries); + await this.#envoy.kvPut(actorId, entries); } async kvBatchGet( actorId: string, keys: Uint8Array[], ): Promise<(Uint8Array | null)[]> { - return await this.#runner.kvGet(actorId, keys); + return await this.#envoy.kvGet(actorId, keys); } async kvBatchDelete(actorId: string, keys: Uint8Array[]): Promise { - await this.#runner.kvDelete(actorId, keys); + await this.#envoy.kvDelete(actorId, keys); } async kvDeleteRange( @@ -297,11 +294,11 @@ export class EngineActorDriver implements ActorDriver { start: Uint8Array, end: Uint8Array, ): Promise { - await this.#runner.kvDeleteRange(actorId, start, end); + await this.#envoy.kvDeleteRange(actorId, start, end); } async kvList(actorId: string): Promise { - const entries = await this.#runner.kvListPrefix( + const entries = await this.#envoy.kvListPrefix( actorId, new Uint8Array(), ); @@ -323,7 +320,7 @@ export class EngineActorDriver implements ActorDriver { limit?: number; }, ): Promise<[Uint8Array, Uint8Array][]> { - const result = await this.#runner.kvListPrefix( + const result = await this.#envoy.kvListPrefix( actorId, prefix, options, @@ -347,7 +344,7 @@ export class EngineActorDriver implements ActorDriver { limit?: number; }, ): Promise<[Uint8Array, Uint8Array][]> { - return await this.#runner.kvListRange( + return await this.#envoy.kvListRange( actorId, start, end, @@ -371,16 +368,16 @@ export class EngineActorDriver implements ActorDriver { startSleep(actorId: string) { // HACK: Track intent for onActorStop (see RVT-5284) this.#actorStopIntent.set(actorId, "sleep"); - this.#runner.sleepActor(actorId); + this.#envoy.sleepActor(actorId); } startDestroy(actorId: string) { // HACK: Track intent for onActorStop (see RVT-5284) this.#actorStopIntent.set(actorId, "destroy"); - this.#runner.destroyActor(actorId); + this.#envoy.destroyActor(actorId); } - async shutdownRunner(immediate: boolean): Promise { + async shutdown(immediate: boolean): Promise { logger().info({ msg: "stopping engine actor driver", immediate }); // TODO: We need to update the runner to have a draining state so: @@ -431,7 +428,7 @@ export class EngineActorDriver implements ActorDriver { await this.#sqlitePool.shutdown(); try { - await this.#runner.shutdown(immediate); + await this.#envoy.shutdown(immediate); } catch (error) { const message = error instanceof Error ? error.message : String(error); @@ -448,63 +445,64 @@ export class EngineActorDriver implements ActorDriver { } const stopped = await Promise.race([ - this.#runnerStopped.promise.then(() => true), + this.#envoyStopped.promise.then(() => true), new Promise((resolve) => - setTimeout(() => resolve(false), RUNNER_STOP_WAIT_MS), + setTimeout(() => resolve(false), ENVOY_STOP_WAIT_MS), ), ]); if (!stopped) { logger().warn({ - msg: "timed out waiting for runner shutdown", - waitMs: RUNNER_STOP_WAIT_MS, + msg: "timed out waiting for envoy shutdown", + waitMs: ENVOY_STOP_WAIT_MS, }); } } async serverlessHandleStart(c: HonoContext): Promise { return streamSSE(c, async (stream) => { - // NOTE: onAbort does not work reliably - stream.onAbort(() => {}); - c.req.raw.signal.addEventListener("abort", () => { - logger().debug("SSE aborted, shutting down runner"); - - // We cannot assume that the request will always be closed gracefully by Rivet. We always proceed with a graceful shutdown in case the request was terminated for any other reason. - // - // If we did not use a graceful shutdown, the runner would - this.shutdownRunner(false); - }); - - await this.#runnerStarted.promise; - - // Runner id should be set if the runner started - const payload = this.#runner.getServerlessInitPacket(); - invariant(payload, "runnerId not set"); - await stream.writeSSE({ data: payload }); - - // Send ping every second to keep the connection alive - while (true) { - if (this.#isRunnerStopped) { - logger().debug({ - msg: "runner is stopped", - }); - break; - } - - if (stream.closed || stream.aborted) { - logger().debug({ - msg: "runner sse stream closed", - closed: stream.closed, - aborted: stream.aborted, - }); - break; - } - - await stream.writeSSE({ event: "ping", data: "" }); - await stream.sleep(RUNNER_SSE_PING_INTERVAL); - } + // TODO: + // // NOTE: onAbort does not work reliably + // stream.onAbort(() => { }); + // c.req.raw.signal.addEventListener("abort", () => { + // logger().debug("SSE aborted, shutting down runner"); + + // // We cannot assume that the request will always be closed gracefully by Rivet. We always proceed with a graceful shutdown in case the request was terminated for any other reason. + // // + // // If we did not use a graceful shutdown, the runner would + // this.shutdownRunner(false); + // }); + + // await this.#envoyStarted.promise; + + // // Runner id should be set if the runner started + // const payload = this.#envoy.getServerlessInitPacket(); + // invariant(payload, "runnerId not set"); + // await stream.writeSSE({ data: payload }); + + // // Send ping every second to keep the connection alive + // while (true) { + // if (this.#isRunnerStopped) { + // logger().debug({ + // msg: "runner is stopped", + // }); + // break; + // } + + // if (stream.closed || stream.aborted) { + // logger().debug({ + // msg: "runner sse stream closed", + // closed: stream.closed, + // aborted: stream.aborted, + // }); + // break; + // } + + // await stream.writeSSE({ event: "ping", data: "" }); + // await stream.sleep(RUNNER_SSE_PING_INTERVAL); + // } // Wait for the runner to stop if the SSE stream aborted early for any reason - await this.#runnerStopped.promise; + await this.#envoyStopped.promise; }); } @@ -526,9 +524,9 @@ export class EngineActorDriver implements ActorDriver { ]; const [exactResults, ...prefixResults] = await Promise.all([ - this.#runner.kvGet(actorId, remainingExactKeys), + this.#envoy.kvGet(actorId, remainingExactKeys), ...prefixScans.map((prefix) => - this.#runner.kvListPrefix(actorId, prefix), + this.#envoy.kvListPrefix(actorId, prefix), ), ]); @@ -561,13 +559,14 @@ export class EngineActorDriver implements ActorDriver { return { preloadMap, entries: entries.length }; } - async #runnerOnActorStart( + async #envoyOnActorStart( + _envoy: EnvoyHandle, actorId: string, generation: number, - actorConfig: EngineActorConfig, + actorConfig: protocol.ActorConfig, ): Promise { logger().debug({ - msg: "runner actor starting", + msg: "engine actor starting", actorId, name: actorConfig.name, key: actorConfig.key, @@ -577,7 +576,7 @@ export class EngineActorDriver implements ActorDriver { // Deserialize input let input: any; if (actorConfig.input) { - input = cbor.decode(actorConfig.input); + input = cbor.decode(new Uint8Array(actorConfig.input)); } // Get or create handler @@ -605,7 +604,7 @@ export class EngineActorDriver implements ActorDriver { try { // Check if this actor already has persisted state. let checkStart = performance.now(); - const [persistDataBuffer] = await this.#runner.kvGet(actorId, [ + const [persistDataBuffer] = await this.#envoy.kvGet(actorId, [ KEYS.PERSIST_DATA, ]); const checkPersistDataMs = performance.now() - checkStart; @@ -621,7 +620,7 @@ export class EngineActorDriver implements ActorDriver { if (persistDataBuffer === null) { const initStart = performance.now(); const initialKvState = getInitialActorKvState(input); - await this.#runner.kvPut(actorId, initialKvState); + await this.#envoy.kvPut(actorId, initialKvState); initNewActorMs = performance.now() - initStart; driverKvRoundTrips++; logger().debug({ @@ -664,7 +663,7 @@ export class EngineActorDriver implements ActorDriver { handler.actor.metrics.startup.kvRoundTrips = driverKvRoundTrips; // Apply protocol limits as per-instance overrides without mutating the shared definition - const protocolMetadata = this.#runner.getProtocolMetadata(); + const protocolMetadata = this.#envoy.getProtocolMetadata(); if (protocolMetadata) { logger().debug({ msg: "applying config limits from protocol", @@ -681,7 +680,7 @@ export class EngineActorDriver implements ActorDriver { if (protocolMetadata.serverlessDrainGracePeriod) { const drainMax = Math.max( Number(protocolMetadata.serverlessDrainGracePeriod) - - 1000, + 1000, 0, ); handler.actor.overrides.runStopTimeout = drainMax; @@ -702,23 +701,23 @@ export class EngineActorDriver implements ActorDriver { preloadMap, ); - logger().debug({ msg: "runner actor started", actorId, name, key }); + logger().debug({ msg: "engine actor started", actorId, name, key }); } catch (innerError) { const error = innerError instanceof Error ? new Error( - `Failed to start actor ${actorId}: ${innerError.message}`, - { cause: innerError }, - ) + `Failed to start actor ${actorId}: ${innerError.message}`, + { cause: innerError }, + ) : new Error( - `Failed to start actor ${actorId}: ${String(innerError)}`, - ); + `Failed to start actor ${actorId}: ${String(innerError)}`, + ); handler.actor = undefined; handler.actorStartError = error; handler.actorStartPromise?.reject(error); handler.actorStartPromise = undefined; logger().error({ - msg: "runner actor failed to start", + msg: "engine actor failed to start", actorId, name, key, @@ -726,7 +725,7 @@ export class EngineActorDriver implements ActorDriver { }); try { - this.#runner.stopActor(actorId); + this.#envoy.stopActor(actorId); } catch (stopError) { logger().debug({ msg: "failed to stop actor after start failure", @@ -737,11 +736,13 @@ export class EngineActorDriver implements ActorDriver { } } - async #runnerOnActorStop( + async #envoyOnActorStop( + _envoyHandle: EnvoyHandle, actorId: string, generation: number, + _reason: protocol.StopActorReason, ): Promise { - logger().debug({ msg: "runner actor stopping", actorId, generation }); + logger().debug({ msg: "engine actor stopping", actorId, generation }); // HACK: Retrieve the stop intent we tracked locally (see RVT-5284) // Default to "sleep" if no intent was recorded (e.g., if the runner @@ -756,7 +757,7 @@ export class EngineActorDriver implements ActorDriver { const handler = this.#actors.get(actorId); if (!handler) { logger().debug({ - msg: "no runner actor handler to stop", + msg: "no engine actor handler to stop", actorId, reason, }); @@ -766,7 +767,7 @@ export class EngineActorDriver implements ActorDriver { if (handler.actorStartPromise) { try { logger().debug({ - msg: "runner actor stopping before it started, waiting", + msg: "engine actor stopping before it started, waiting", actorId, generation, }); @@ -794,19 +795,19 @@ export class EngineActorDriver implements ActorDriver { this.#actors.delete(actorId); - logger().debug({ msg: "runner actor stopped", actorId, reason }); + logger().debug({ msg: "engine actor stopped", actorId, reason }); } - // MARK: - Runner Networking - async #runnerFetch( - _runner: Runner, + // MARK: - Envoy Networking + async #envoyFetch( + _envoy: EnvoyHandle, actorId: string, _gatewayIdBuf: ArrayBuffer, _requestIdBuf: ArrayBuffer, request: Request, ): Promise { logger().debug({ - msg: "runner fetch", + msg: "envoy fetch", actorId, url: request.url, method: request.method, @@ -814,8 +815,8 @@ export class EngineActorDriver implements ActorDriver { return await this.#actorRouter.fetch(request, { actorId }); } - async #runnerWebSocket( - _runner: Runner, + async #envoyWebSocket( + _envoy: EnvoyHandle, actorId: string, websocketRaw: any, gatewayIdBuf: ArrayBuffer, @@ -833,7 +834,7 @@ export class EngineActorDriver implements ActorDriver { (websocket as any).__rivet_ws_id = wsUniqueId; logger().debug({ - msg: "runner websocket", + msg: "envoy websocket", actorId, url: request.url, isRestoringHibernatable, @@ -1026,8 +1027,8 @@ export class EngineActorDriver implements ActorDriver { msg: "event listeners attached to restored websocket", actorId, connId: conn?.id, - gatewayId: idToStr(gatewayIdBuf), - requestId: idToStr(requestIdBuf), + gatewayId: utils.idToStr(gatewayIdBuf), + requestId: utils.idToStr(requestIdBuf), websocketType: websocket?.constructor?.name, hasMessageListener: !!websocket.addEventListener, }); @@ -1044,8 +1045,8 @@ export class EngineActorDriver implements ActorDriver { const url = new URL(request.url); const path = url.pathname; - // Get actor instance from runner to access actor name - const actorInstance = this.#runner.getActor(actorId); + // Get actor instance from envoy to access actor name + const actorInstance = this.#envoy.getActor(actorId); if (!actorInstance) { logger().warn({ msg: "actor not found in #hwsCanHibernate", @@ -1074,8 +1075,8 @@ export class EngineActorDriver implements ActorDriver { // Determine configuration for new WS logger().debug({ msg: "no existing hibernatable websocket found", - gatewayId: idToStr(gatewayId), - requestId: idToStr(requestId), + gatewayId: utils.idToStr(gatewayId), + requestId: utils.idToStr(requestId), }); if (path === PATH_CONNECT) { return true; @@ -1086,7 +1087,7 @@ export class EngineActorDriver implements ActorDriver { // Find actor config const definition = lookupInRegistry( this.#config, - actorInstance.config.name, + actorInstance.name, ); // Check if can hibernate @@ -1129,28 +1130,28 @@ export class EngineActorDriver implements ActorDriver { } } - async #hwsLoadAll( - actorId: string, - ): Promise { - const actor = await this.loadActor(actorId); - return actor.conns - .values() - .map((conn) => { - const connStateManager = conn[CONN_STATE_MANAGER_SYMBOL]; - const hibernatable = connStateManager.hibernatableData; - if (!hibernatable) return undefined; - return { - gatewayId: hibernatable.gatewayId, - requestId: hibernatable.requestId, - serverMessageIndex: hibernatable.serverMessageIndex, - clientMessageIndex: hibernatable.clientMessageIndex, - path: hibernatable.requestPath, - headers: hibernatable.requestHeaders, - } satisfies HibernatingWebSocketMetadata; - }) - .filter((x) => x !== undefined) - .toArray(); - } + // async #hwsLoadAll( + // actorId: string, + // ): Promise { + // const actor = await this.loadActor(actorId); + // return actor.conns + // .values() + // .map((conn) => { + // const connStateManager = conn[CONN_STATE_MANAGER_SYMBOL]; + // const hibernatable = connStateManager.hibernatableData; + // if (!hibernatable) return undefined; + // return { + // gatewayId: hibernatable.gatewayId, + // requestId: hibernatable.requestId, + // serverMessageIndex: hibernatable.serverMessageIndex, + // clientMessageIndex: hibernatable.clientMessageIndex, + // path: hibernatable.requestPath, + // headers: hibernatable.requestHeaders, + // } satisfies HibernatingWebSocketMetadata; + // }) + // .filter((x) => x !== undefined) + // .toArray(); + // } async onBeforeActorStart(actor: AnyActorInstance): Promise { // Resolve promise if waiting @@ -1160,9 +1161,10 @@ export class EngineActorDriver implements ActorDriver { handler.actorStartPromise?.resolve(); handler.actorStartPromise = undefined; - // Restore hibernating requests - const metaEntries = await this.#hwsLoadAll(actor.id); - await this.#runner.restoreHibernatingRequests(actor.id, metaEntries); + // TODO: + // // Restore hibernating requests + // const metaEntries = await this.#hwsLoadAll(actor.id); + // await this.#envoy.restoreHibernatingRequests(actor.id, metaEntries); } onCreateConn(conn: AnyConn) { @@ -1229,11 +1231,12 @@ export class EngineActorDriver implements ActorDriver { entry.pendingAckFromMessageIndex || entry.pendingAckFromBufferSize ) { - this.#runner.sendHibernatableWebSocketMessageAck( - hibernatable.gatewayId, - hibernatable.requestId, - entry.serverMessageIndex, - ); + // TODO: + // this.#envoy.sendHibernatableWebSocketMessageAck( + // hibernatable.gatewayId, + // hibernatable.requestId, + // entry.serverMessageIndex, + // ); entry.pendingAckFromMessageIndex = false; entry.pendingAckFromBufferSize = false; entry.bufferedMessageSize = 0; diff --git a/rivetkit-typescript/packages/rivetkit/src/registry/config/envoy.ts b/rivetkit-typescript/packages/rivetkit/src/registry/config/envoy.ts new file mode 100644 index 0000000000..b2ec654cc4 --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/src/registry/config/envoy.ts @@ -0,0 +1,34 @@ +import { z } from "zod/v4"; +import { getLogger } from "@/common/log"; +import { + isDev, + getNodeEnv, + getRivetPool, + getRivetTotalSlots, + getRivetEnvoyVersion, +} from "@/utils/env-vars"; + +let warnedMissingVersion = false; + +export const EnvoyConfigSchema = z.object({ + poolName: z.string().default(() => getRivetPool() ?? "default"), + version: z.number().default(() => { + const version = getRivetEnvoyVersion(); + if (version !== undefined) return version; + + if (getNodeEnv() === "production" && !warnedMissingVersion) { + warnedMissingVersion = true; + getLogger("rivetkit").error( + "RIVET_ENVOY_VERSION is not set. Actors will not be versioned, which means they won't be drained on deploy. This is only needed when self-hosting or using a custom envoy (not needed for Rivet Compute). Set this as a build arg in your Dockerfile. See https://rivet.dev/docs/actors/versions", + ); + } + + return 1; + }), + + // Deprecated. + totalSlots: z.number().default(() => getRivetTotalSlots() ?? 100000), + runnerKey: z.string().optional(), +}); +export type EnvoyConfigInput = z.input; +export type EnvoyConfig = z.infer; diff --git a/rivetkit-typescript/packages/rivetkit/src/registry/config/index.ts b/rivetkit-typescript/packages/rivetkit/src/registry/config/index.ts index 2a3f29c560..f04d240a81 100644 --- a/rivetkit-typescript/packages/rivetkit/src/registry/config/index.ts +++ b/rivetkit-typescript/packages/rivetkit/src/registry/config/index.ts @@ -15,7 +15,7 @@ import { isDev, } from "@/utils/env-vars"; import { type DriverConfig, DriverConfigSchema } from "./driver"; -import { RunnerConfigSchema } from "./runner"; +import { EnvoyConfigSchema } from "./envoy"; import { ServerlessConfigSchema } from "./serverless"; export { DriverConfigSchema, type DriverConfig }; @@ -186,8 +186,8 @@ export const RegistryConfigSchema = z serverless: ServerlessConfigSchema.optional().default(() => ServerlessConfigSchema.parse({}), ), - runner: RunnerConfigSchema.optional().default(() => - RunnerConfigSchema.parse({}), + envoy: EnvoyConfigSchema.optional().default(() => + EnvoyConfigSchema.parse({}), ), }) .transform((config, ctx) => { @@ -196,11 +196,11 @@ export const RegistryConfigSchema = z // Parse endpoint string (env var fallback is applied via transform above) const parsedEndpoint = config.endpoint ? tryParseEndpoint(ctx, { - endpoint: config.endpoint, - path: ["endpoint"], - namespace: config.namespace, - token: config.token, - }) + endpoint: config.endpoint, + path: ["endpoint"], + namespace: config.namespace, + token: config.token, + }) : undefined; if (parsedEndpoint && config.serveManager) { @@ -218,16 +218,16 @@ export const RegistryConfigSchema = z }); } - // configureRunnerPool requires an engine (via endpoint or spawnEngine) + // configurerPool requires an engine (via endpoint or spawnEngine) if ( - config.serverless.configureRunnerPool && + config.serverless.configurePool && !parsedEndpoint && !config.serverless.spawnEngine ) { ctx.addIssue({ code: "custom", message: - "configureRunnerPool requires either endpoint or spawnEngine", + "configurePool requires either endpoint or spawnEngine", }); } @@ -245,9 +245,9 @@ export const RegistryConfigSchema = z // Parse publicEndpoint string (env var fallback is applied via transform in serverless schema) const parsedPublicEndpoint = config.serverless.publicEndpoint ? tryParseEndpoint(ctx, { - endpoint: config.serverless.publicEndpoint, - path: ["serverless", "publicEndpoint"], - }) + endpoint: config.serverless.publicEndpoint, + path: ["serverless", "publicEndpoint"], + }) : undefined; // Validate that publicEndpoint namespace matches backend namespace if specified @@ -281,9 +281,9 @@ export const RegistryConfigSchema = z const willUseEngine = !!endpoint || config.serverless.spawnEngine; const inspector = willUseEngine ? { - ...config.inspector, - enabled: { manager: false, actor: true }, - } + ...config.inspector, + enabled: { manager: false, actor: true }, + } : config.inspector; return { diff --git a/rivetkit-typescript/packages/rivetkit/src/registry/config/legacy-runner.ts b/rivetkit-typescript/packages/rivetkit/src/registry/config/legacy-runner.ts index 7aabeea402..88d30e94c6 100644 --- a/rivetkit-typescript/packages/rivetkit/src/registry/config/legacy-runner.ts +++ b/rivetkit-typescript/packages/rivetkit/src/registry/config/legacy-runner.ts @@ -13,7 +13,7 @@ import { getEnvUniversal, VERSION } from "@/utils"; import { getRivetRunEngine, getRivetRunEngineVersion, - getRivetRunnerKind, + getRivetEnvoyKind, getRivetToken, } from "@/utils/env-vars"; @@ -74,7 +74,7 @@ const LegacyRunnerConfigSchemaUnmerged = z .enum(["serverless", "normal"]) .optional() .default(() => - getRivetRunnerKind() === "serverless" ? "serverless" : "normal", + getRivetEnvoyKind() === "serverless" ? "serverless" : "normal", ), totalSlots: z.number().optional(), diff --git a/rivetkit-typescript/packages/rivetkit/src/registry/config/runner.ts b/rivetkit-typescript/packages/rivetkit/src/registry/config/runner.ts deleted file mode 100644 index 788134588f..0000000000 --- a/rivetkit-typescript/packages/rivetkit/src/registry/config/runner.ts +++ /dev/null @@ -1,34 +0,0 @@ -import { z } from "zod/v4"; -import { getLogger } from "@/common/log"; -import { - isDev, - getNodeEnv, - getRivetTotalSlots, - getRivetRunner, - getRivetRunnerVersion, -} from "@/utils/env-vars"; - -let warnedMissingVersion = false; - -export const RunnerConfigSchema = z.object({ - // MARK: Runner - totalSlots: z.number().default(() => getRivetTotalSlots() ?? 100000), - runnerName: z.string().default(() => getRivetRunner() ?? "default"), - // Deprecated. - runnerKey: z.string().optional(), - version: z.number().default(() => { - const version = getRivetRunnerVersion(); - if (version !== undefined) return version; - - if (getNodeEnv() === "production" && !warnedMissingVersion) { - warnedMissingVersion = true; - getLogger("rivetkit").error( - "RIVET_RUNNER_VERSION is not set. Actors will not be versioned, which means they won't be drained on deploy. This is only needed when self-hosting or using a custom runner (not needed for Rivet Compute). Set this as a build arg in your Dockerfile. See https://rivet.dev/docs/actors/versions", - ); - } - - return 1; - }), -}); -export type RunnerConfigInput = z.input; -export type RunnerConfig = z.infer; diff --git a/rivetkit-typescript/packages/rivetkit/src/registry/config/serverless.ts b/rivetkit-typescript/packages/rivetkit/src/registry/config/serverless.ts index 5d2e689433..e02e36d8c4 100644 --- a/rivetkit-typescript/packages/rivetkit/src/registry/config/serverless.ts +++ b/rivetkit-typescript/packages/rivetkit/src/registry/config/serverless.ts @@ -7,19 +7,22 @@ import { getRivetPublicToken, } from "@/utils/env-vars"; -export const ConfigureRunnerPoolSchema = z +export const ConfigurePoolSchema = z .object({ name: z.string().optional(), url: z.string(), headers: z.record(z.string(), z.string()).optional(), - maxRunners: z.number().optional(), - minRunners: z.number().optional(), requestLifespan: z.number().optional(), - runnersMargin: z.number().optional(), - slotsPerRunner: z.number().optional(), + maxConcurrentActors: z.number().optional(), metadata: z.record(z.string(), z.unknown()).optional(), metadataPollInterval: z.number().optional(), drainOnVersionUpgrade: z.boolean().optional(), + + // Deprecated + maxRunners: z.number().optional(), + minRunners: z.number().optional(), + runnersMargin: z.number().optional(), + slotsPerRunner: z.number().optional(), }) .optional(); @@ -42,11 +45,11 @@ export const ServerlessConfigSchema = z.object({ /** * @experimental * - * Automatically configure serverless runners in the engine. - * Can only be used when runnerKind is "serverless". + * Automatically configure serverless envoys in the engine. + * Can only be used when envoyKind is "serverless". * If true, uses default configuration. Can also provide custom configuration. */ - configureRunnerPool: ConfigureRunnerPoolSchema.optional(), + configurePool: ConfigurePoolSchema.optional(), // MARK: Routing // TODO: serverlessBasePath? better naming? diff --git a/rivetkit-typescript/packages/rivetkit/src/registry/index.ts b/rivetkit-typescript/packages/rivetkit/src/registry/index.ts index caf02deb28..20e59f153f 100644 --- a/rivetkit-typescript/packages/rivetkit/src/registry/index.ts +++ b/rivetkit-typescript/packages/rivetkit/src/registry/index.ts @@ -92,11 +92,11 @@ export class Registry { } /** - * Starts an actor runner for standalone server deployments. + * Starts an actor envoy for standalone server deployments. */ - public startRunner() { + public startEnvoy() { // biome-ignore lint/nursery/noFloatingPromises: bg task - this.#ensureRuntime().then((runtime) => runtime.startRunner()); + this.#ensureRuntime().then((runtime) => runtime.startEnvoy()); } /** @@ -104,7 +104,7 @@ export class Registry { * * This is the simplest way to run RivetKit. It starts a local manager * server, serves static files from the configured `publicDir` (default - * `"public"`), and starts the actor runner. + * `"public"`), and starts the actor envoy. * * When an endpoint is configured (via config or RIVET_ENDPOINT env var), * operates in serverless mode connected to the remote engine instead. @@ -125,7 +125,7 @@ export class Registry { // manager server starts and serves the API + static files. // When an endpoint IS configured, the config transform handles // the mode (serveManager defaults to false, spawnEngine may be - // true, etc.) and we just start the runner. + // true, etc.) and we just start the envoy. if (this.#config.serveManager === undefined) { const hasEndpoint = !!( this.#config.endpoint || @@ -139,7 +139,7 @@ export class Registry { } // biome-ignore lint/nursery/noFloatingPromises: fire-and-forget - this.#ensureRuntime().then((runtime) => runtime.startRunner()); + this.#ensureRuntime().then((runtime) => runtime.startEnvoy()); } } diff --git a/rivetkit-typescript/packages/rivetkit/src/remote-manager-driver/api-endpoints.ts b/rivetkit-typescript/packages/rivetkit/src/remote-manager-driver/api-endpoints.ts index b4c490a2d0..75efd4c556 100644 --- a/rivetkit-typescript/packages/rivetkit/src/remote-manager-driver/api-endpoints.ts +++ b/rivetkit-typescript/packages/rivetkit/src/remote-manager-driver/api-endpoints.ts @@ -59,7 +59,7 @@ export async function getOrCreateActor( return apiCall( config, "PUT", - `/actors`, + `/actors2`, request, ); } @@ -72,7 +72,7 @@ export async function createActor( return apiCall( config, "POST", - `/actors`, + `/actors2`, request, ); } diff --git a/rivetkit-typescript/packages/rivetkit/src/remote-manager-driver/mod.ts b/rivetkit-typescript/packages/rivetkit/src/remote-manager-driver/mod.ts index 7ca17d2f3d..45ad4f2e3f 100644 --- a/rivetkit-typescript/packages/rivetkit/src/remote-manager-driver/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/remote-manager-driver/mod.ts @@ -196,7 +196,7 @@ export class RemoteManagerDriver implements ManagerDriver { datacenter: region, name, key: serializeActorKey(key), - runner_name_selector: this.#config.runnerName, + runner_name_selector: this.#config.poolName, input: actorInput ? uint8ArrayToBase64(cbor.encode(actorInput)) : undefined, @@ -232,7 +232,7 @@ export class RemoteManagerDriver implements ManagerDriver { const result = await createActor(this.#config, { datacenter: region, name, - runner_name_selector: this.#config.runnerName, + runner_name_selector: this.#config.poolName, key: serializeActorKey(key), input: input ? uint8ArrayToBase64(cbor.encode(input)) : undefined, crash_policy: "sleep", diff --git a/rivetkit-typescript/packages/rivetkit/src/serverless/configure.ts b/rivetkit-typescript/packages/rivetkit/src/serverless/configure.ts index e2c77db045..a7819190d0 100644 --- a/rivetkit-typescript/packages/rivetkit/src/serverless/configure.ts +++ b/rivetkit-typescript/packages/rivetkit/src/serverless/configure.ts @@ -7,10 +7,10 @@ import { updateRunnerConfig, } from "@/remote-manager-driver/api-endpoints"; -export async function configureServerlessRunner( +export async function configureServerlessPool( config: RegistryConfig, ): Promise { - logger().debug("configuring serverless runner"); + logger().debug("configuring serverless pool"); try { // Ensure we have required config values @@ -26,8 +26,8 @@ export async function configureServerlessRunner( } // Prepare the configuration - const customConfig = config.serverless.configureRunnerPool; - invariant(customConfig, "configureRunnerPool should exist"); + const customConfig = config.serverless.configurePool; + invariant(customConfig, "configurePool should exist"); const clientConfig = convertRegistryConfigToClientConfig(config); @@ -39,46 +39,48 @@ export async function configureServerlessRunner( const dcsRes = await getDatacenters(clientConfig); // Build the request body - const runnerName = customConfig.name ?? "default"; + const poolName = customConfig.name ?? "default"; logger().debug({ - msg: "configuring serverless runner", - runnerName, + msg: "configuring serverless pool", + poolName, namespace: config.namespace, }); const serverlessConfig = { serverless: { url: customConfig.url, headers: customConfig.headers ?? {}, + request_lifespan: customConfig.requestLifespan ?? 15 * 60, + max_concurrent_actors: customConfig.maxConcurrentActors ?? 100_000, + metadata_poll_interval: + customConfig.metadataPollInterval ?? 1000, + max_runners: customConfig.maxRunners ?? 100_000, min_runners: customConfig.minRunners ?? 0, - request_lifespan: customConfig.requestLifespan ?? 15 * 60, runners_margin: customConfig.runnersMargin ?? 0, slots_per_runner: customConfig.slotsPerRunner ?? 1, - metadata_poll_interval: - customConfig.metadataPollInterval ?? 1000, }, metadata: customConfig.metadata ?? {}, drain_on_version_upgrade: customConfig.drainOnVersionUpgrade ?? true, metadataPollInterval: customConfig.metadataPollInterval ?? 1000, }; - await updateRunnerConfig(clientConfig, runnerName, { + await updateRunnerConfig(clientConfig, poolName, { datacenters: Object.fromEntries( dcsRes.datacenters.map((dc) => [dc.name, serverlessConfig]), ), }); logger().info({ - msg: "serverless runner configured successfully", - runnerName, + msg: "serverless pool configured successfully", + poolName, namespace: config.namespace, }); } catch (error) { logger().error({ - msg: "failed to configure serverless runner, validate endpoint is configured correctly then restart this process", + msg: "failed to configure serverless pool, validate endpoint is configured correctly then restart this process", error, }); - // Don't throw, allow the runner to continue + // Don't throw, allow the envoy to continue } } diff --git a/rivetkit-typescript/packages/rivetkit/src/test/mod.ts b/rivetkit-typescript/packages/rivetkit/src/test/mod.ts index c96e94199d..1c0cb802d0 100644 --- a/rivetkit-typescript/packages/rivetkit/src/test/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/test/mod.ts @@ -89,7 +89,7 @@ export async function setupTest>( const client = createClient({ endpoint, namespace: "default", - runnerName: "default", + poolName: "default", disableMetadataLookup: true, }); c.onTestFinished(async () => await client.dispose()); diff --git a/rivetkit-typescript/packages/rivetkit/src/utils/env-vars.ts b/rivetkit-typescript/packages/rivetkit/src/utils/env-vars.ts index 76cd2fbf7c..5e0cf1f3ad 100644 --- a/rivetkit-typescript/packages/rivetkit/src/utils/env-vars.ts +++ b/rivetkit-typescript/packages/rivetkit/src/utils/env-vars.ts @@ -14,8 +14,8 @@ export const getRivetToken = (): string | undefined => getEnvUniversal("RIVET_TOKEN"); export const getRivetNamespace = (): string | undefined => getEnvUniversal("RIVET_NAMESPACE"); -export const getRivetRunner = (): string | undefined => - getEnvUniversal("RIVET_RUNNER"); +export const getRivetPool = (): string | undefined => + getEnvUniversal("RIVET_POOL"); export const getRivetTotalSlots = (): number | undefined => { const value = getEnvUniversal("RIVET_TOTAL_SLOTS"); return value !== undefined ? parseInt(value, 10) : undefined; @@ -24,10 +24,10 @@ export const getRivetRunEngine = (): boolean => getEnvUniversal("RIVET_RUN_ENGINE") === "1"; export const getRivetRunEngineVersion = (): string | undefined => getEnvUniversal("RIVET_RUN_ENGINE_VERSION"); -export const getRivetRunnerKind = (): string | undefined => - getEnvUniversal("RIVET_RUNNER_KIND"); -export const getRivetRunnerVersion = (): number | undefined => { - const value = getEnvUniversal("RIVET_RUNNER_VERSION"); +export const getRivetEnvoyKind = (): string | undefined => + getEnvUniversal("RIVET_ENVOY_KIND"); +export const getRivetEnvoyVersion = (): number | undefined => { + const value = getEnvUniversal("RIVET_ENVOY_VERSION"); return value !== undefined ? parseInt(value, 10) : undefined; }; export const getRivetPublicEndpoint = (): string | undefined => diff --git a/scripts/tests/utils.ts b/scripts/tests/utils.ts index b3a672973e..7b6cd614ef 100644 --- a/scripts/tests/utils.ts +++ b/scripts/tests/utils.ts @@ -9,7 +9,7 @@ export async function createActor( withKey: boolean = true ): Promise { const response = await fetch( - `${RIVET_ENDPOINT}/actors?namespace=${namespaceName}`, + `${RIVET_ENDPOINT}/actors2?namespace=${namespaceName}`, { method: "POST", headers: {