diff --git a/CLAUDE.md b/CLAUDE.md index e909853942..44556847de 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -80,6 +80,11 @@ git commit -m "chore(my-pkg): foo bar" ### pnpm Workspace - Use pnpm for all npm-related commands. We're using a pnpm workspace. +### TypeScript Concurrency +- Use `antiox` for TypeScript concurrency primitives instead of ad hoc Promise queues, custom channel wrappers, or event-emitter based coordination. +- Prefer the Tokio-shaped APIs from `antiox` for concurrency needs. For example, use `antiox/sync/mpsc` for `tx` and `rx` channels, `antiox/task` for spawning tasks, and the matching sync and time modules as needed. +- Treat `antiox` as the default choice for any TypeScript concurrency work because it mirrors Rust and Tokio APIs used elsewhere in the codebase. + ### SQLite Package - Use `@rivetkit/sqlite` for SQLite WebAssembly support. - Do not use the legacy upstream package directly. `@rivetkit/sqlite` is the maintained fork used in this repository and is sourced from `rivet-dev/wa-sqlite`. diff --git a/biome.json b/biome.json index 8113890c1b..8383e5841a 100644 --- a/biome.json +++ b/biome.json @@ -7,6 +7,7 @@ "!**/*.gen.*", "rivetkit-typescript/**/*.{tsx,ts,css}", "examples/**/*.{ts,tsx}", + "engine/sdks/typescript/**/*.ts", "!/**/node_modules" ], "ignoreUnknown": true diff --git a/engine/sdks/typescript/envoy-client/package.json b/engine/sdks/typescript/envoy-client/package.json index 866fe6e050..7b59e69543 100644 --- a/engine/sdks/typescript/envoy-client/package.json +++ b/engine/sdks/typescript/envoy-client/package.json @@ -9,16 +9,16 @@ ], "exports": { "import": { - "types": "./dist/mod.d.ts", - "default": "./dist/mod.js" + "types": "./dist/index.d.ts", + "default": "./dist/index.js" }, "require": { - "types": "./dist/mod.d.cts", - "default": "./dist/mod.cjs" + "types": "./dist/index.d.cts", + "default": "./dist/index.cjs" } }, "scripts": { - "build": "tsup src/mod.ts", + "build": "tsup src/index.ts", "check-types": "tsc --noEmit", "test": "vitest run", "test:watch": "vitest", @@ -27,6 +27,7 @@ "dependencies": { "@rivetkit/virtual-websocket": "workspace:*", "@rivetkit/engine-envoy-protocol": "workspace:*", + "antiox": "link:/home/nathan/antiox", "uuid": "^12.0.0", "pino": "^9.9.5", "ws": "^8.18.3" @@ -40,4 +41,4 @@ "typescript": "^5.9.2", "vitest": "^1.6.1" } -} \ No newline at end of file +} diff --git a/engine/sdks/typescript/envoy-client/src/config.ts b/engine/sdks/typescript/envoy-client/src/config.ts new file mode 100644 index 0000000000..eb668573c5 --- /dev/null +++ b/engine/sdks/typescript/envoy-client/src/config.ts @@ -0,0 +1,159 @@ +import type { Logger } from "pino"; +import * as protocol from "@rivetkit/engine-envoy-protocol"; +import type { EnvoyHandle } from "./handle.js"; + +export interface EnvoyConfig { + logger?: Logger; + version: number; + endpoint: string; + token?: string; + namespace: string; + poolName: string; + prepopulateActorNames: Record }>; + metadata?: Record; + + /** + * Debug option to inject artificial latency (in ms) into WebSocket + * communication. Messages are queued and delivered in order after the + * configured delay. + * + * @experimental For testing only. + */ + debugLatencyMs?: number; + + /** Called when receiving a network request. */ + fetch: ( + envoyHandle: EnvoyHandle, + actorId: string, + gatewayId: protocol.GatewayId, + requestId: protocol.RequestId, + request: Request, + ) => Promise; + + // TODO: fix doc comment + /** + * Called when receiving a WebSocket connection. + * + * All event listeners must be added synchronously inside this function or + * else events may be missed. The open event will fire immediately after + * this function finishes. + * + * Any errors thrown here will disconnect the WebSocket immediately. + * + * While `path` and `headers` are partially redundant to the data in the + * `Request`, they may vary slightly from the actual content of `Request`. + * Prefer to persist the `path` and `headers` properties instead of the + * `Request` itself. + * + * ## Hibernating Web Sockets + * + * ### Implementation Requirements + * + * **Requirement 1: Persist HWS Immediately** + * + * This is responsible for persisting hibernatable WebSockets immediately + * (do not wait for open event). It is not time sensitive to flush the + * connection state. If this fails to persist the HWS, the client's + * WebSocket will be disconnected on next wake in the call to + * `Tunnel::restoreHibernatingRequests` since the connection entry will not + * exist. + * + * **Requirement 2: Persist Message Index On `message`** + * + * In the `message` event listener, this handler must persist the message + * index from the event. The request ID is available at + * `event.rivetRequestId` and message index at `event.rivetMessageIndex`. + * + * The message index should not be flushed immediately. Instead, this + * should: + * + * - Debounce calls to persist the message index + * - After each persist, call + * `Runner::sendHibernatableWebSocketMessageAck` to acknowledge the + * message + * + * This mechanism allows us to buffer messages on the gateway so we can + * batch-persist events on our end on a given interval. + * + * If this fails to persist, then the gateway will replay unacked + * messages when the actor starts again. + * + * **Requirement 3: Remove HWS From Storage On `close`** + * + * This handler should add an event listener for `close` to remove the + * connection from storage. + * + * If the connection remove fails to persist, the close event will be + * called again on the next actor start in + * `Tunnel::restoreHibernatingRequests` since there will be no request for + * the given connection. + * + * ### Restoring Connections + * + * The user of this library is responsible for: + * 1. Loading all persisted hibernatable WebSocket metadata for an actor + * 2. Calling `Runner::restoreHibernatingRequests` with this metadata at + * the end of `onActorStart` + * + * `restoreHibernatingRequests` will restore all connections and attach + * the appropriate event listeners. + * + * ### No Open Event On Restoration + * + * When restoring a HWS, the open event will not be called again. It will + * go straight to the message or close event. + */ + websocket: ( + envoyHandle: EnvoyHandle, + actorId: string, + ws: any, + gatewayId: protocol.GatewayId, + requestId: protocol.RequestId, + request: Request, + path: string, + headers: Record, + isHibernatable: boolean, + isRestoringHibernatable: boolean, + ) => Promise; + + hibernatableWebSocket: { + /** + * Determines if a WebSocket can continue to live while an actor goes to + * sleep. + */ + canHibernate: ( + actorId: string, + gatewayId: ArrayBuffer, + requestId: ArrayBuffer, + request: Request, + ) => boolean; + }; + + // TODO: Fix doc comment + /** + * Called when an actor starts. + * + * This callback is responsible for: + * 1. Initializing the actor instance + * 2. Loading all persisted hibernatable WebSocket metadata for this actor + * 3. Calling `Runner::restoreHibernatingRequests` with the loaded metadata + * to restore hibernatable WebSocket connections + * + * The actor should not be marked as "ready" until after + * `restoreHibernatingRequests` completes to ensure all hibernatable + * connections are fully restored before the actor processes new requests. + */ + onActorStart: ( + envoyHandle: EnvoyHandle, + actorId: string, + generation: number, + config: protocol.ActorConfig, + ) => Promise; + + onActorStop: ( + envoyHandle: EnvoyHandle, + actorId: string, + generation: number, + reason: protocol.StopActorReason, + ) => Promise; +} diff --git a/engine/sdks/typescript/envoy-client/src/context.ts b/engine/sdks/typescript/envoy-client/src/context.ts new file mode 100644 index 0000000000..83b282a4ca --- /dev/null +++ b/engine/sdks/typescript/envoy-client/src/context.ts @@ -0,0 +1,24 @@ +import type { Logger } from "pino"; +import type { UnboundedSender } from "antiox/sync/mpsc"; +import type { EnvoyConfig } from "./config.js"; +import type { EnvoyHandle } from "./handle.js"; +import type { ToEnvoyMessage } from "./tasks/envoy/index.js"; +import type { WebSocketTxMessage } from "./websocket.js"; + +export interface SharedContext { + config: EnvoyConfig; + + /** Unique string identifying this Envoy process. */ + envoyKey: string; + + /** Cached child logger with envoy-specific attributes. */ + logCached?: Logger; + + envoyTx: UnboundedSender; + + /** Handle passed to user callbacks for interacting with actors. */ + handle: EnvoyHandle; + + /** Current websocket sender. Set by connect, undefined between connections. */ + wsTx?: UnboundedSender; +} diff --git a/engine/sdks/typescript/envoy-client/src/handle.ts b/engine/sdks/typescript/envoy-client/src/handle.ts new file mode 100644 index 0000000000..4c783fc4fb --- /dev/null +++ b/engine/sdks/typescript/envoy-client/src/handle.ts @@ -0,0 +1,74 @@ +import type * as protocol from "@rivetkit/engine-envoy-protocol"; + +export interface KvListOptions { + reverse?: boolean; + limit?: number; +} + +export interface EnvoyHandle { + /** Send sleep intent for an actor. */ + sleepActor(actorId: string, generation?: number): void; + + /** Send stop intent for an actor. */ + stopActor(actorId: string, generation?: number): void; + + /** + * Like stopActor but ensures the actor is fully destroyed rather than + * potentially being kept for hibernation. + */ + destroyActor(actorId: string, generation?: number): void; + + /** Set or clear an alarm for an actor. Pass null to clear. */ + setAlarm( + actorId: string, + alarmTs: number | null, + generation?: number, + ): void; + + /** Get values for the given keys. Returns null for missing keys. */ + kvGet( + actorId: string, + keys: Uint8Array[], + ): Promise<(Uint8Array | null)[]>; + + /** List all key-value pairs. */ + kvListAll( + actorId: string, + options?: KvListOptions, + ): Promise<[Uint8Array, Uint8Array][]>; + + /** List key-value pairs within a key range. */ + kvListRange( + actorId: string, + start: Uint8Array, + end: Uint8Array, + exclusive?: boolean, + options?: KvListOptions, + ): Promise<[Uint8Array, Uint8Array][]>; + + /** List key-value pairs matching a prefix. */ + kvListPrefix( + actorId: string, + prefix: Uint8Array, + options?: KvListOptions, + ): Promise<[Uint8Array, Uint8Array][]>; + + /** Put key-value pairs. */ + kvPut( + actorId: string, + entries: [Uint8Array, Uint8Array][], + ): Promise; + + /** Delete specific keys. */ + kvDelete(actorId: string, keys: Uint8Array[]): Promise; + + /** Delete a range of keys. */ + kvDeleteRange( + actorId: string, + start: Uint8Array, + end: Uint8Array, + ): Promise; + + /** Drop all key-value data for an actor. */ + kvDrop(actorId: string): Promise; +} diff --git a/engine/sdks/typescript/envoy-client/src/index.ts b/engine/sdks/typescript/envoy-client/src/index.ts new file mode 100644 index 0000000000..41bb87aa85 --- /dev/null +++ b/engine/sdks/typescript/envoy-client/src/index.ts @@ -0,0 +1,9 @@ +export type { EnvoyConfig } from "./config.js"; +export type { SharedContext } from "./context.js"; +export type { EnvoyHandle, KvListOptions } from "./handle.js"; +export { + type EnvoyContext, + type ToEnvoyMessage, + type ToEnvoyConnMessage, + startEnvoy, +} from "./tasks/envoy/index.js"; diff --git a/engine/sdks/typescript/envoy-client/src/latency-channel.ts b/engine/sdks/typescript/envoy-client/src/latency-channel.ts new file mode 100644 index 0000000000..7f03e03f6d --- /dev/null +++ b/engine/sdks/typescript/envoy-client/src/latency-channel.ts @@ -0,0 +1,39 @@ +import { + type UnboundedReceiver, + type UnboundedSender, + unboundedChannel, +} from "antiox/sync/mpsc"; +import { spawn } from "antiox/task"; +import { injectLatency } from "./utils.js"; + +export type LatencyChannel = [UnboundedSender, UnboundedReceiver]; + +/** + * Returns an antiox channel that delays delivery to the receiver by the + * configured latency while preserving message order. + */ +export function latencyChannel(debugLatencyMs?: number): LatencyChannel { + if (!debugLatencyMs) { + return unboundedChannel(); + } + + const [inputTx, inputRx] = unboundedChannel(); + const [outputTx, outputRx] = unboundedChannel(); + + spawn(async () => { + for await (const message of inputRx) { + await injectLatency(debugLatencyMs); + + try { + outputTx.send(message); + } catch { + inputRx.close(); + break; + } + } + + outputTx.close(); + }); + + return [inputTx, outputRx]; +} diff --git a/engine/sdks/typescript/envoy-client/src/mod.ts b/engine/sdks/typescript/envoy-client/src/mod.ts deleted file mode 100644 index c2d73c4934..0000000000 --- a/engine/sdks/typescript/envoy-client/src/mod.ts +++ /dev/null @@ -1,191 +0,0 @@ -import * as protocol from "@rivetkit/engine-envoy-protocol"; -import type { Logger } from "pino"; -import type WebSocket from "ws"; -import { logger } from "./log.js"; -import { importWebSocket } from "./websocket.js"; -import { - v4 as uuidv4, -} from "uuid"; -import { inspect } from "util"; - -export { idToStr, injectLatency } from "./utils"; - -export interface EnvoyConfig { - logger?: Logger; - version: number; - endpoint: string; - token?: string; - namespace: string; - poolName: string; - prepopulateActorNames: Record }>; - metadata?: Record; - - /** - * Debug option to inject artificial latency (in ms) into WebSocket - * communication. Messages are queued and delivered in order after the - * configured delay. - * - * @experimental For testing only. - */ - debugLatencyMs?: number; -} - -export class Envoy { - #config: EnvoyConfig; - #envoyKey: string = uuidv4(); - #ws?: WebSocket; - - #started: boolean = false; - - // Cached child logger with envoy-specific attributes - #logCached?: Logger; - - constructor(config: EnvoyConfig) { - this.#config = config; - } - - #wsUrl() { - const wsEndpoint = this.#config.endpoint - .replace("http://", "ws://") - .replace("https://", "wss://"); - - // Ensure the endpoint ends with /runners/connect - const baseUrl = wsEndpoint.endsWith("/") - ? wsEndpoint.slice(0, -1) - : wsEndpoint; - const parameters = [ - ['protocol_version', protocol.VERSION], - ['namespace', this.#config.namespace], - ['envoy_key', this.#envoyKey], - ['pool_name', this.#config.poolName], - ]; - return `${baseUrl}/envoys/connect?${parameters.map(([k, v]) => `${k}=${encodeURIComponent(v)}`).join('&')}`; - } - - async start() { - if (this.#started) throw new Error("Cannot call envoy.start twice"); - this.#started = true; - - this.log?.info({ msg: "starting envoy" }); - - try { - await this.#connect(); - } catch (error) { - this.#started = false; - throw error; - } - } - - async #connect() { - const WS = await importWebSocket(); - - // Assertion to clear previous WebSocket - if ( - this.#ws && - (this.#ws.readyState === WS.CONNECTING || - this.#ws.readyState === WS.OPEN) - ) { - this.log?.error( - "found duplicate ws, closing previous", - ); - this.#ws.close(1000, "duplicate_websocket"); - } - - const protocols = ["rivet"]; - if (this.#config.token) - protocols.push(`rivet_token.${this.#config.token}`); - - - this.#ws = new WS(this.#wsUrl(), protocols) as any as WebSocket; - - this.log?.info({ - msg: "connecting", - endpoint: this.#config.endpoint, - namespace: this.#config.namespace, - envoyKey: this.#envoyKey, - hasToken: !!this.#config.token, - }); - - this.#ws.addEventListener("open", () => { - // Send init message - const init: protocol.ToRivetInit = { - envoyKey: this.#envoyKey, - name: this.#config.poolName, - version: this.#config.version, - prepopulateActorNames: new Map( - Object.entries(this.#config.prepopulateActorNames).map( - ([name, data]) => [ - name, - { metadata: JSON.stringify(data.metadata) }, - ], - ), - ), - metadata: JSON.stringify(this.#config.metadata), - }; - - this.#wsSend({ - tag: "ToRivetInit", - val: init, - }); - }); - } - - #wsSend(message: protocol.ToRivet) { - this.log?.debug({ - msg: "sending runner message", - data: inspect(message), - }); - - const encoded = protocol.encodeToRivet(message); - - // Normally synchronous. When debugLatencyMs is set, the send is - // deferred but message order is preserved. - injectLatency(this.#config.debugLatencyMs).then(() => { - const ws = this.getWsIfReady(); - if (ws) { - ws.send(encoded); - } else { - this.log?.error({ - msg: "WebSocket not available or not open for sending data", - }); - } - }); - } - - /** Asserts WebSocket exists and is ready. */ - getWsIfReady(): WebSocket | undefined { - if ( - !!this.#ws && - this.#ws.readyState === 1 - ) { - return this.#ws; - } else { - return undefined; - } - } - - dispose() { - - } - - get log(): Logger | undefined { - if (this.#logCached) return this.#logCached; - - const l = logger(); - if (l) { - // If has connected, create child logger with relevant metadata - // - // Otherwise, return default logger - if (this.#envoyKey) { - this.#logCached = l.child({ - envoyKey: this.#envoyKey, - }); - return this.#logCached; - } else { - return l; - } - } - - return undefined; - } -} diff --git a/engine/sdks/typescript/envoy-client/src/stringify.ts b/engine/sdks/typescript/envoy-client/src/stringify.ts new file mode 100644 index 0000000000..7f5904bed1 --- /dev/null +++ b/engine/sdks/typescript/envoy-client/src/stringify.ts @@ -0,0 +1,300 @@ +import type * as protocol from "@rivetkit/engine-envoy-protocol"; +import { idToStr } from "./utils"; + +function stringifyArrayBuffer(buffer: ArrayBuffer): string { + return `ArrayBuffer(${buffer.byteLength})`; +} + +function stringifyBigInt(value: bigint): string { + return `${value}n`; +} + +function stringifyMap(map: ReadonlyMap): string { + const entries = Array.from(map.entries()) + .map(([k, v]) => `"${k}": "${v}"`) + .join(", "); + return `Map(${map.size}){${entries}}`; +} + +function stringifyMessageId(messageId: protocol.MessageId): string { + return `MessageId{gatewayId: ${idToStr(messageId.gatewayId)}, requestId: ${idToStr(messageId.requestId)}, messageIndex: ${messageId.messageIndex}}`; +} + +export function stringifyToRivetTunnelMessageKind( + kind: protocol.ToRivetTunnelMessageKind, +): string { + switch (kind.tag) { + case "ToRivetResponseStart": { + const { status, headers, body, stream } = kind.val; + const bodyStr = body === null ? "null" : stringifyArrayBuffer(body); + return `ToRivetResponseStart{status: ${status}, headers: ${stringifyMap(headers)}, body: ${bodyStr}, stream: ${stream}}`; + } + case "ToRivetResponseChunk": { + const { body, finish } = kind.val; + return `ToRivetResponseChunk{body: ${stringifyArrayBuffer(body)}, finish: ${finish}}`; + } + case "ToRivetResponseAbort": + return "ToRivetResponseAbort"; + case "ToRivetWebSocketOpen": { + const { canHibernate } = kind.val; + return `ToRivetWebSocketOpen{canHibernate: ${canHibernate}}`; + } + case "ToRivetWebSocketMessage": { + const { data, binary } = kind.val; + return `ToRivetWebSocketMessage{data: ${stringifyArrayBuffer(data)}, binary: ${binary}}`; + } + case "ToRivetWebSocketMessageAck": { + const { index } = kind.val; + return `ToRivetWebSocketMessageAck{index: ${index}}`; + } + case "ToRivetWebSocketClose": { + const { code, reason, hibernate } = kind.val; + const codeStr = code === null ? "null" : code.toString(); + const reasonStr = reason === null ? "null" : `"${reason}"`; + return `ToRivetWebSocketClose{code: ${codeStr}, reason: ${reasonStr}, hibernate: ${hibernate}}`; + } + } +} + +export function stringifyToEnvoyTunnelMessageKind( + kind: protocol.ToEnvoyTunnelMessageKind, +): string { + switch (kind.tag) { + case "ToEnvoyRequestStart": { + const { actorId, method, path, headers, body, stream } = kind.val; + const bodyStr = body === null ? "null" : stringifyArrayBuffer(body); + return `ToEnvoyRequestStart{actorId: "${actorId}", method: "${method}", path: "${path}", headers: ${stringifyMap(headers)}, body: ${bodyStr}, stream: ${stream}}`; + } + case "ToEnvoyRequestChunk": { + const { body, finish } = kind.val; + return `ToEnvoyRequestChunk{body: ${stringifyArrayBuffer(body)}, finish: ${finish}}`; + } + case "ToEnvoyRequestAbort": + return "ToEnvoyRequestAbort"; + case "ToEnvoyWebSocketOpen": { + const { actorId, path, headers } = kind.val; + return `ToEnvoyWebSocketOpen{actorId: "${actorId}", path: "${path}", headers: ${stringifyMap(headers)}}`; + } + case "ToEnvoyWebSocketMessage": { + const { data, binary } = kind.val; + return `ToEnvoyWebSocketMessage{data: ${stringifyArrayBuffer(data)}, binary: ${binary}}`; + } + case "ToEnvoyWebSocketClose": { + const { code, reason } = kind.val; + const codeStr = code === null ? "null" : code.toString(); + const reasonStr = reason === null ? "null" : `"${reason}"`; + return `ToEnvoyWebSocketClose{code: ${codeStr}, reason: ${reasonStr}}`; + } + } +} + +export function stringifyCommand(command: protocol.Command): string { + switch (command.tag) { + case "CommandStartActor": { + const { config, hibernatingRequests } = command.val; + const keyStr = config.key === null ? "null" : `"${config.key}"`; + const inputStr = + config.input === null + ? "null" + : stringifyArrayBuffer(config.input); + const hibernatingRequestsStr = + hibernatingRequests.length > 0 + ? `[${hibernatingRequests.map((hr) => `{gatewayId: ${idToStr(hr.gatewayId)}, requestId: ${idToStr(hr.requestId)}}`).join(", ")}]` + : "[]"; + return `CommandStartActor{config: {name: "${config.name}", key: ${keyStr}, createTs: ${stringifyBigInt(config.createTs)}, input: ${inputStr}}, hibernatingRequests: ${hibernatingRequestsStr}}`; + } + case "CommandStopActor": { + const { reason } = command.val; + return `CommandStopActor{reason: ${reason}}`; + } + } +} + +export function stringifyCommandWrapper( + wrapper: protocol.CommandWrapper, +): string { + return `CommandWrapper{actorId: "${wrapper.checkpoint.actorId}", generation: "${wrapper.checkpoint.generation}", index: ${stringifyBigInt(wrapper.checkpoint.index)}, inner: ${stringifyCommand(wrapper.inner)}}`; +} + +export function stringifyEvent(event: protocol.Event): string { + switch (event.tag) { + case "EventActorIntent": { + const { intent } = event.val; + const intentStr = + intent.tag === "ActorIntentSleep" + ? "Sleep" + : intent.tag === "ActorIntentStop" + ? "Stop" + : "Unknown"; + return `EventActorIntent{intent: ${intentStr}}`; + } + case "EventActorStateUpdate": { + const { state } = event.val; + let stateStr: string; + if (state.tag === "ActorStateRunning") { + stateStr = "Running"; + } else if (state.tag === "ActorStateStopped") { + const { code, message } = state.val; + const messageStr = message === null ? "null" : `"${message}"`; + stateStr = `Stopped{code: ${code}, message: ${messageStr}}`; + } else { + stateStr = "Unknown"; + } + return `EventActorStateUpdate{state: ${stateStr}}`; + } + case "EventActorSetAlarm": { + const { alarmTs } = event.val; + const alarmTsStr = + alarmTs === null ? "null" : stringifyBigInt(alarmTs); + return `EventActorSetAlarm{alarmTs: ${alarmTsStr}}`; + } + } +} + +export function stringifyEventWrapper(wrapper: protocol.EventWrapper): string { + return `EventWrapper{actorId: ${wrapper.checkpoint.actorId}, generation: "${wrapper.checkpoint.generation}", index: ${stringifyBigInt(wrapper.checkpoint.index)}, inner: ${stringifyEvent(wrapper.inner)}}`; +} + +export function stringifyToRivet(message: protocol.ToRivet): string { + switch (message.tag) { + case "ToRivetInit": { + const { envoyKey, name, version, prepopulateActorNames, metadata } = + message.val; + const prepopulateActorNamesStr = + prepopulateActorNames === null + ? "null" + : `Map(${prepopulateActorNames.size})`; + const metadataStr = metadata === null ? "null" : `"${metadata}"`; + return `ToRivetInit{envoyKey: "${envoyKey}", name: "${name}", version: ${version}, prepopulateActorNames: ${prepopulateActorNamesStr}, metadata: ${metadataStr}}`; + } + case "ToRivetEvents": { + const events = message.val; + return `ToRivetEvents{count: ${events.length}, events: [${events.map((e) => stringifyEventWrapper(e)).join(", ")}]}`; + } + case "ToRivetAckCommands": { + const { lastCommandCheckpoints } = message.val; + const checkpointsStr = + lastCommandCheckpoints.length > 0 + ? `[${lastCommandCheckpoints.map((cp) => `{actorId: "${cp.actorId}", index: ${stringifyBigInt(cp.index)}}`).join(", ")}]` + : "[]"; + return `ToRivetAckCommands{lastCommandCheckpoints: ${checkpointsStr}}`; + } + case "ToRivetStopping": + return "ToRivetStopping"; + case "ToRivetPong": { + const { ts } = message.val; + return `ToRivetPong{ts: ${stringifyBigInt(ts)}}`; + } + case "ToRivetKvRequest": { + const { actorId, requestId, data } = message.val; + const dataStr = stringifyKvRequestData(data); + return `ToRivetKvRequest{actorId: "${actorId}", requestId: ${requestId}, data: ${dataStr}}`; + } + case "ToRivetTunnelMessage": { + const { messageId, messageKind } = message.val; + return `ToRivetTunnelMessage{messageId: ${stringifyMessageId(messageId)}, messageKind: ${stringifyToRivetTunnelMessageKind(messageKind)}}`; + } + } +} + +export function stringifyToEnvoy(message: protocol.ToEnvoy): string { + switch (message.tag) { + case "ToEnvoyInit": { + const { metadata } = message.val; + const metadataStr = `{envoyLostThreshold: ${stringifyBigInt(metadata.envoyLostThreshold)}, actorStopThreshold: ${stringifyBigInt(metadata.actorStopThreshold)}, serverlessDrainGracePeriod: ${metadata.serverlessDrainGracePeriod === null ? "null" : stringifyBigInt(metadata.serverlessDrainGracePeriod)}}`; + return `ToEnvoyInit{metadata: ${metadataStr}}`; + } + case "ToEnvoyCommands": { + const commands = message.val; + return `ToEnvoyCommands{count: ${commands.length}, commands: [${commands.map((c) => stringifyCommandWrapper(c)).join(", ")}]}`; + } + case "ToEnvoyAckEvents": { + const { lastEventCheckpoints } = message.val; + const checkpointsStr = + lastEventCheckpoints.length > 0 + ? `[${lastEventCheckpoints.map((cp) => `{actorId: "${cp.actorId}", index: ${stringifyBigInt(cp.index)}}`).join(", ")}]` + : "[]"; + return `ToEnvoyAckEvents{lastEventCheckpoints: ${checkpointsStr}}`; + } + case "ToEnvoyKvResponse": { + const { requestId, data } = message.val; + const dataStr = stringifyKvResponseData(data); + return `ToEnvoyKvResponse{requestId: ${requestId}, data: ${dataStr}}`; + } + case "ToEnvoyTunnelMessage": { + const { messageId, messageKind } = message.val; + return `ToEnvoyTunnelMessage{messageId: ${stringifyMessageId(messageId)}, messageKind: ${stringifyToEnvoyTunnelMessageKind(messageKind)}}`; + } + case "ToEnvoyPing": { + const { ts } = message.val; + return `ToEnvoyPing{ts: ${stringifyBigInt(ts)}}`; + } + } +} + +function stringifyKvRequestData(data: protocol.KvRequestData): string { + switch (data.tag) { + case "KvGetRequest": { + const { keys } = data.val; + return `KvGetRequest{keys: ${keys.length}}`; + } + case "KvListRequest": { + const { query, reverse, limit } = data.val; + const reverseStr = reverse === null ? "null" : reverse.toString(); + const limitStr = limit === null ? "null" : stringifyBigInt(limit); + return `KvListRequest{query: ${stringifyKvListQuery(query)}, reverse: ${reverseStr}, limit: ${limitStr}}`; + } + case "KvPutRequest": { + const { keys, values } = data.val; + return `KvPutRequest{keys: ${keys.length}, values: ${values.length}}`; + } + case "KvDeleteRequest": { + const { keys } = data.val; + return `KvDeleteRequest{keys: ${keys.length}}`; + } + case "KvDeleteRangeRequest": { + const { start, end } = data.val; + return `KvDeleteRangeRequest{start: ${stringifyArrayBuffer(start)}, end: ${stringifyArrayBuffer(end)}}`; + } + case "KvDropRequest": + return "KvDropRequest"; + } +} + +function stringifyKvListQuery(query: protocol.KvListQuery): string { + switch (query.tag) { + case "KvListAllQuery": + return "KvListAllQuery"; + case "KvListRangeQuery": { + const { start, end, exclusive } = query.val; + return `KvListRangeQuery{start: ${stringifyArrayBuffer(start)}, end: ${stringifyArrayBuffer(end)}, exclusive: ${exclusive}}`; + } + case "KvListPrefixQuery": { + const { key } = query.val; + return `KvListPrefixQuery{key: ${stringifyArrayBuffer(key)}}`; + } + } +} + +function stringifyKvResponseData(data: protocol.KvResponseData): string { + switch (data.tag) { + case "KvErrorResponse": { + const { message } = data.val; + return `KvErrorResponse{message: "${message}"}`; + } + case "KvGetResponse": { + const { keys, values, metadata } = data.val; + return `KvGetResponse{keys: ${keys.length}, values: ${values.length}, metadata: ${metadata.length}}`; + } + case "KvListResponse": { + const { keys, values, metadata } = data.val; + return `KvListResponse{keys: ${keys.length}, values: ${values.length}, metadata: ${metadata.length}}`; + } + case "KvPutResponse": + return "KvPutResponse"; + case "KvDeleteResponse": + return "KvDeleteResponse"; + case "KvDropResponse": + return "KvDropResponse"; + } +} diff --git a/engine/sdks/typescript/envoy-client/src/tasks/actor.ts b/engine/sdks/typescript/envoy-client/src/tasks/actor.ts new file mode 100644 index 0000000000..2d8430498c --- /dev/null +++ b/engine/sdks/typescript/envoy-client/src/tasks/actor.ts @@ -0,0 +1,195 @@ +import * as protocol from "@rivetkit/engine-envoy-protocol"; +import { + type UnboundedReceiver, + type UnboundedSender, + unboundedChannel, +} from "antiox/sync/mpsc"; +import { spawn } from "antiox/task"; +import type { SharedContext } from "../context.js"; +import { logger } from "../log.js"; +import { unreachable } from "antiox/panic"; +import { stringifyError } from "../utils.js"; + +export interface CreateActorOpts { + commandIdx: bigint; + actorId: string; + generation: number; + config: protocol.ActorConfig; + hibernatingRequests: readonly protocol.HibernatingRequest[]; +} + +/** + * + * Stop sequence: + * 1. X -> Actor: stop-intent (optional) + * 1. Actor -> Envoy: send-events (optional) + * 1. Envoy -> Actor: command-stop-actor + * 1. Actor: async cleanup + * 1. Actor -> Envoy: state update (stopped) + */ + +// TODO: envoy lost +export type ToActor = + // Sent when wants to stop the actor, will be forwarded to Envoy + | { + type: "actor-intent"; + commandIdx: bigint; + intent: protocol.ActorIntent; + } + // Sent when actor is told to stop + | { + type: "command-stop-actor"; + commandIdx: bigint; + reason: protocol.StopActorReason; + } + // Set or clear an alarm + | { + type: "set-alarm"; + alarmTs: bigint | null; + }; + +interface ActorContext { + shared: SharedContext; + actorId: string; + generation: number; + config: protocol.ActorConfig; + eventIndex: bigint; +} + +export function createActor( + ctx: SharedContext, + start: CreateActorOpts, +): UnboundedSender { + const [tx, rx] = unboundedChannel(); + spawn(() => actorInner(ctx, start, rx)); + return tx; +} + +async function actorInner( + shared: SharedContext, + opts: CreateActorOpts, + rx: UnboundedReceiver, +) { + const ctx: ActorContext = { + shared, + actorId: opts.actorId, + generation: opts.generation, + config: opts.config, + eventIndex: 0n, + }; + + let stopCode = protocol.StopCode.Ok; + let stopMessage: string | null = null; + + try { + await shared.config.onActorStart( + shared.handle, + opts.actorId, + opts.generation, + opts.config, + ); + } catch (error) { + log(ctx)?.error({ + msg: "actor start failed", + actorId: opts.actorId, + error: stringifyError(error), + }); + + stopCode = protocol.StopCode.Error; + stopMessage = + error instanceof Error ? error.message : "actor start failed"; + + sendStoppedEvent(ctx, stopCode, stopMessage); + return; + } + + sendEvent(ctx, { + tag: "EventActorStateUpdate", + val: { state: { tag: "ActorStateRunning", val: null } }, + }); + + for await (const msg of rx) { + if (msg.type === "actor-intent") { + sendEvent(ctx, { + tag: "EventActorIntent", + val: { intent: msg.intent }, + }); + } else if (msg.type === "command-stop-actor") { + try { + await ctx.shared.config.onActorStop( + ctx.shared.handle, + ctx.actorId, + ctx.generation, + msg.reason, + ); + } catch (error) { + log(ctx)?.error({ + msg: "actor stop failed", + actorId: ctx.actorId, + error: stringifyError(error), + }); + + stopCode = protocol.StopCode.Error; + stopMessage = + error instanceof Error + ? error.message + : "actor stop failed"; + } + + sendStoppedEvent(ctx, stopCode, stopMessage); + return; + } else if (msg.type === "set-alarm") { + sendEvent(ctx, { + tag: "EventActorSetAlarm", + val: { alarmTs: msg.alarmTs }, + }); + } else { + unreachable(msg); + } + } +} + +function sendEvent(ctx: ActorContext, inner: protocol.Event) { + ctx.shared.envoyTx.send({ + type: "send-events", + events: [ + { + checkpoint: incrementCheckpoint(ctx), + inner, + }, + ], + }); +} + +function sendStoppedEvent( + ctx: ActorContext, + code: protocol.StopCode, + message: string | null, +) { + const checkpoint = incrementCheckpoint(ctx); + ctx.shared.envoyTx.send({ + type: "command-stop-actor-complete", + checkpointIndex: checkpoint.index, + actorId: ctx.actorId, + generation: ctx.generation, + code, + message, + }); +} + +function incrementCheckpoint(ctx: ActorContext): protocol.ActorCheckpoint { + const index = ctx.eventIndex; + ctx.eventIndex++; + + return { actorId: ctx.actorId, generation: ctx.generation, index }; +} + +function log(ctx: ActorContext) { + const baseLogger = ctx.shared.config.logger ?? logger(); + if (!baseLogger) return undefined; + + return baseLogger.child({ + actorId: ctx.actorId, + generation: ctx.generation, + }); +} diff --git a/engine/sdks/typescript/envoy-client/src/tasks/connection.ts b/engine/sdks/typescript/envoy-client/src/tasks/connection.ts new file mode 100644 index 0000000000..4526af8f83 --- /dev/null +++ b/engine/sdks/typescript/envoy-client/src/tasks/connection.ts @@ -0,0 +1,194 @@ +import * as protocol from "@rivetkit/engine-envoy-protocol"; +import type { UnboundedSender } from "antiox/sync/mpsc"; +import { sleep } from "antiox/time"; +import { spawn } from "antiox/task"; +import type { SharedContext } from "../context.js"; +import { logger } from "../log.js"; +import { stringifyToEnvoy, stringifyToRivet } from "../stringify.js"; +import { calculateBackoff } from "../utils.js"; +import { + type WebSocketRxMessage, + type WebSocketTxMessage, + webSocket, +} from "../websocket.js"; + +export function startConnection(ctx: SharedContext) { + spawn(() => connectionLoop(ctx)); +} + +const STABLE_CONNECTION_MS = 60_000; + +async function connectionLoop(ctx: SharedContext) { + let attempt = 0; + while (true) { + const connectedAt = Date.now(); + try { + await singleConnection(ctx); + } catch (error) { + log(ctx)?.error({ + msg: "connection failed", + error, + }); + } + + if (Date.now() - connectedAt >= STABLE_CONNECTION_MS) { + attempt = 0; + } + + const delay = calculateBackoff(attempt); + log(ctx)?.info({ + msg: "reconnecting", + attempt, + delayMs: delay, + }); + await sleep(delay); + attempt++; + } +} + +async function singleConnection(ctx: SharedContext) { + const { config } = ctx; + + const protocols = ["rivet"]; + if (config.token) protocols.push(`rivet_token.${config.token}`); + + const [wsTx, wsRx] = await webSocket({ + url: wsUrl(ctx), + protocols, + debugLatencyMs: config.debugLatencyMs, + }); + ctx.wsTx = wsTx; + + log(ctx)?.info({ + msg: "connected", + endpoint: config.endpoint, + namespace: config.namespace, + envoyKey: ctx.envoyKey, + hasToken: !!config.token, + }); + + sendEncoded(ctx, { + tag: "ToRivetInit", + val: { + envoyKey: ctx.envoyKey, + name: config.poolName, + version: config.version, + prepopulateActorNames: new Map( + Object.entries(config.prepopulateActorNames).map( + ([name, data]) => [ + name, + { metadata: JSON.stringify(data.metadata) }, + ], + ), + ), + metadata: JSON.stringify(config.metadata), + }, + }); + + try { + for await (const msg of wsRx) { + if (msg.type === "message") { + await handleWsData(ctx, msg); + } else if (msg.type === "close") { + log(ctx)?.info({ + msg: "websocket closed", + code: msg.code, + reason: msg.reason, + }); + break; + } else if (msg.type === "error") { + log(ctx)?.error({ + msg: "websocket error", + error: msg.error, + }); + break; + } + } + } finally { + ctx.wsTx = undefined; + } +} + +async function handleWsData( + ctx: SharedContext, + msg: WebSocketRxMessage & { type: "message" }, +) { + let buf: Uint8Array; + if (msg.data instanceof Blob) { + buf = new Uint8Array(await msg.data.arrayBuffer()); + } else if (Buffer.isBuffer(msg.data)) { + buf = new Uint8Array(msg.data); + } else if (msg.data instanceof ArrayBuffer) { + buf = new Uint8Array(msg.data); + } else { + throw new Error(`expected binary data, got ${typeof msg.data}`); + } + + const message = protocol.decodeToEnvoy(buf); + log(ctx)?.debug({ + msg: "received message", + data: stringifyToEnvoy(message), + }); + + forwardToEnvoy(ctx, message); +} + +function forwardToEnvoy(ctx: SharedContext, message: protocol.ToEnvoy) { + if (message.tag === "ToEnvoyPing") { + sendEncoded(ctx, { + tag: "ToRivetPong", + val: { ts: message.val.ts }, + }); + } else { + ctx.envoyTx.send({ type: "conn-message", message }); + } +} + +function sendEncoded(ctx: SharedContext, message: protocol.ToRivet) { + log(ctx)?.debug({ + msg: "sending message", + data: stringifyToRivet(message), + }); + + if (!ctx.wsTx) { + log(ctx)?.error({ + msg: "websocket not available for sending", + }); + return; + } + + const encoded = protocol.encodeToRivet(message); + ctx.wsTx.send({ type: "send", data: encoded }); +} + +function wsUrl(ctx: SharedContext) { + const wsEndpoint = ctx.config.endpoint + .replace("http://", "ws://") + .replace("https://", "wss://"); + + const baseUrl = wsEndpoint.endsWith("/") + ? wsEndpoint.slice(0, -1) + : wsEndpoint; + const parameters = [ + ["protocol_version", protocol.VERSION], + ["namespace", ctx.config.namespace], + ["envoy_key", ctx.envoyKey], + ["pool_name", ctx.config.poolName], + ]; + + return `${baseUrl}/envoys/connect?${parameters + .map(([key, value]) => `${key}=${encodeURIComponent(value)}`) + .join("&")}`; +} + +function log(ctx: SharedContext) { + if (ctx.logCached) return ctx.logCached; + + const baseLogger = ctx.config.logger ?? logger(); + if (!baseLogger) return undefined; + + ctx.logCached = baseLogger.child({ + envoyKey: ctx.envoyKey, + }); + return ctx.logCached; +} diff --git a/engine/sdks/typescript/envoy-client/src/tasks/envoy/commands.ts b/engine/sdks/typescript/envoy-client/src/tasks/envoy/commands.ts new file mode 100644 index 0000000000..c830c37cee --- /dev/null +++ b/engine/sdks/typescript/envoy-client/src/tasks/envoy/commands.ts @@ -0,0 +1,92 @@ +import type * as protocol from "@rivetkit/engine-envoy-protocol"; +import { createActor } from "../actor.js"; +import { unreachable } from "antiox/panic"; +import type { EnvoyContext } from "./index.js"; +import { getActorEntry, log, wsSend } from "./index.js"; + +export async function handleCommands( + ctx: EnvoyContext, + commands: protocol.ToEnvoyCommands, +) { + log(ctx.shared)?.info({ + msg: "received commands", + commandCount: commands.length, + }); + + for (const commandWrapper of commands) { + const { + checkpoint, + inner: { tag, val }, + } = commandWrapper; + + if (tag === "CommandStartActor") { + const handle = createActor(ctx.shared, { + commandIdx: checkpoint.index, + actorId: checkpoint.actorId, + generation: checkpoint.generation, + config: val.config, + hibernatingRequests: val.hibernatingRequests, + }); + + let generations = ctx.actors.get(checkpoint.actorId); + if (!generations) { + generations = new Map(); + ctx.actors.set(checkpoint.actorId, generations); + } + generations.set(checkpoint.generation, { + handle, + eventHistory: [], + lastCommandIdx: checkpoint.index, + }); + } else if (tag === "CommandStopActor") { + const entry = getActorEntry( + ctx, + checkpoint.actorId, + checkpoint.generation, + ); + + if (!entry) { + log(ctx.shared)?.warn({ + msg: "received stop actor command for unknown actor", + actorId: checkpoint.actorId, + generation: checkpoint.generation, + }); + continue; + } + + entry.lastCommandIdx = checkpoint.index; + entry.handle.send({ + type: "command-stop-actor", + commandIdx: checkpoint.index, + reason: val.reason, + }); + } else { + unreachable(tag); + } + } +} + +const ACK_COMMANDS_INTERVAL_MS = 5 * 60 * 1000; +export { ACK_COMMANDS_INTERVAL_MS }; + +export function sendCommandAck(ctx: EnvoyContext) { + const lastCommandCheckpoints: protocol.ActorCheckpoint[] = []; + + for (const [actorId, generations] of ctx.actors) { + for (const [generation, entry] of generations) { + if (entry.lastCommandIdx < 0n) continue; + lastCommandCheckpoints.push({ + actorId, + generation, + index: entry.lastCommandIdx, + }); + } + } + + if (lastCommandCheckpoints.length === 0) return; + + wsSend(ctx, { + tag: "ToRivetAckCommands", + val: { lastCommandCheckpoints }, + }); +} diff --git a/engine/sdks/typescript/envoy-client/src/tasks/envoy/events.ts b/engine/sdks/typescript/envoy-client/src/tasks/envoy/events.ts new file mode 100644 index 0000000000..91548eead5 --- /dev/null +++ b/engine/sdks/typescript/envoy-client/src/tasks/envoy/events.ts @@ -0,0 +1,109 @@ +import type * as protocol from "@rivetkit/engine-envoy-protocol"; +import type { EnvoyContext, ToEnvoyMessage } from "./index.js"; +import { getActorEntry, log, wsSend } from "./index.js"; + +export function handleSendEvents( + ctx: EnvoyContext, + events: protocol.EventWrapper[], +) { + // Record in history per actor + for (const event of events) { + const entry = getActorEntry( + ctx, + event.checkpoint.actorId, + event.checkpoint.generation, + ); + if (entry) { + entry.eventHistory.push(event); + } + } + + // Send if connected + wsSend(ctx, { + tag: "ToRivetEvents", + val: events, + }); +} + +export function handleCommandStopActorComplete( + ctx: EnvoyContext, + msg: Extract, +) { + const event: protocol.EventWrapper = { + checkpoint: { + actorId: msg.actorId, + generation: msg.generation, + index: msg.checkpointIndex, + }, + inner: { + tag: "EventActorStateUpdate", + val: { + state: { + tag: "ActorStateStopped", + val: { + code: msg.code, + message: msg.message, + }, + }, + }, + }, + }; + + handleSendEvents(ctx, [event]); + + // Close the actor channel but keep event history for ack/resend. + // The entry is cleaned up when all events are acked. + const entry = getActorEntry(ctx, msg.actorId, msg.generation); + if (entry) { + entry.handle.close(); + } +} + +export function handleAckEvents( + ctx: EnvoyContext, + ack: protocol.ToEnvoyAckEvents, +) { + for (const checkpoint of ack.lastEventCheckpoints) { + const entry = getActorEntry( + ctx, + checkpoint.actorId, + checkpoint.generation, + ); + if (!entry) continue; + + entry.eventHistory = entry.eventHistory.filter( + (event) => event.checkpoint.index > checkpoint.index, + ); + + // Clean up fully acked stopped actors + if (entry.eventHistory.length === 0 && entry.handle.isClosed()) { + const gens = ctx.actors.get(checkpoint.actorId); + gens?.delete(checkpoint.generation); + if (gens?.size === 0) { + ctx.actors.delete(checkpoint.actorId); + } + } + } +} + +export function resendUnacknowledgedEvents(ctx: EnvoyContext) { + const events: protocol.EventWrapper[] = []; + + for (const [, generations] of ctx.actors) { + for (const [, entry] of generations) { + events.push(...entry.eventHistory); + } + } + + if (events.length === 0) return; + + log(ctx.shared)?.info({ + msg: "resending unacknowledged events", + count: events.length, + }); + + wsSend(ctx, { + tag: "ToRivetEvents", + val: events, + }); +} diff --git a/engine/sdks/typescript/envoy-client/src/tasks/envoy/index.ts b/engine/sdks/typescript/envoy-client/src/tasks/envoy/index.ts new file mode 100644 index 0000000000..9763dd6859 --- /dev/null +++ b/engine/sdks/typescript/envoy-client/src/tasks/envoy/index.ts @@ -0,0 +1,484 @@ +import * as protocol from "@rivetkit/engine-envoy-protocol"; +import type { UnboundedSender } from "antiox/sync/mpsc"; +import { unboundedChannel } from "antiox/sync/mpsc"; +import { v4 as uuidv4 } from "uuid"; +import type { ToActor } from "../actor.js"; +import type { EnvoyConfig } from "../../config.js"; +import type { EnvoyHandle, KvListOptions } from "../../handle.js"; +import { startConnection } from "../connection.js"; +import type { SharedContext } from "../../context.js"; +import { logger } from "../../log.js"; +import { stringifyToRivet } from "../../stringify.js"; +import { unreachable } from "antiox/panic"; +import { + ACK_COMMANDS_INTERVAL_MS, + handleCommands, + sendCommandAck, +} from "./commands.js"; +import { + handleAckEvents, + handleCommandStopActorComplete, + handleSendEvents, + resendUnacknowledgedEvents, +} from "./events.js"; +import { + KV_CLEANUP_INTERVAL_MS, + type KvRequestEntry, + cleanupOldKvRequests, + handleKvRequest, + handleKvResponse, + processUnsentKvRequests, +} from "./kv.js"; + +export interface EnvoyContext { + shared: SharedContext; + protocolMetadata?: protocol.ProtocolMetadata; + actors: Map>; + kvRequests: Map; + nextKvRequestId: number; +} + +export interface ActorEntry { + handle: UnboundedSender; + eventHistory: protocol.EventWrapper[]; + lastCommandIdx: bigint; +} + +/** + * Message coming from the connection. + * + * Ping is handled by the connection task. + */ +export type ToEnvoyConnMessage = Exclude< + protocol.ToEnvoy, + { tag: "ToEnvoyPing" } +>; + +export type ToEnvoyMessage = + // Inbound from connection + | { type: "conn-message"; message: ToEnvoyConnMessage } + // Sent from actor + | { + type: "send-events"; + events: protocol.EventWrapper[]; + } + | { + type: "command-stop-actor-complete"; + actorId: string; + generation: number; + checkpointIndex: bigint; + code: protocol.StopCode; + message: string | null; + } + | { + type: "kv-request"; + actorId: string; + data: protocol.KvRequestData; + resolve: (data: protocol.KvResponseData) => void; + reject: (error: Error) => void; + }; + +export async function startEnvoy(config: EnvoyConfig) { + const [envoyTx, envoyRx] = unboundedChannel(); + const actors: Map> = new Map(); + + const handle = createHandle(actors, envoyTx); + const shared: SharedContext = { + config, + envoyKey: uuidv4(), + envoyTx, + handle, + }; + + startConnection(shared); + + const ctx: EnvoyContext = { + shared, + actors, + kvRequests: new Map(), + nextKvRequestId: 0, + }; + + log(ctx.shared)?.info({ msg: "starting envoy" }); + + const ackInterval = setInterval(() => { + sendCommandAck(ctx); + }, ACK_COMMANDS_INTERVAL_MS); + + const kvCleanupInterval = setInterval(() => { + cleanupOldKvRequests(ctx); + }, KV_CLEANUP_INTERVAL_MS); + + for await (const msg of envoyRx) { + if (msg.type === "conn-message") { + await handleConnMessage(ctx, msg.message); + } else if (msg.type === "send-events") { + handleSendEvents(ctx, msg.events); + } else if (msg.type === "command-stop-actor-complete") { + handleCommandStopActorComplete(ctx, msg); + } else if (msg.type === "kv-request") { + handleKvRequest(ctx, msg); + } else { + unreachable(msg); + } + } + + // Cleanup + clearInterval(ackInterval); + clearInterval(kvCleanupInterval); + + for (const request of ctx.kvRequests.values()) { + request.reject(new Error("envoy shutting down")); + } + ctx.kvRequests.clear(); + + for (const [, generations] of ctx.actors) { + for (const [, entry] of generations) { + entry.handle.close(); + } + } + ctx.actors.clear(); +} + +async function handleConnMessage( + ctx: EnvoyContext, + message: ToEnvoyConnMessage, +) { + if (message.tag === "ToEnvoyInit") { + ctx.protocolMetadata = message.val.metadata; + log(ctx.shared)?.info({ + msg: "received init", + protocolMetadata: message.val.metadata, + }); + + resendUnacknowledgedEvents(ctx); + processUnsentKvRequests(ctx); + } else if (message.tag === "ToEnvoyCommands") { + await handleCommands(ctx, message.val); + } else if (message.tag === "ToEnvoyAckEvents") { + handleAckEvents(ctx, message.val); + } else if (message.tag === "ToEnvoyKvResponse") { + handleKvResponse(ctx, message.val); + } else if (message.tag === "ToEnvoyTunnelMessage") { + // TODO: + } else { + unreachable(message); + } +} + +// MARK: Util + +export function wsSend(ctx: EnvoyContext, message: protocol.ToRivet) { + log(ctx.shared)?.debug({ + msg: "sending message", + data: stringifyToRivet(message), + }); + + if (!ctx.shared.wsTx) { + log(ctx.shared)?.warn({ + msg: "websocket not available for sending, events will be resent on reconnect", + }); + return; + } + + const encoded = protocol.encodeToRivet(message); + ctx.shared.wsTx.send({ type: "send", data: encoded }); +} + +export function log(ctx: SharedContext) { + if (ctx.logCached) return ctx.logCached; + + const baseLogger = ctx.config.logger ?? logger(); + if (!baseLogger) return undefined; + + ctx.logCached = baseLogger.child({ + envoyKey: ctx.envoyKey, + }); + return ctx.logCached; +} + +export function getActorEntry( + ctx: EnvoyContext, + actorId: string, + generation: number, +): ActorEntry | undefined { + return ctx.actors.get(actorId)?.get(generation); +} + +// MARK: Handle + +function createHandle( + actors: Map>, + envoyTx: UnboundedSender, +): EnvoyHandle { + function findActor( + actorId: string, + generation?: number, + ): ActorEntry | undefined { + const gens = actors.get(actorId); + if (!gens || gens.size === 0) return undefined; + + if (generation !== undefined) { + return gens.get(generation); + } + + // Return first non-closed (active) entry + for (const entry of gens.values()) { + if (!entry.handle.isClosed()) { + return entry; + } + } + return undefined; + } + + function sendActorIntent( + actorId: string, + intent: protocol.ActorIntent, + generation?: number, + ): void { + const entry = findActor(actorId, generation); + if (!entry) return; + entry.handle.send({ + type: "actor-intent", + commandIdx: 0n, + intent, + }); + } + + function sendKvRequest( + actorId: string, + data: protocol.KvRequestData, + ): Promise { + return new Promise((resolve, reject) => { + envoyTx.send({ + type: "kv-request", + actorId, + data, + resolve, + reject, + }); + }); + } + + function toBuffer(arr: Uint8Array): ArrayBuffer { + return arr.buffer.slice( + arr.byteOffset, + arr.byteOffset + arr.byteLength, + ) as ArrayBuffer; + } + + function parseListResponse( + response: protocol.KvResponseData, + ): [Uint8Array, Uint8Array][] { + const val = ( + response as { + tag: "KvListResponse"; + val: protocol.KvListResponse; + } + ).val; + const result: [Uint8Array, Uint8Array][] = []; + for (let i = 0; i < val.keys.length; i++) { + const key = val.keys[i]; + const value = val.values[i]; + if (key && value) { + result.push([new Uint8Array(key), new Uint8Array(value)]); + } + } + return result; + } + + return { + sleepActor(actorId: string, generation?: number): void { + sendActorIntent( + actorId, + { tag: "ActorIntentSleep", val: null }, + generation, + ); + }, + + stopActor(actorId: string, generation?: number): void { + sendActorIntent( + actorId, + { tag: "ActorIntentStop", val: null }, + generation, + ); + }, + + destroyActor(actorId: string, generation?: number): void { + sendActorIntent( + actorId, + { tag: "ActorIntentStop", val: null }, + generation, + ); + }, + + setAlarm( + actorId: string, + alarmTs: number | null, + generation?: number, + ): void { + const entry = findActor(actorId, generation); + if (!entry) return; + entry.handle.send({ + type: "set-alarm", + alarmTs: alarmTs !== null ? BigInt(alarmTs) : null, + }); + }, + + async kvGet( + actorId: string, + keys: Uint8Array[], + ): Promise<(Uint8Array | null)[]> { + const kvKeys = keys.map(toBuffer); + const response = await sendKvRequest(actorId, { + tag: "KvGetRequest", + val: { keys: kvKeys }, + }); + + const val = ( + response as { + tag: "KvGetResponse"; + val: protocol.KvGetResponse; + } + ).val; + const responseKeys = val.keys.map( + (k: ArrayBuffer) => new Uint8Array(k), + ); + const responseValues = val.values.map( + (v: ArrayBuffer) => new Uint8Array(v), + ); + + const result: (Uint8Array | null)[] = []; + for (const requestedKey of keys) { + let found = false; + for (let i = 0; i < responseKeys.length; i++) { + if (uint8ArraysEqual(requestedKey, responseKeys[i])) { + result.push(responseValues[i]); + found = true; + break; + } + } + if (!found) { + result.push(null); + } + } + return result; + }, + + async kvListAll( + actorId: string, + options?: KvListOptions, + ): Promise<[Uint8Array, Uint8Array][]> { + const response = await sendKvRequest(actorId, { + tag: "KvListRequest", + val: { + query: { tag: "KvListAllQuery", val: null }, + reverse: options?.reverse ?? null, + limit: + options?.limit !== undefined + ? BigInt(options.limit) + : null, + }, + }); + return parseListResponse(response); + }, + + async kvListRange( + actorId: string, + start: Uint8Array, + end: Uint8Array, + exclusive?: boolean, + options?: KvListOptions, + ): Promise<[Uint8Array, Uint8Array][]> { + const response = await sendKvRequest(actorId, { + tag: "KvListRequest", + val: { + query: { + tag: "KvListRangeQuery", + val: { + start: toBuffer(start), + end: toBuffer(end), + exclusive: exclusive ?? false, + }, + }, + reverse: options?.reverse ?? null, + limit: + options?.limit !== undefined + ? BigInt(options.limit) + : null, + }, + }); + return parseListResponse(response); + }, + + async kvListPrefix( + actorId: string, + prefix: Uint8Array, + options?: KvListOptions, + ): Promise<[Uint8Array, Uint8Array][]> { + const response = await sendKvRequest(actorId, { + tag: "KvListRequest", + val: { + query: { + tag: "KvListPrefixQuery", + val: { key: toBuffer(prefix) }, + }, + reverse: options?.reverse ?? null, + limit: + options?.limit !== undefined + ? BigInt(options.limit) + : null, + }, + }); + return parseListResponse(response); + }, + + async kvPut( + actorId: string, + entries: [Uint8Array, Uint8Array][], + ): Promise { + const keys = entries.map(([k]) => toBuffer(k)); + const values = entries.map(([, v]) => toBuffer(v)); + await sendKvRequest(actorId, { + tag: "KvPutRequest", + val: { keys, values }, + }); + }, + + async kvDelete( + actorId: string, + keys: Uint8Array[], + ): Promise { + await sendKvRequest(actorId, { + tag: "KvDeleteRequest", + val: { keys: keys.map(toBuffer) }, + }); + }, + + async kvDeleteRange( + actorId: string, + start: Uint8Array, + end: Uint8Array, + ): Promise { + await sendKvRequest(actorId, { + tag: "KvDeleteRangeRequest", + val: { start: toBuffer(start), end: toBuffer(end) }, + }); + }, + + async kvDrop(actorId: string): Promise { + await sendKvRequest(actorId, { + tag: "KvDropRequest", + val: null, + }); + }, + }; +} + +function uint8ArraysEqual(a: Uint8Array, b: Uint8Array): boolean { + if (a.length !== b.length) return false; + for (let i = 0; i < a.length; i++) { + if (a[i] !== b[i]) return false; + } + return true; +} diff --git a/engine/sdks/typescript/envoy-client/src/tasks/envoy/kv.ts b/engine/sdks/typescript/envoy-client/src/tasks/envoy/kv.ts new file mode 100644 index 0000000000..348edd5180 --- /dev/null +++ b/engine/sdks/typescript/envoy-client/src/tasks/envoy/kv.ts @@ -0,0 +1,113 @@ +import type * as protocol from "@rivetkit/engine-envoy-protocol"; +import type { EnvoyContext, ToEnvoyMessage } from "./index.js"; +import { log, wsSend } from "./index.js"; +import { stringifyError } from "../../utils.js"; + +export interface KvRequestEntry { + actorId: string; + data: protocol.KvRequestData; + resolve: (data: protocol.KvResponseData) => void; + reject: (error: Error) => void; + sent: boolean; + timestamp: number; +} + +export const KV_EXPIRE_MS = 30_000; +export const KV_CLEANUP_INTERVAL_MS = 15_000; + +export function handleKvRequest( + ctx: EnvoyContext, + msg: Extract, +) { + const requestId = ctx.nextKvRequestId++; + + const entry: KvRequestEntry = { + actorId: msg.actorId, + data: msg.data, + resolve: msg.resolve, + reject: msg.reject, + sent: false, + timestamp: Date.now(), + }; + + ctx.kvRequests.set(requestId, entry); + + if (ctx.shared.wsTx) { + sendSingleKvRequest(ctx, requestId); + } +} + +export function handleKvResponse( + ctx: EnvoyContext, + response: protocol.ToEnvoyKvResponse, +) { + const request = ctx.kvRequests.get(response.requestId); + + if (!request) { + log(ctx.shared)?.error({ + msg: "received kv response for unknown request id", + requestId: response.requestId, + }); + return; + } + + ctx.kvRequests.delete(response.requestId); + + if (response.data.tag === "KvErrorResponse") { + request.reject( + new Error(response.data.val.message || "unknown KV error"), + ); + } else { + request.resolve(response.data); + } +} + +export function sendSingleKvRequest(ctx: EnvoyContext, requestId: number) { + const request = ctx.kvRequests.get(requestId); + if (!request || request.sent) return; + + try { + wsSend(ctx, { + tag: "ToRivetKvRequest", + val: { + actorId: request.actorId, + requestId, + data: request.data, + }, + }); + + request.sent = true; + request.timestamp = Date.now(); + } catch (error) { + ctx.kvRequests.delete(requestId); + request.reject( + error instanceof Error ? error : new Error(stringifyError(error)), + ); + } +} + +export function processUnsentKvRequests(ctx: EnvoyContext) { + if (!ctx.shared.wsTx) return; + + for (const [requestId, request] of ctx.kvRequests) { + if (!request.sent) { + sendSingleKvRequest(ctx, requestId); + } + } +} + +export function cleanupOldKvRequests(ctx: EnvoyContext) { + const expiry = Date.now() - KV_EXPIRE_MS; + const toDelete: number[] = []; + + for (const [requestId, request] of ctx.kvRequests) { + if (request.timestamp < expiry) { + request.reject(new Error("KV request timed out")); + toDelete.push(requestId); + } + } + + for (const requestId of toDelete) { + ctx.kvRequests.delete(requestId); + } +} diff --git a/engine/sdks/typescript/envoy-client/src/utils.ts b/engine/sdks/typescript/envoy-client/src/utils.ts index 948a391848..7255ae4089 100644 --- a/engine/sdks/typescript/envoy-client/src/utils.ts +++ b/engine/sdks/typescript/envoy-client/src/utils.ts @@ -3,10 +3,6 @@ import { logger } from "./log"; // 20MiB. Keep in sync with runner_max_response_payload_body_size from engine/packages/config/src/config/pegboard.rs export const MAX_PAYLOAD_SIZE = 20 * 1024 * 1024; -export function unreachable(x: never): never { - throw `Unreachable: ${x}`; -} - /** Resolves after the configured debug latency, or immediately if none. */ export function injectLatency(ms?: number): Promise { if (!ms) return Promise.resolve(); diff --git a/engine/sdks/typescript/envoy-client/src/websocket.ts b/engine/sdks/typescript/envoy-client/src/websocket.ts index 2804c77f50..cb63637271 100644 --- a/engine/sdks/typescript/envoy-client/src/websocket.ts +++ b/engine/sdks/typescript/envoy-client/src/websocket.ts @@ -1,16 +1,38 @@ -import { logger } from "./log"; +import type { UnboundedReceiver, UnboundedSender } from "antiox/sync/mpsc"; +import { OnceCell } from "antiox/sync/once_cell"; +import { spawn } from "antiox/task"; +import type WsWebSocket from "ws"; +import { latencyChannel } from "./latency-channel.js"; +import { logger } from "./log.js"; -// Global singleton promise that will be reused for subsequent calls -let webSocketPromise: Promise | null = null; +export type WebSocketTxData = Parameters[0]; -export async function importWebSocket(): Promise { - // Return existing promise if we already started loading - if (webSocketPromise !== null) { - return webSocketPromise; - } +export type WebSocketRxData = WsWebSocket.Data | Blob; + +export type WebSocketTxMessage = + | { type: "send"; data: WebSocketTxData } + | { type: "close"; code?: number; reason?: string }; + +export type WebSocketRxMessage = + | { type: "message"; data: WebSocketRxData } + | { type: "close"; code: number; reason: string } + | { type: "error"; error: Error }; - // Create and store the promise - webSocketPromise = (async () => { +export type WebSocketHandle = [ + UnboundedSender, + UnboundedReceiver, +]; + +export interface WebSocketOptions { + url: string; + protocols?: string | string[]; + debugLatencyMs?: number; +} + +const webSocketPromise = new OnceCell(); + +export async function importWebSocket(): Promise { + return webSocketPromise.getOrInit(async () => { let _WebSocket: typeof WebSocket; if (typeof WebSocket !== "undefined") { @@ -37,7 +59,65 @@ export async function importWebSocket(): Promise { } return _WebSocket; - })(); + }); +} + +export async function webSocket( + options: WebSocketOptions, +): Promise { + const { url, protocols, debugLatencyMs } = options; + const WS = await importWebSocket(); + const raw = new WS(url, protocols); + const [outboundTx, outboundRx] = + latencyChannel(debugLatencyMs); + const [inboundTx, inboundRx] = + latencyChannel(debugLatencyMs); + + raw.addEventListener("message", (event) => { + inboundTx.send({ + type: "message", + data: event.data as WebSocketRxData, + }); + }); + + raw.addEventListener("close", (event) => { + inboundTx.send({ + type: "close", + code: event.code, + reason: event.reason, + }); + inboundTx.close(); + outboundRx.close(); + }); + + raw.addEventListener("error", (event) => { + const error = + typeof event === "object" && event !== null && "error" in event + ? event.error + : new Error("WebSocket error"); + inboundTx.send({ + type: "error", + error: error instanceof Error ? error : new Error(String(error)), + }); + inboundTx.close(); + outboundRx.close(); + }); + + spawn(async () => { + for await (const message of outboundRx) { + if (message.type === "send") { + raw.send(message.data); + } else { + raw.close(message.code, message.reason); + break; + } + } + + if (raw.readyState === 0 || raw.readyState === 1) { + raw.close(); + } + inboundTx.close(); + }); - return webSocketPromise; + return [outboundTx, inboundRx]; } diff --git a/engine/sdks/typescript/envoy-client/todo.md b/engine/sdks/typescript/envoy-client/todo.md new file mode 100644 index 0000000000..cf9084a15c --- /dev/null +++ b/engine/sdks/typescript/envoy-client/todo.md @@ -0,0 +1,2 @@ +- investigate conn channel and how that behaves with reconnects for epheemral messages +- check how messages get buffered in wstx diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 6a59f7abec..94a9725855 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -144,6 +144,9 @@ importers: '@rivetkit/virtual-websocket': specifier: workspace:* version: link:../../../../shared/typescript/virtual-websocket + antiox: + specifier: link:/home/nathan/antiox + version: link:../../../../../antiox pino: specifier: ^9.9.5 version: 9.9.5