From 12d05fe641396552e8a776bd45344087928edd76 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Fri, 6 Jun 2025 13:33:36 -0700 Subject: [PATCH] chore: refactor client to support `ClientDriver` --- packages/actor/src/client/actor-common.ts | 43 ---- packages/actor/src/client/actor-conn.ts | 190 ++++++----------- packages/actor/src/client/actor-handle.ts | 61 ++---- packages/actor/src/client/client.ts | 101 ++++----- .../actor/src/client/http-client-driver.ts | 198 ++++++++++++++++++ packages/actor/src/client/mod.ts | 21 +- packages/actor/src/client/utils.ts | 13 ++ packages/platforms/rivet/package.json | 1 + 8 files changed, 372 insertions(+), 256 deletions(-) create mode 100644 packages/actor/src/client/http-client-driver.ts diff --git a/packages/actor/src/client/actor-common.ts b/packages/actor/src/client/actor-common.ts index d4f8a739c..2616f656d 100644 --- a/packages/actor/src/client/actor-common.ts +++ b/packages/actor/src/client/actor-common.ts @@ -35,46 +35,3 @@ export type ActorDefinitionActions = } : never; -/** - * Resolves an actor ID from a query by making a request to the /actors/resolve endpoint - * - * @param {string} endpoint - The manager endpoint URL - * @param {ActorQuery} actorQuery - The query to resolve - * @param {Encoding} encodingKind - The encoding to use (json or cbor) - * @returns {Promise} - A promise that resolves to the actor's ID - */ -export async function resolveActorId( - endpoint: string, - actorQuery: ActorQuery, - encodingKind: Encoding, -): Promise { - logger().debug("resolving actor ID", { query: actorQuery }); - - try { - const result = await sendHttpRequest< - Record, - protoHttpResolve.ResolveResponse - >({ - url: `${endpoint}/actors/resolve`, - method: "POST", - headers: { - [HEADER_ENCODING]: encodingKind, - [HEADER_ACTOR_QUERY]: JSON.stringify(actorQuery), - }, - body: {}, - encoding: encodingKind, - }); - - logger().debug("resolved actor ID", { actorId: result.i }); - return result.i; - } catch (error) { - logger().error("failed to resolve actor ID", { error }); - if (error instanceof errors.ActorError) { - throw error; - } else { - throw new errors.InternalError( - `Failed to resolve actor ID: ${String(error)}`, - ); - } - } -} diff --git a/packages/actor/src/client/actor-conn.ts b/packages/actor/src/client/actor-conn.ts index 71276df8e..9de47b400 100644 --- a/packages/actor/src/client/actor-conn.ts +++ b/packages/actor/src/client/actor-conn.ts @@ -11,10 +11,15 @@ import { importWebSocket } from "@/common/websocket"; import type { ActorQuery } from "@/manager/protocol/query"; import * as cbor from "cbor-x"; import pRetry from "p-retry"; -import { ACTOR_CONNS_SYMBOL, type ClientRaw, TRANSPORT_SYMBOL } from "./client"; +import { + ACTOR_CONNS_SYMBOL, + ClientDriver, + type ClientRaw, + TRANSPORT_SYMBOL, +} from "./client"; import * as errors from "./errors"; import { logger } from "./log"; -import { type WebSocketMessage as ConnMessage, messageLength } from "./utils"; +import { type WebSocketMessage as ConnMessage, messageLength, serializeWithEncoding } from "./utils"; import { HEADER_ACTOR_ID, HEADER_ACTOR_QUERY, @@ -51,7 +56,7 @@ export type EventUnsubscribe = () => void; */ export type ActorErrorCallback = (error: errors.ActorError) => void; -interface SendOpts { +export interface SendHttpMessageOpts { ephemeral: boolean; } @@ -59,11 +64,6 @@ export type ConnTransport = { websocket: WebSocket } | { sse: EventSource }; export const CONNECT_SYMBOL = Symbol("connect"); -interface DynamicImports { - WebSocket: typeof WebSocket; - EventSource: typeof EventSource; -} - /** * Provides underlying functions for {@link ActorConn}. See {@link ActorConn} for using type-safe remote procedure calls. * @@ -102,46 +102,38 @@ export class ActorConnRaw { */ #keepNodeAliveInterval: NodeJS.Timeout; - /** Promise used to indicate the required properties for using this class have loaded. Currently just #dynamicImports */ - #onConstructedPromise: Promise; - /** Promise used to indicate the socket has connected successfully. This will be rejected if the connection fails. */ #onOpenPromise?: PromiseWithResolvers; - // TODO: ws message queue + #client: ClientRaw; + #driver: ClientDriver; + #params: unknown; + #encodingKind: Encoding; + #actorQuery: ActorQuery; - // External imports - #dynamicImports!: DynamicImports; + // TODO: ws message queue /** * Do not call this directly. * * Creates an instance of ActorConnRaw. * - * @param {string} endpoint - The endpoint to connect to. - * * @protected */ public constructor( - private readonly client: ClientRaw, - private readonly endpoint: string, - private readonly params: unknown, - private readonly encodingKind: Encoding, - private readonly actorQuery: ActorQuery, + private client: ClientRaw, + private driver: ClientDriver, + private params: unknown, + private encodingKind: Encoding, + private actorQuery: ActorQuery, ) { - this.#keepNodeAliveInterval = setInterval(() => 60_000); + this.#client = client; + this.#driver = driver; + this.#params = params; + this.#encodingKind = encodingKind; + this.#actorQuery = actorQuery; - this.#onConstructedPromise = (async () => { - // Import dynamic dependencies - const [WebSocket, EventSource] = await Promise.all([ - importWebSocket(), - importEventSource(), - ]); - this.#dynamicImports = { - WebSocket, - EventSource, - }; - })(); + this.#keepNodeAliveInterval = setInterval(() => 60_000); } /** @@ -158,8 +150,6 @@ export class ActorConnRaw { name: string, ...args: Args ): Promise { - await this.#onConstructedPromise; - logger().debug("action", { name, args }); // If we have an active connection, use the websockactionId @@ -238,20 +228,18 @@ enc async #connectAndWait() { try { - await this.#onConstructedPromise; - // Create promise for open if (this.#onOpenPromise) throw new Error("#onOpenPromise already defined"); this.#onOpenPromise = Promise.withResolvers(); // Connect transport - if (this.client[TRANSPORT_SYMBOL] === "websocket") { - this.#connectWebSocket(); - } else if (this.client[TRANSPORT_SYMBOL] === "sse") { - this.#connectSse(); + if (this.#client[TRANSPORT_SYMBOL] === "websocket") { + await this.#connectWebSocket(); + } else if (this.#client[TRANSPORT_SYMBOL] === "sse") { + await this.#connectSse(); } else { - assertUnreachable(this.client[TRANSPORT_SYMBOL]); + assertUnreachable(this.#client[TRANSPORT_SYMBOL]); } // Wait for result @@ -261,27 +249,11 @@ enc } } - #connectWebSocket() { - const { WebSocket } = this.#dynamicImports; - - const actorQueryStr = encodeURIComponent(JSON.stringify(this.actorQuery)); - const endpoint = this.endpoint - .replace(/^http:/, "ws:") - .replace(/^https:/, "wss:"); - const url = `${endpoint}/actors/connect/websocket?encoding=${this.encodingKind}&query=${actorQueryStr}`; - - logger().debug("connecting to websocket", { url }); - const ws = new WebSocket(url); - if (this.encodingKind === "cbor") { - ws.binaryType = "arraybuffer"; - } else if (this.encodingKind === "json") { - // HACK: Bun bug prevents changing binary type, so we ignore the error https://github.com/oven-sh/bun/issues/17005 - try { - ws.binaryType = "blob"; - } catch (error) {} - } else { - assertUnreachable(this.encodingKind); - } + async #connectWebSocket() { + const ws = await this.#driver.connectWebSocket( + this.#actorQuery, + this.#encodingKind, + ); this.#transport = { websocket: ws }; ws.onopen = () => { logger().debug("websocket open"); @@ -289,7 +261,7 @@ enc // Set init message this.#sendMessage( { - b: { i: { p: this.params } }, + b: { i: { p: this.#params } }, }, { ephemeral: true }, ); @@ -307,28 +279,12 @@ enc }; } - #connectSse() { - const { EventSource } = this.#dynamicImports; - - const url = `${this.endpoint}/actors/connect/sse`; - - logger().debug("connecting to sse", { url }); - const eventSource = new EventSource(url, { - fetch: (input, init) => { - return fetch(input, { - ...init, - headers: { - ...init?.headers, - "User-Agent": httpUserAgent(), - [HEADER_ENCODING]: this.encodingKind, - [HEADER_ACTOR_QUERY]: JSON.stringify(this.actorQuery), - ...(this.params !== undefined - ? { [HEADER_CONN_PARAMS]: JSON.stringify(this.params) } - : {}), - }, - }); - }, - }); + async #connectSse() { + const eventSource = await this.#driver.connectSse( + this.#actorQuery, + this.#encodingKind, + this.#params, + ); this.#transport = { sse: eventSource }; eventSource.onopen = () => { logger().debug("eventsource open"); @@ -338,7 +294,7 @@ enc this.#handleOnMessage(ev); }; eventSource.onerror = (ev) => { - if (eventSource.readyState === EventSource.CLOSED) { + if (eventSource.readyState === eventSource.CLOSED) { // This error indicates a close event this.#handleOnClose(ev); } else { @@ -635,7 +591,7 @@ enc }; } - #sendMessage(message: wsToServer.ToServer, opts?: SendOpts) { + #sendMessage(message: wsToServer.ToServer, opts?: SendHttpMessageOpts) { if (this.#disposed) { throw new errors.ActorConnDisposed(); } @@ -645,10 +601,12 @@ enc // No transport connected yet queueMessage = true; } else if ("websocket" in this.#transport) { - const { WebSocket } = this.#dynamicImports; - if (this.#transport.websocket.readyState === WebSocket.OPEN) { + if (this.#transport.websocket.readyState === 1) { try { - const messageSerialized = this.#serialize(message); + const messageSerialized = serializeWithEncoding( + this.#encodingKind, + message, + ); this.#transport.websocket.send(messageSerialized); logger().debug("sent websocket message", { message: message, @@ -666,9 +624,7 @@ enc queueMessage = true; } } else if ("sse" in this.#transport) { - const { EventSource } = this.#dynamicImports; - - if (this.#transport.sse.readyState === EventSource.OPEN) { + if (this.#transport.sse.readyState === 1) { // Spawn in background since #sendMessage cannot be async this.#sendHttpMessage(message, opts); } else { @@ -684,25 +640,21 @@ enc } } - async #sendHttpMessage(message: wsToServer.ToServer, opts?: SendOpts) { + async #sendHttpMessage( + message: wsToServer.ToServer, + opts?: SendHttpMessageOpts, + ) { try { if (!this.#actorId || !this.#connectionId || !this.#connectionToken) throw new errors.InternalError("Missing connection ID or token."); - // TODO: Implement ordered messages, this is not guaranteed order. Needs to use an index in order to ensure we can pipeline requests efficiently. - // TODO: Validate that we're using HTTP/3 whenever possible for pipelining requests - const messageSerialized = this.#serialize(message); - const res = await fetch(`${this.endpoint}/actors/message`, { - method: "POST", - headers: { - "User-Agent": httpUserAgent(), - [HEADER_ENCODING]: this.encodingKind, - [HEADER_ACTOR_ID]: this.#actorId, - [HEADER_CONN_ID]: this.#connectionId, - [HEADER_CONN_TOKEN]: this.#connectionToken, - }, - body: messageSerialized, - }); + const res = await this.#driver.sendHttpMessage( + this.#actorId, + this.#encodingKind, + this.#connectionId, + this.#connectionToken, + message, + ); if (!res.ok) { throw new errors.InternalError( @@ -729,12 +681,12 @@ enc } async #parse(data: ConnMessage): Promise { - if (this.encodingKind === "json") { + if (this.#encodingKind === "json") { if (typeof data !== "string") { throw new Error("received non-string for json parse"); } return JSON.parse(data); - } else if (this.encodingKind === "cbor") { + } else if (this.#encodingKind === "cbor") { if (!this.#transport) { // Do thing throw new Error("Cannot parse message when no transport defined"); @@ -769,17 +721,7 @@ enc ); } } else { - assertUnreachable(this.encodingKind); - } - } - - #serialize(value: unknown): ConnMessage { - if (this.encodingKind === "json") { - return JSON.stringify(value); - } else if (this.encodingKind === "cbor") { - return cbor.encode(value); - } else { - assertUnreachable(this.encodingKind); + assertUnreachable(this.#encodingKind); } } @@ -789,8 +731,6 @@ enc * @returns {Promise} A promise that resolves when the socket is gracefully closed. */ async dispose(): Promise { - await this.#onConstructedPromise; - // Internally, this "disposes" the connection if (this.#disposed) { @@ -808,7 +748,7 @@ enc this.#abortController.abort(); // Remove from registry - this.client[ACTOR_CONNS_SYMBOL].delete(this); + this.#client[ACTOR_CONNS_SYMBOL].delete(this); // Disconnect transport cleanly if (!this.#transport) { diff --git a/packages/actor/src/client/actor-handle.ts b/packages/actor/src/client/actor-handle.ts index 0cb3bbb7e..c1c458808 100644 --- a/packages/actor/src/client/actor-handle.ts +++ b/packages/actor/src/client/actor-handle.ts @@ -1,19 +1,16 @@ import type { AnyActorDefinition } from "@/actor/definition"; -import type { ActionRequest, ActionResponse } from "@/actor/protocol/http/action"; import type { Encoding } from "@/actor/protocol/serde"; import type { ActorQuery } from "@/manager/protocol/query"; -import { type ActorDefinitionActions, resolveActorId } from "./actor-common"; +import { type ActorDefinitionActions } from "./actor-common"; import { type ActorConn, ActorConnRaw } from "./actor-conn"; -import { CREATE_ACTOR_CONN_PROXY, type ClientRaw } from "./client"; +import { + ClientDriver, + CREATE_ACTOR_CONN_PROXY, + type ClientRaw, +} from "./client"; import { logger } from "./log"; -import { sendHttpRequest } from "./utils"; import invariant from "invariant"; import { assertUnreachable } from "@/actor/utils"; -import { - HEADER_ACTOR_QUERY, - HEADER_CONN_PARAMS, - HEADER_ENCODING, -} from "@/actor/router-endpoints"; /** * Provides underlying functions for stateless {@link ActorHandle} for action calls. @@ -23,30 +20,30 @@ import { */ export class ActorHandleRaw { #client: ClientRaw; - #endpoint: string; + #driver: ClientDriver; #encodingKind: Encoding; #actorQuery: ActorQuery; + #params: unknown; /** * Do not call this directly. * * Creates an instance of ActorHandleRaw. * - * @param {string} endpoint - The endpoint to connect to. - * * @protected */ public constructor( client: any, - endpoint: string, - private readonly params: unknown, + driver: ClientDriver, + params: unknown, encodingKind: Encoding, actorQuery: ActorQuery, ) { this.#client = client; - this.#endpoint = endpoint; + this.#driver = driver; this.#encodingKind = encodingKind; this.#actorQuery = actorQuery; + this.#params = params; } /** @@ -63,27 +60,13 @@ export class ActorHandleRaw { name: string, ...args: Args ): Promise { - logger().debug("actor handle action", { + return await this.#driver.action( + this.#actorQuery, + this.#encodingKind, + this.#params, name, - args, - query: this.#actorQuery, - }); - - const responseData = await sendHttpRequest({ - url: `${this.#endpoint}/actors/actions/${encodeURIComponent(name)}`, - method: "POST", - headers: { - [HEADER_ENCODING]: this.#encodingKind, - [HEADER_ACTOR_QUERY]: JSON.stringify(this.#actorQuery), - ...(this.params !== undefined - ? { [HEADER_CONN_PARAMS]: JSON.stringify(this.params) } - : {}), - }, - body: { a: args } satisfies ActionRequest, - encoding: this.#encodingKind, - }); - - return responseData.o as Response; + ...args, + ); } /** @@ -99,8 +82,8 @@ export class ActorHandleRaw { const conn = new ActorConnRaw( this.#client, - this.#endpoint, - this.params, + this.#driver, + this.#params, this.#encodingKind, this.#actorQuery, ); @@ -120,8 +103,8 @@ export class ActorHandleRaw { "getForKey" in this.#actorQuery || "getOrCreateForKey" in this.#actorQuery ) { - const actorId = await resolveActorId( - this.#endpoint, + // TODO: + const actorId = await this.#driver.resolveActorId( this.#actorQuery, this.#encodingKind, ); diff --git a/packages/actor/src/client/client.ts b/packages/actor/src/client/client.ts index 674815164..91145fdb6 100644 --- a/packages/actor/src/client/client.ts +++ b/packages/actor/src/client/client.ts @@ -1,13 +1,20 @@ import type { Transport } from "@/actor/protocol/message/mod"; import type { Encoding } from "@/actor/protocol/serde"; import type { ActorQuery } from "@/manager/protocol/query"; -import * as errors from "./errors"; -import { ActorConn, ActorConnRaw, CONNECT_SYMBOL } from "./actor-conn"; +import { + ActorConn, + ActorConnRaw, + CONNECT_SYMBOL, + SendHttpMessageOpts, +} from "./actor-conn"; import { ActorHandle, ActorHandleRaw } from "./actor-handle"; -import { ActorActionFunction, resolveActorId } from "./actor-common"; +import { ActorActionFunction } from "./actor-common"; import { logger } from "./log"; import type { ActorCoreApp } from "@/mod"; import type { AnyActorDefinition } from "@/actor/definition"; +import type * as wsToServer from "@/actor/protocol/message/to-server"; +import type { EventSource } from "eventsource"; +import { createHttpClientDriver } from "./http-client-driver"; /** Extract the actor registry from the app definition. */ export type ExtractActorsFromApp> = @@ -150,6 +157,36 @@ export const ACTOR_CONNS_SYMBOL = Symbol("actorConns"); export const CREATE_ACTOR_CONN_PROXY = Symbol("createActorConnProxy"); export const TRANSPORT_SYMBOL = Symbol("transport"); +export interface ClientDriver { + action = unknown[], Response = unknown>( + actorQuery: ActorQuery, + encoding: Encoding, + params: unknown, + name: string, + ...args: Args + ): Promise; + resolveActorId( + actorQuery: ActorQuery, + encodingKind: Encoding, + ): Promise; + connectWebSocket( + actorQuery: ActorQuery, + encodingKind: Encoding, + ): Promise; + connectSse( + actorQuery: ActorQuery, + encodingKind: Encoding, + params: unknown, + ): Promise; + sendHttpMessage( + actorId: string, + encoding: Encoding, + connectionId: string, + connectionToken: string, + message: wsToServer.ToServer, + ): Promise; +} + /** * Client for managing & connecting to actors. * @@ -161,7 +198,7 @@ export class ClientRaw { [ACTOR_CONNS_SYMBOL] = new Set(); - #managerEndpoint: string; + #driver: ClientDriver; #encodingKind: Encoding; [TRANSPORT_SYMBOL]: Transport; @@ -172,8 +209,8 @@ export class ClientRaw { * @param {ClientOptions} [opts] - Options for configuring the client. * @see {@link https://rivet.gg/docs/setup|Initial Setup} */ - public constructor(managerEndpoint: string, opts?: ClientOptions) { - this.#managerEndpoint = managerEndpoint; + public constructor(driver: ClientDriver, opts?: ClientOptions) { + this.#driver = driver; this.#encodingKind = opts?.encoding ?? "cbor"; this[TRANSPORT_SYMBOL] = opts?.transport ?? "websocket"; @@ -205,12 +242,7 @@ export class ClientRaw { }, }; - const managerEndpoint = this.#managerEndpoint; - const handle = this.#createHandle( - managerEndpoint, - opts?.params, - actorQuery, - ); + const handle = this.#createHandle(opts?.params, actorQuery); return createActorProxy(handle) as ActorHandle; } @@ -244,12 +276,7 @@ export class ClientRaw { }, }; - const managerEndpoint = this.#managerEndpoint; - const handle = this.#createHandle( - managerEndpoint, - opts?.params, - actorQuery, - ); + const handle = this.#createHandle(opts?.params, actorQuery); return createActorProxy(handle) as ActorHandle; } @@ -286,12 +313,7 @@ export class ClientRaw { }, }; - const managerEndpoint = this.#managerEndpoint; - const handle = this.#createHandle( - managerEndpoint, - opts?.params, - actorQuery, - ); + const handle = this.#createHandle(opts?.params, actorQuery); return createActorProxy(handle) as ActorHandle; } @@ -330,8 +352,7 @@ export class ClientRaw { }); // Create the actor - const actorId = await resolveActorId( - this.#managerEndpoint, + const actorId = await this.#driver.resolveActorId( createQuery, this.#encodingKind, ); @@ -347,25 +368,17 @@ export class ClientRaw { actorId, }, } satisfies ActorQuery; - const handle = this.#createHandle( - this.#managerEndpoint, - opts?.params, - getForIdQuery, - ); + const handle = this.#createHandle(opts?.params, getForIdQuery); const proxy = createActorProxy(handle) as ActorHandle; return proxy; } - #createHandle( - endpoint: string, - params: unknown, - actorQuery: ActorQuery, - ): ActorHandleRaw { + #createHandle(params: unknown, actorQuery: ActorQuery): ActorHandleRaw { return new ActorHandleRaw( this, - endpoint, + this.#driver, params, this.#encodingKind, actorQuery, @@ -421,19 +434,11 @@ export type Client> = ClientRaw & { >; }; -/** - * Creates a client with the actor accessor proxy. - * - * @template A The actor application type. - * @param {string} managerEndpoint - The manager endpoint. - * @param {ClientOptions} [opts] - Options for configuring the client. - * @returns {Client} - A proxied client that supports the `client.myActor.connect()` syntax. - */ -export function createClient>( - managerEndpoint: string, +export function createClientWithDriver>( + driver: ClientDriver, opts?: ClientOptions, ): Client { - const client = new ClientRaw(managerEndpoint, opts); + const client = new ClientRaw(driver, opts); // Create proxy for accessing actors by name return new Proxy(client, { diff --git a/packages/actor/src/client/http-client-driver.ts b/packages/actor/src/client/http-client-driver.ts new file mode 100644 index 000000000..94cdb0c36 --- /dev/null +++ b/packages/actor/src/client/http-client-driver.ts @@ -0,0 +1,198 @@ +import * as cbor from "cbor-x"; +import type { Encoding } from "@/actor/protocol/serde"; +import type { ActorQuery } from "@/manager/protocol/query"; +import * as errors from "./errors"; +import { logger } from "./log"; +import type * as wsToServer from "@/actor/protocol/message/to-server"; +import type * as protoHttpResolve from "@/actor/protocol/http/resolve"; +import { assertUnreachable, httpUserAgent } from "@/utils"; +import { + HEADER_ACTOR_ID, + HEADER_ACTOR_QUERY, + HEADER_CONN_ID, + HEADER_CONN_PARAMS, + HEADER_CONN_TOKEN, + HEADER_ENCODING, +} from "@/actor/router-endpoints"; +import type { EventSource } from "eventsource"; +import { importWebSocket } from "@/common/websocket"; +import { importEventSource } from "@/common/eventsource"; +import { + sendHttpRequest, + serializeWithEncoding, + type WebSocketMessage, +} from "./utils"; +import type { ActionRequest } from "@/actor/protocol/http/action"; +import type { ActionResponse } from "@/actor/protocol/message/to-client"; +import { ClientDriver } from "./client"; + +/** + * Client driver that communicates with the manager via HTTP. + */ +export function createHttpClientDriver(managerEndpoint: string): ClientDriver { + // Lazily import the dynamic imports so we don't have to turn `createClient` in to an aysnc fn + const dynamicImports = (async () => { + // Import dynamic dependencies + const [WebSocket, EventSource] = await Promise.all([ + importWebSocket(), + importEventSource(), + ]); + return { + WebSocket, + EventSource, + }; + })(); + + const driver: ClientDriver = { + action: async = unknown[], Response = unknown>( + actorQuery: ActorQuery, + encoding: Encoding, + params: unknown, + name: string, + ...args: Args + ): Promise => { + logger().debug("actor handle action", { + name, + args, + query: actorQuery, + }); + + const responseData = await sendHttpRequest( + { + url: `${managerEndpoint}/actors/actions/${encodeURIComponent(name)}`, + method: "POST", + headers: { + [HEADER_ENCODING]: encoding, + [HEADER_ACTOR_QUERY]: JSON.stringify(actorQuery), + ...(params !== undefined + ? { [HEADER_CONN_PARAMS]: JSON.stringify(params) } + : {}), + }, + body: { a: args } satisfies ActionRequest, + encoding: encoding, + }, + ); + + return responseData.o as Response; + }, + + resolveActorId: async ( + actorQuery: ActorQuery, + encodingKind: Encoding, + ): Promise => { + logger().debug("resolving actor ID", { query: actorQuery }); + + try { + const result = await sendHttpRequest< + Record, + protoHttpResolve.ResolveResponse + >({ + url: `${managerEndpoint}/actors/resolve`, + method: "POST", + headers: { + [HEADER_ENCODING]: encodingKind, + [HEADER_ACTOR_QUERY]: JSON.stringify(actorQuery), + }, + body: {}, + encoding: encodingKind, + }); + + logger().debug("resolved actor ID", { actorId: result.i }); + return result.i; + } catch (error) { + logger().error("failed to resolve actor ID", { error }); + if (error instanceof errors.ActorError) { + throw error; + } else { + throw new errors.InternalError( + `Failed to resolve actor ID: ${String(error)}`, + ); + } + } + }, + + connectWebSocket: async ( + actorQuery: ActorQuery, + encodingKind: Encoding, + ): Promise => { + const { WebSocket } = await dynamicImports; + + const actorQueryStr = encodeURIComponent(JSON.stringify(actorQuery)); + const endpoint = managerEndpoint + .replace(/^http:/, "ws:") + .replace(/^https:/, "wss:"); + const url = `${endpoint}/actors/connect/websocket?encoding=${encodingKind}&query=${actorQueryStr}`; + + logger().debug("connecting to websocket", { url }); + const ws = new WebSocket(url); + if (encodingKind === "cbor") { + ws.binaryType = "arraybuffer"; + } else if (encodingKind === "json") { + // HACK: Bun bug prevents changing binary type, so we ignore the error https://github.com/oven-sh/bun/issues/17005 + try { + ws.binaryType = "blob"; + } catch (error) {} + } else { + assertUnreachable(encodingKind); + } + + return ws; + }, + + connectSse: async ( + actorQuery: ActorQuery, + encodingKind: Encoding, + params: unknown, + ): Promise => { + const { EventSource } = await dynamicImports; + + const url = `${managerEndpoint}/actors/connect/sse`; + + logger().debug("connecting to sse", { url }); + const eventSource = new EventSource(url, { + fetch: (input, init) => { + return fetch(input, { + ...init, + headers: { + ...init?.headers, + "User-Agent": httpUserAgent(), + [HEADER_ENCODING]: encodingKind, + [HEADER_ACTOR_QUERY]: JSON.stringify(actorQuery), + ...(params !== undefined + ? { [HEADER_CONN_PARAMS]: JSON.stringify(params) } + : {}), + }, + }); + }, + }); + + return eventSource; + }, + + sendHttpMessage: async ( + actorId: string, + encoding: Encoding, + connectionId: string, + connectionToken: string, + message: wsToServer.ToServer, + ): Promise => { + // TODO: Implement ordered messages, this is not guaranteed order. Needs to use an index in order to ensure we can pipeline requests efficiently. + // TODO: Validate that we're using HTTP/3 whenever possible for pipelining requests + const messageSerialized = serializeWithEncoding(encoding, message); + const res = await fetch(`${managerEndpoint}/actors/message`, { + method: "POST", + headers: { + "User-Agent": httpUserAgent(), + [HEADER_ENCODING]: encoding, + [HEADER_ACTOR_ID]: actorId, + [HEADER_CONN_ID]: connectionId, + [HEADER_CONN_TOKEN]: connectionToken, + }, + body: messageSerialized, + }); + return res; + }, + }; + + return driver; +} diff --git a/packages/actor/src/client/mod.ts b/packages/actor/src/client/mod.ts index c94978d31..584c2c9bb 100644 --- a/packages/actor/src/client/mod.ts +++ b/packages/actor/src/client/mod.ts @@ -1,4 +1,7 @@ -export { createClient } from "./client"; +import type { ActorCoreApp } from "@/app/mod"; +import { type Client, type ClientOptions, createClientWithDriver } from "./client"; +import { createHttpClientDriver } from "./http-client-driver"; + export type { Client, ActorAccessor, @@ -33,3 +36,19 @@ export { AnyActorDefinition, ActorDefinition, } from "@/actor/definition"; + +/** + * Creates a client with the actor accessor proxy. + * + * @template A The actor application type. + * @param {string} managerEndpoint - The manager endpoint. + * @param {ClientOptions} [opts] - Options for configuring the client. + * @returns {Client} - A proxied client that supports the `client.myActor.connect()` syntax. + */ +export function createClient>( + endpoint: string, + opts?: ClientOptions, +): Client { + const driver = createHttpClientDriver(endpoint); + return createClientWithDriver(driver, opts); +} diff --git a/packages/actor/src/client/utils.ts b/packages/actor/src/client/utils.ts index 70a40e34b..915032143 100644 --- a/packages/actor/src/client/utils.ts +++ b/packages/actor/src/client/utils.ts @@ -139,3 +139,16 @@ export async function sendHttpRequest< return responseBody; } + +export function serializeWithEncoding( + encoding: Encoding, + value: unknown, +): WebSocketMessage { + if (encoding === "json") { + return JSON.stringify(value); + } else if (encoding === "cbor") { + return cbor.encode(value); + } else { + assertUnreachable(encoding); + } +} diff --git a/packages/platforms/rivet/package.json b/packages/platforms/rivet/package.json index d8651307e..fff34c480 100644 --- a/packages/platforms/rivet/package.json +++ b/packages/platforms/rivet/package.json @@ -31,6 +31,7 @@ }, "devDependencies": { "@rivet-gg/actor-core": "^25.1.0", + "@rivetkit/actor": "workspace:*", "@types/deno": "^2.0.0", "@types/invariant": "^2", "@types/node": "^22.13.1",