diff --git a/package-lock.json b/package-lock.json index bb0840f0..a0499e26 100644 --- a/package-lock.json +++ b/package-lock.json @@ -96,7 +96,6 @@ "integrity": "sha512-CGOfOJqWjg2qW/Mb6zNsDm+u5vFQ8DxXfbM09z69p5Z6+mE1ikP2jUXw+j42Pf1XTYED2Rni5f95npYeuwMDQA==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@babel/code-frame": "^7.29.0", "@babel/generator": "^7.29.0", @@ -1677,7 +1676,6 @@ "resolved": "https://registry.npmjs.org/@babel/runtime/-/runtime-7.28.6.tgz", "integrity": "sha512-05WQkdpL9COIMz4LjTxGpPNCdlpyimKppYNoJ5Di5EUObifl8t4tuLuUBBZEpoLYOmfvIWrsp9fCl0HoPRVTdA==", "license": "MIT", - "peer": true, "engines": { "node": ">=6.9.0" } @@ -3165,7 +3163,6 @@ "integrity": "sha512-DhGl4xMVFGVIyMwswXeyzdL4uXD5OGILGX5N8Y+f6W7LhC1Ze2poSNrkF/fedpVDHEEZ+PHFW0vL14I+mm8K3Q==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@octokit/auth-token": "^6.0.0", "@octokit/graphql": "^9.0.3", @@ -4411,7 +4408,6 @@ "integrity": "sha512-4K3bqJpXpqfg2XKGK9bpDTc6xO/xoUP/RBWS7AtRMug6zZFaRekiLzjVtAoZMquxoAbzBvy5nxQ7veS5eYzf8A==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "undici-types": "~7.18.0" } @@ -4492,7 +4488,6 @@ "integrity": "sha512-IgSWvLobTDOjnaxAfDTIHaECbkNlAlKv2j5SjpB2v7QHKv1FIfjwMy8FsDbVfDX/KjmCmYICcw7uGaXLhtsLNg==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@typescript-eslint/scope-manager": "8.56.0", "@typescript-eslint/types": "8.56.0", @@ -5151,7 +5146,6 @@ "integrity": "sha512-UVJyE9MttOsBQIDKw1skb9nAwQuR5wuGD3+82K6JgJlm/Y+KI92oNsMNGZCYdDsVtRHSak0pcV5Dno5+4jh9sw==", "dev": true, "license": "MIT", - "peer": true, "bin": { "acorn": "bin/acorn" }, @@ -5696,7 +5690,6 @@ } ], "license": "MIT", - "peer": true, "dependencies": { "baseline-browser-mapping": "^2.9.0", "caniuse-lite": "^1.0.30001759", @@ -6866,7 +6859,6 @@ "integrity": "sha512-LEyamqS7W5HB3ujJyvi0HQK/dtVINZvd5mAAp9eT5S/ujByGjiZLCzPcHVzuXbpJDJF/cxwHlfceVUDZ2lnSTw==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@eslint-community/eslint-utils": "^4.8.0", "@eslint-community/regexpp": "^4.12.1", @@ -6927,7 +6919,6 @@ "integrity": "sha512-82GZUjRS0p/jganf6q1rEO25VSoHH0hKPCTrgillPjdI/3bgBhAE1QzHrHTizjpRvy6pGAvKjDJtk2pF9NDq8w==", "dev": true, "license": "MIT", - "peer": true, "bin": { "eslint-config-prettier": "bin/cli.js" }, @@ -7054,7 +7045,6 @@ "integrity": "sha512-whOE1HFo/qJDyX4SnXzP4N6zOWn79WhnCUY/iDR0mPfQZO8wcYE4JClzI2oZrhBnnMUCBCHZhO6VQyoBU95mZA==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@rtsao/scc": "^1.1.0", "array-includes": "^3.1.9", @@ -9829,7 +9819,6 @@ "integrity": "sha512-E3ZJh4J3S9KfwdjZhe2afj6R9lGIN5Pher1pF39UGrXRqq/VDaGVIGN13BjHd2u8B61hArAGOnso7nBOouW3TQ==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@babel/parser": "^7.29.0", "@babel/types": "^7.29.0", @@ -10860,7 +10849,6 @@ "integrity": "sha512-UOnG6LftzbdaHZcKoPFtOcCKztrQ57WkHDeRD9t/PTQtmT0NHSeWWepj6pS0z/N7+08BHFDQVUrfmfMRcZwbMg==", "dev": true, "license": "MIT", - "peer": true, "bin": { "prettier": "bin/prettier.cjs" }, @@ -11389,7 +11377,6 @@ "integrity": "sha512-oQL6lgK3e2QZeQ7gcgIkS2YZPg5slw37hYufJ3edKlfQSGGm8ICoxswK15ntSzF/a8+h7ekRy7k7oWc3BQ7y8A==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@types/estree": "1.0.8" }, @@ -12446,7 +12433,6 @@ "integrity": "sha512-5C1sg4USs1lfG0GFb2RLXsdpXqBSEhAaA/0kPL01wxzpMqLILNxIxIOKiILz+cdg/pLnOUxFYOR5yhHU666wbw==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "esbuild": "~0.27.0", "get-tsconfig": "^4.7.5" @@ -12580,7 +12566,6 @@ "integrity": "sha512-jl1vZzPDinLr9eUt3J/t7V6FgNEw9QjvBPdysz9KfQDD41fQrC2Y4vKQdiaUpFT4bXlb1RHhLpp8wtm6M5TgSw==", "dev": true, "license": "Apache-2.0", - "peer": true, "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" @@ -12707,7 +12692,6 @@ "dev": true, "hasInstallScript": true, "license": "MIT", - "peer": true, "dependencies": { "napi-postinstall": "^0.3.0" }, @@ -12831,7 +12815,6 @@ "integrity": "sha512-w+N7Hifpc3gRjZ63vYBXA56dvvRlNWRczTdmCBBa+CotUzAPf5b7YMdMR/8CQoeYE5LX3W4wj6RYTgonm1b9DA==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "esbuild": "^0.27.0", "fdir": "^6.5.0", @@ -12907,7 +12890,6 @@ "integrity": "sha512-hOQuK7h0FGKgBAas7v0mSAsnvrIgAvWmRFjmzpJ7SwFHH3g1k2u37JtYwOwmEKhK6ZO3v9ggDBBm0La1LCK4uQ==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@vitest/expect": "4.0.18", "@vitest/mocker": "4.0.18", diff --git a/src/jrpc/v2/compatibility-utils.ts b/src/jrpc/v2/compatibility-utils.ts index b4e289b3..0a00c809 100644 --- a/src/jrpc/v2/compatibility-utils.ts +++ b/src/jrpc/v2/compatibility-utils.ts @@ -97,6 +97,23 @@ export function propagateToRequest(req: Record, context: Middle }); } +/** + * Copies non-JSON-RPC string properties from the context to the request. + * + * Clone the original request object and propagate the context to the cloned request. + * + * **ATTN:** Only string properties are copied. + * + * @param req - The request to propagate the context to. + * @param context - The context to propagate from. + * @returns The mutable cloned request. + */ +export function propagateToMutableRequest(req: Record, context: MiddlewareContext): Record { + const clonedRequest = deepClone(req); + propagateToRequest(clonedRequest, context); + return clonedRequest; +} + /** * Deserialize the error property for a thrown error, merging in the cause where possible. * diff --git a/src/jrpc/v2/index.ts b/src/jrpc/v2/index.ts index 242c9d0c..0c6c8460 100644 --- a/src/jrpc/v2/index.ts +++ b/src/jrpc/v2/index.ts @@ -1,9 +1,16 @@ export { getUniqueId, isNotification, isRequest } from "../../utils/jrpc"; export { asLegacyMiddleware } from "./asLegacyMiddleware"; +export { deepClone, fromLegacyRequest, makeContext, propagateToContext, propagateToMutableRequest, propagateToRequest } from "./compatibility-utils"; export { createScaffoldMiddleware as createScaffoldMiddlewareV2 } from "./createScaffoldMiddleware"; export { JRPCEngineV2 } from "./jrpcEngineV2"; export { JRPCServer } from "./jrpcServer"; +export { createEngineStreamV2 } from "./messageStream"; export { MiddlewareContext } from "./MiddlewareContext"; +export { + providerAsMiddleware as providerAsMiddlewareV2, + providerFromEngine as providerFromEngineV2, + providerFromMiddleware as providerFromMiddlewareV2, +} from "./providerUtils"; export type { ContextConstraint, EmptyContext, diff --git a/src/jrpc/v2/messageStream.ts b/src/jrpc/v2/messageStream.ts new file mode 100644 index 00000000..296e96a5 --- /dev/null +++ b/src/jrpc/v2/messageStream.ts @@ -0,0 +1,70 @@ +import log from "loglevel"; +import { Duplex } from "readable-stream"; + +import { isRequest } from "../../utils/jrpc"; +import { rpcErrors } from "../errors"; +import { JRPCRequest } from "../interfaces"; +import { SafeEventEmitter } from "../safeEventEmitter"; +import { JRPCEngineV2 } from "./jrpcEngineV2"; + +/** + * Creates a Duplex object stream for an engine (JRPCEngineV2) + a separate notification emitter. + * + * Replaces V1's createEngineStream by decoupling notification forwarding from + * the engine itself. Notifications are routed through a SafeEventEmitter that + * pushes onto the same stream, so the engine no longer needs to be an EventEmitter. + */ +export function createEngineStreamV2({ engine, notificationEmitter }: { engine: JRPCEngineV2; notificationEmitter?: SafeEventEmitter }): Duplex { + let stream: Duplex | undefined = undefined; + + function noop() { + // noop + } + + function handleRequest(req: JRPCRequest) { + return engine + .handle(req) + .then((res): undefined => { + if (res !== undefined && isRequest(req)) { + stream?.push({ + id: req.id, + jsonrpc: "2.0", + result: res, + }); + } + return undefined; + }) + .catch((err: unknown) => { + if (isRequest(req)) { + const message = err instanceof Error ? err.message : "Internal JSON-RPC error"; + stream?.push({ + id: req.id, + jsonrpc: "2.0", + error: rpcErrors.internal({ message }), + }); + } + log.error(err); + }); + } + + function write(req: JRPCRequest, _encoding: BufferEncoding, cb: (error?: Error | null) => void) { + return handleRequest(req).finally(() => { + cb(); + }); + } + + stream = new Duplex({ objectMode: true, read: noop, write }); + + if (notificationEmitter) { + const onNotification = (message: unknown) => { + stream?.push(message); + }; + + notificationEmitter.on("notification", onNotification); + stream?.once("close", () => { + notificationEmitter.removeListener("notification", onNotification); + }); + } + + return stream; +} diff --git a/src/jrpc/v2/providerUtils.ts b/src/jrpc/v2/providerUtils.ts new file mode 100644 index 00000000..6e7b489e --- /dev/null +++ b/src/jrpc/v2/providerUtils.ts @@ -0,0 +1,85 @@ +import { getUniqueId } from "../../utils"; +import { serializeJrpcError } from "../errors"; +import { JRPCParams, JRPCRequest, JRPCResponse, Json, RequestArguments } from "../interfaces"; +import { ProviderEvents, SafeEventEmitterProvider } from "../jrpcEngine"; +import { SafeEventEmitter } from "../safeEventEmitter"; +import { deepClone, propagateToRequest } from "./compatibility-utils"; +import { JRPCEngineV2 } from "./jrpcEngineV2"; +import type { JRPCMiddlewareV2 } from "./v2interfaces"; + +/** + * Create a {@link SafeEventEmitterProvider} from a {@link JRPCEngineV2}. + * + * Unlike the V1 counterpart, the V2 engine throws errors directly rather than + * wrapping them in response objects, so `sendAsync` simply propagates thrown errors. + * Notification forwarding is not supported since {@link JRPCEngineV2} is not an event emitter. + * + * @param engine - The V2 JSON-RPC engine. + * @returns A provider backed by the engine. + */ +export function providerFromEngine(engine: JRPCEngineV2): SafeEventEmitterProvider { + const provider: SafeEventEmitterProvider = new SafeEventEmitter() as SafeEventEmitterProvider; + + provider.sendAsync = async (req: JRPCRequest) => { + const result = await engine.handle(req as JRPCRequest); + return result as U; + }; + + async function handleWithCallback(req: JRPCRequest, callback: (error: unknown, providerRes: JRPCResponse) => void) { + try { + const result = await engine.handle(req as JRPCRequest); + callback(null, { id: req.id, jsonrpc: "2.0", result: result as U }); + } catch (error) { + const serializedError = serializeJrpcError(error, { + shouldIncludeStack: false, + shouldPreserveMessage: true, + }); + callback(serializedError, { id: req.id, jsonrpc: "2.0", error: serializedError }); + } + } + + provider.send = (req: JRPCRequest, callback: (error: unknown, providerRes: JRPCResponse) => void) => { + if (typeof callback !== "function") { + throw new Error('Must provide callback to "send" method.'); + } + handleWithCallback(req, callback); + }; + + provider.request = async (args: RequestArguments) => { + const req: JRPCRequest = { + ...args, + id: getUniqueId(), + jsonrpc: "2.0", + }; + const res = await provider.sendAsync(req); + return res as U; + }; + + return provider; +} + +/** + * Create a {@link SafeEventEmitterProvider} from one or more V2 middleware. + * + * @param middleware - The V2 middleware to back the provider. + * @returns A provider backed by an engine composed of the given middleware. + */ +export function providerFromMiddleware(middleware: JRPCMiddlewareV2): SafeEventEmitterProvider { + const engine = JRPCEngineV2.create({ middleware: [middleware] }); + return providerFromEngine(engine as JRPCEngineV2); +} + +/** + * Convert a {@link SafeEventEmitterProvider} into a V2 middleware. + * The middleware delegates all requests to the provider's `sendAsync` method. + * + * @param provider - The provider to wrap as middleware. + * @returns A V2 middleware that forwards requests to the provider. + */ +export function providerAsMiddleware(provider: SafeEventEmitterProvider): JRPCMiddlewareV2 { + return async ({ request, context }) => { + const providerRequest = deepClone(request); + propagateToRequest(providerRequest, context); + return (await provider.sendAsync(providerRequest)) as Json; + }; +} diff --git a/test/v2/compatibility-utils.test.ts b/test/v2/compatibility-utils.test.ts index 48f1483d..f9931e7c 100644 --- a/test/v2/compatibility-utils.test.ts +++ b/test/v2/compatibility-utils.test.ts @@ -1,7 +1,6 @@ import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; -import { Json, JRPCParams, JRPCRequest } from "../../src"; -import { stringify } from "../../src/utils/jrpc"; +import { JRPCParams, JRPCRequest, Json } from "../../src"; import { JsonRpcError } from "../../src/jrpc/errors"; import { deepClone, @@ -9,9 +8,11 @@ import { fromLegacyRequest, makeContext, propagateToContext, + propagateToMutableRequest, propagateToRequest, } from "../../src/jrpc/v2/compatibility-utils"; import { MiddlewareContext } from "../../src/jrpc/v2/MiddlewareContext"; +import { stringify } from "../../src/utils/jrpc"; const jsonrpc = "2.0" as const; @@ -340,6 +341,85 @@ describe("compatibility-utils", () => { }); }); + describe("propagateToMutableRequest", () => { + it("returns a mutable object from a frozen request", () => { + const request = Object.freeze({ + jsonrpc, + method: "test_method", + params: Object.freeze([1, 2, 3]), + id: 1, + }); + const context = new MiddlewareContext>(); + context.set("extraProp", "value"); + + const result = propagateToMutableRequest(request, context); + + expect(Object.isFrozen(result)).toBe(false); + result.newKey = "can assign"; + expect(result.newKey).toBe("can assign"); + }); + + it("does not mutate the original frozen request", () => { + const request = Object.freeze({ + jsonrpc, + method: "test_method", + params: Object.freeze([1, 2, 3]), + id: 1, + }); + const context = new MiddlewareContext>(); + context.set("extraProp", "value"); + + propagateToMutableRequest(request, context); + + expect(request).toStrictEqual({ + jsonrpc, + method: "test_method", + params: [1, 2, 3], + id: 1, + }); + expect("extraProp" in request).toBe(false); + }); + + it("propagates context properties onto the mutable clone of a frozen request", () => { + const request = Object.freeze({ + jsonrpc, + method: "test_method", + params: Object.freeze([1]), + id: 42, + }); + const context = new MiddlewareContext>(); + context.set("extraProp", "value"); + context.set("anotherProp", { nested: true }); + + const result = propagateToMutableRequest(request, context); + + expect(result).toStrictEqual({ + jsonrpc, + method: "test_method", + params: [1], + id: 42, + extraProp: "value", + anotherProp: { nested: true }, + }); + }); + + it("produces deeply mutable params from a frozen request", () => { + const request = Object.freeze({ + jsonrpc, + method: "test_method", + params: Object.freeze([Object.freeze({ a: 1 }), 2]), + id: 1, + }); + const context = new MiddlewareContext(); + + const result = propagateToMutableRequest(request, context); + + expect(Object.isFrozen(result.params)).toBe(false); + (result.params as unknown[])[0] = "replaced"; + expect((result.params as unknown[])[0]).toBe("replaced"); + }); + }); + describe("deserializeError", () => { // Requires some special handling due to the possible existence or // non-existence of Error.isError diff --git a/test/v2/messageStream.test.ts b/test/v2/messageStream.test.ts new file mode 100644 index 00000000..ddbef89e --- /dev/null +++ b/test/v2/messageStream.test.ts @@ -0,0 +1,313 @@ +import { Duplex } from "readable-stream"; +import { beforeEach, describe, expect, it, Mock, vi } from "vitest"; + +import type { SafeEventEmitter } from "../../src/jrpc/safeEventEmitter"; +import type { JRPCEngineV2 } from "../../src/jrpc/v2/jrpcEngineV2"; +import { createEngineStreamV2 } from "../../src/jrpc/v2/messageStream"; + +vi.mock("readable-stream", () => { + function MockDuplex(this: Record, opts: Record) { + Object.assign(this, (MockDuplex as unknown as { _instance: Record })._instance); + (MockDuplex as unknown as { _capturedOpts: Record })._capturedOpts = opts; + } + return { Duplex: MockDuplex }; +}); +vi.mock("loglevel", () => ({ default: { error: vi.fn() } })); + +function flushPromises(): Promise { + return new Promise((resolve) => setTimeout(resolve, 0)); +} + +function makeRequest(overrides: Record = {}) { + return { jsonrpc: "2.0", id: "1", method: "test", params: [] as unknown[], ...overrides }; +} + +function makeNotification(overrides: Record = {}) { + return { jsonrpc: "2.0", method: "test", params: [] as unknown[], ...overrides }; +} + +describe("createEngineStreamV2", () => { + let mockStream: Record; + let capturedWrite: (req: Record, encoding: string, cb: (error?: Error | null) => void) => void; + + function createMockEngine(handleImpl: Mock = vi.fn().mockResolvedValue(null)) { + return { handle: handleImpl } as unknown as JRPCEngineV2; + } + + function createMockEmitter() { + return { + on: vi.fn(), + removeListener: vi.fn(), + } as unknown as SafeEventEmitter; + } + + function getCapturedOpts(): Record { + return (Duplex as unknown as { _capturedOpts: Record })._capturedOpts; + } + + function setup(engine: JRPCEngineV2, notificationEmitter?: SafeEventEmitter) { + const stream = createEngineStreamV2({ engine, notificationEmitter }); + capturedWrite = getCapturedOpts().write as typeof capturedWrite; + return stream; + } + + beforeEach(() => { + mockStream = { + push: vi.fn(), + once: vi.fn(), + }; + + (Duplex as unknown as { _instance: Record })._instance = mockStream; + }); + + it("creates a Duplex in objectMode with read and write", () => { + setup(createMockEngine()); + + const opts = getCapturedOpts(); + expect(opts.objectMode).toBe(true); + expect(typeof opts.read).toBe("function"); + expect(typeof opts.write).toBe("function"); + }); + + it("returns the created stream", () => { + const result = setup(createMockEngine()); + expect(result).toHaveProperty("push", mockStream.push); + expect(result).toHaveProperty("once", mockStream.once); + }); + + it("invokes the write callback to signal backpressure release", async () => { + setup(createMockEngine()); + + const cb = vi.fn(); + capturedWrite(makeRequest(), "utf-8", cb); + await flushPromises(); + + expect(cb).toHaveBeenCalledOnce(); + }); + + describe("request handling", () => { + it("pushes a success response for a request", async () => { + const engine = createMockEngine(vi.fn().mockResolvedValue("hello")); + setup(engine); + + const cb = vi.fn(); + capturedWrite(makeRequest({ id: "42" }), "utf-8", cb); + await flushPromises(); + + expect(engine.handle).toHaveBeenCalledOnce(); + expect(cb).toHaveBeenCalledOnce(); + expect(mockStream.push).toHaveBeenCalledWith({ + id: "42", + jsonrpc: "2.0", + result: "hello", + }); + }); + + it("pushes a null result when engine returns null", async () => { + setup(createMockEngine()); + + capturedWrite(makeRequest({ id: "1" }), "utf-8", vi.fn()); + await flushPromises(); + + expect(mockStream.push).toHaveBeenCalledWith({ + id: "1", + jsonrpc: "2.0", + result: null, + }); + }); + + it("pushes complex result objects", async () => { + setup(createMockEngine(vi.fn().mockResolvedValue({ data: [1, 2, 3] }))); + + capturedWrite(makeRequest({ id: "99" }), "utf-8", vi.fn()); + await flushPromises(); + + expect(mockStream.push).toHaveBeenCalledWith({ + id: "99", + jsonrpc: "2.0", + result: { data: [1, 2, 3] }, + }); + }); + + it("preserves the request id in the response", async () => { + setup(createMockEngine(vi.fn().mockResolvedValue("ok"))); + + capturedWrite(makeRequest({ id: "abc-123" }), "utf-8", vi.fn()); + await flushPromises(); + + expect(mockStream.push).toHaveBeenCalledWith(expect.objectContaining({ id: "abc-123" })); + }); + + it("handles multiple sequential requests", async () => { + let counter = 0; + setup(createMockEngine(vi.fn().mockImplementation(() => Promise.resolve(++counter)))); + + capturedWrite(makeRequest({ id: "a" }), "utf-8", vi.fn()); + capturedWrite(makeRequest({ id: "b" }), "utf-8", vi.fn()); + capturedWrite(makeRequest({ id: "c" }), "utf-8", vi.fn()); + await flushPromises(); + + expect(mockStream.push).toHaveBeenCalledTimes(3); + expect(mockStream.push).toHaveBeenCalledWith(expect.objectContaining({ id: "a" })); + expect(mockStream.push).toHaveBeenCalledWith(expect.objectContaining({ id: "b" })); + expect(mockStream.push).toHaveBeenCalledWith(expect.objectContaining({ id: "c" })); + }); + }); + + describe("error handling", () => { + it("pushes an error response when the engine rejects", async () => { + setup(createMockEngine(vi.fn().mockRejectedValue(new Error("engine failure")))); + + capturedWrite(makeRequest({ id: "err-1" }), "utf-8", vi.fn()); + await flushPromises(); + + expect(mockStream.push).toHaveBeenCalledExactlyOnceWith( + expect.objectContaining({ + id: "err-1", + jsonrpc: "2.0", + error: expect.objectContaining({ message: expect.stringContaining("engine failure") }), + }) + ); + }); + + it("includes the error message in the error response", async () => { + setup(createMockEngine(vi.fn().mockRejectedValue(new Error("specific failure")))); + + capturedWrite(makeRequest({ id: "err-2" }), "utf-8", vi.fn()); + await flushPromises(); + + const pushed = mockStream.push.mock.calls[0][0] as Record; + expect((pushed.error as Record).message).toContain("specific failure"); + }); + + it("calls cb() without an error to keep the stream alive", async () => { + setup(createMockEngine(vi.fn().mockRejectedValue(new Error("engine failure")))); + + const cb = vi.fn(); + capturedWrite(makeRequest({ id: "err-alive" }), "utf-8", cb); + await flushPromises(); + + expect(cb).toHaveBeenCalledOnce(); + }); + + it("uses a fallback message for non-Error throws", async () => { + setup(createMockEngine(vi.fn().mockRejectedValue("string error"))); + + capturedWrite(makeRequest({ id: "err-3" }), "utf-8", vi.fn()); + await flushPromises(); + + const pushed = mockStream.push.mock.calls[0][0] as Record; + expect((pushed.error as Record).message).toContain("Internal JSON-RPC error"); + }); + }); + + describe("notification handling", () => { + it("does not push a response for notifications (no id)", async () => { + setup(createMockEngine(vi.fn().mockResolvedValue(undefined))); + + capturedWrite(makeNotification(), "utf-8", vi.fn()); + await flushPromises(); + + expect(mockStream.push).not.toHaveBeenCalled(); + }); + + it("does not push an error response for notification failures", async () => { + setup(createMockEngine(vi.fn().mockRejectedValue(new Error("notification error")))); + + capturedWrite(makeNotification(), "utf-8", vi.fn()); + await flushPromises(); + + expect(mockStream.push).not.toHaveBeenCalled(); + }); + }); + + describe("notificationEmitter", () => { + it("registers a notification listener on the emitter", () => { + const emitter = createMockEmitter(); + setup(createMockEngine(), emitter); + + expect(emitter.on).toHaveBeenCalledWith("notification", expect.any(Function)); + }); + + it("pushes emitted notifications onto the stream", () => { + const emitter = createMockEmitter(); + setup(createMockEngine(), emitter); + + const handler = vi.mocked(emitter.on).mock.calls.find(([event]) => event === "notification")![1] as (msg: unknown) => void; + const notification = { jsonrpc: "2.0", method: "eth_subscription", params: { result: "0x1" } }; + handler(notification); + + expect(mockStream.push).toHaveBeenCalledWith(notification); + }); + + it("pushes multiple notifications", () => { + const emitter = createMockEmitter(); + setup(createMockEngine(), emitter); + + const handler = vi.mocked(emitter.on).mock.calls.find(([event]) => event === "notification")![1] as (msg: unknown) => void; + handler({ type: "a" }); + handler({ type: "b" }); + handler({ type: "c" }); + + expect(mockStream.push).toHaveBeenCalledTimes(3); + }); + + it("registers a close listener to clean up", () => { + const emitter = createMockEmitter(); + setup(createMockEngine(), emitter); + + expect(mockStream.once).toHaveBeenCalledWith("close", expect.any(Function)); + }); + + it("removes the notification listener when the stream closes", () => { + const emitter = createMockEmitter(); + setup(createMockEngine(), emitter); + + const notificationHandler = vi.mocked(emitter.on).mock.calls.find(([event]) => event === "notification")![1]; + const closeHandler = mockStream.once.mock.calls.find(([event]) => event === "close")![1] as () => void; + closeHandler(); + + expect(emitter.removeListener).toHaveBeenCalledWith("notification", notificationHandler); + }); + + it("does not interfere with request/response flow", async () => { + const emitter = createMockEmitter(); + const engine = createMockEngine(vi.fn().mockResolvedValue("result")); + setup(engine, emitter); + + const handler = vi.mocked(emitter.on).mock.calls.find(([event]) => event === "notification")![1] as (msg: unknown) => void; + handler({ method: "notif" }); + + capturedWrite(makeRequest({ id: "x" }), "utf-8", vi.fn()); + await flushPromises(); + + expect(mockStream.push).toHaveBeenCalledTimes(2); + expect(mockStream.push).toHaveBeenCalledWith({ method: "notif" }); + expect(mockStream.push).toHaveBeenCalledWith({ + id: "x", + jsonrpc: "2.0", + result: "result", + }); + }); + }); + + describe("without notificationEmitter", () => { + it("does not register a close listener on the stream", () => { + setup(createMockEngine()); + expect(mockStream.once).not.toHaveBeenCalled(); + }); + + it("handles requests normally", async () => { + setup(createMockEngine(vi.fn().mockResolvedValue("no-emitter"))); + + capturedWrite(makeRequest({ id: "1" }), "utf-8", vi.fn()); + await flushPromises(); + + expect(mockStream.push).toHaveBeenCalledWith({ + id: "1", + jsonrpc: "2.0", + result: "no-emitter", + }); + }); + }); +}); diff --git a/test/v2/providerUtils.test.ts b/test/v2/providerUtils.test.ts new file mode 100644 index 00000000..5e8fc3aa --- /dev/null +++ b/test/v2/providerUtils.test.ts @@ -0,0 +1,453 @@ +import { describe, expect, it, vi } from "vitest"; + +import { JRPCParams, JRPCRequest, JRPCResponse, Json } from "../../src"; +import { SafeEventEmitter } from "../../src/jrpc/safeEventEmitter"; +import { JRPCEngineV2 } from "../../src/jrpc/v2/jrpcEngineV2"; +import { providerAsMiddleware, providerFromEngine, providerFromMiddleware } from "../../src/jrpc/v2/providerUtils"; +import type { JRPCMiddlewareV2 } from "../../src/jrpc/v2/v2interfaces"; +import { makeRequest, makeRequestMiddleware } from "../utils"; + +/** + * Wraps `provider.send` in a promise for easier testing. + */ +function sendPromise(provider: ReturnType, req: JRPCRequest): Promise<{ error: unknown; response: JRPCResponse }> { + return new Promise((resolve) => { + provider.send(req, (error, response) => { + resolve({ error, response }); + }); + }); +} + +describe("providerUtils", () => { + describe("providerFromEngine", () => { + it("returns a SafeEventEmitter instance", () => { + const engine = JRPCEngineV2.create({ middleware: [makeRequestMiddleware()] }); + const provider = providerFromEngine(engine as JRPCEngineV2); + + expect(provider).toBeInstanceOf(SafeEventEmitter); + }); + + it("has sendAsync, send, and request methods", () => { + const engine = JRPCEngineV2.create({ middleware: [makeRequestMiddleware()] }); + const provider = providerFromEngine(engine as JRPCEngineV2); + + expect(typeof provider.sendAsync).toBe("function"); + expect(typeof provider.send).toBe("function"); + expect(typeof provider.request).toBe("function"); + }); + + describe("sendAsync", () => { + it("returns the result from the engine", async () => { + const engine = JRPCEngineV2.create({ + middleware: [makeRequestMiddleware()], + }); + const provider = providerFromEngine(engine as JRPCEngineV2); + + const result = await provider.sendAsync(makeRequest()); + + expect(result).toBeNull(); + }); + + it("returns complex results from the engine", async () => { + const middleware: JRPCMiddlewareV2 = () => ({ foo: "bar", nums: [1, 2, 3] }); + const engine = JRPCEngineV2.create({ middleware: [middleware] }); + const provider = providerFromEngine(engine as JRPCEngineV2); + + const result = await provider.sendAsync(makeRequest()); + + expect(result).toStrictEqual({ foo: "bar", nums: [1, 2, 3] }); + }); + + it("propagates errors thrown by the engine", async () => { + const middleware: JRPCMiddlewareV2 = () => { + throw new Error("engine error"); + }; + const engine = JRPCEngineV2.create({ middleware: [middleware] }); + const provider = providerFromEngine(engine as JRPCEngineV2); + + await expect(provider.sendAsync(makeRequest())).rejects.toThrow("engine error"); + }); + + it("works with async middleware", async () => { + const middleware: JRPCMiddlewareV2 = async () => { + return "async-result"; + }; + const engine = JRPCEngineV2.create({ middleware: [middleware] }); + const provider = providerFromEngine(engine as JRPCEngineV2); + + const result = await provider.sendAsync(makeRequest()); + + expect(result).toBe("async-result"); + }); + + it("works with a middleware chain", async () => { + const middleware1: JRPCMiddlewareV2 = ({ context, next }) => { + context.set("step", 1); + return next(); + }; + const middleware2: JRPCMiddlewareV2 = ({ context }) => { + return `step-${context.get("step")}`; + }; + const engine = JRPCEngineV2.create({ middleware: [middleware1, middleware2] }); + const provider = providerFromEngine(engine as JRPCEngineV2); + + const result = await provider.sendAsync(makeRequest()); + + expect(result).toBe("step-1"); + }); + }); + + describe("send", () => { + it("throws if callback is not a function", () => { + const engine = JRPCEngineV2.create({ middleware: [makeRequestMiddleware()] }); + const provider = providerFromEngine(engine as JRPCEngineV2); + + expect(() => { + // @ts-expect-error - Testing invalid callback + provider.send(makeRequest(), "not a function"); + }).toThrow('Must provide callback to "send" method.'); + }); + + it("calls the callback with the result on success", async () => { + const engine = JRPCEngineV2.create({ middleware: [makeRequestMiddleware()] }); + const provider = providerFromEngine(engine as JRPCEngineV2); + + const req = makeRequest({ id: "42" }); + const { error, response } = await sendPromise(provider, req); + + expect(error).toBeNull(); + expect(response).toStrictEqual({ + id: "42", + jsonrpc: "2.0", + result: null, + }); + }); + + it("calls the callback with the error on failure", async () => { + const testError = new Error("send error"); + const middleware: JRPCMiddlewareV2 = () => { + throw testError; + }; + const engine = JRPCEngineV2.create({ middleware: [middleware] }); + const provider = providerFromEngine(engine as JRPCEngineV2); + + const req = makeRequest({ id: "42" }); + const { error, response } = await sendPromise(provider, req); + + expect(error).toMatchObject({ code: -32603, message: "send error" }); + expect(error).toBe(response.error); + expect(response.id).toBe("42"); + expect(response.jsonrpc).toBe("2.0"); + }); + + it("constructs a proper JRPCResponse shape", async () => { + const middleware: JRPCMiddlewareV2 = () => "hello"; + const engine = JRPCEngineV2.create({ middleware: [middleware] }); + const provider = providerFromEngine(engine as JRPCEngineV2); + + const req = makeRequest({ id: "99" }); + const { response } = await sendPromise(provider, req); + + expect(response).toHaveProperty("id", "99"); + expect(response).toHaveProperty("jsonrpc", "2.0"); + expect(response).toHaveProperty("result", "hello"); + }); + + it("does not reject if the success callback throws", async () => { + const engine = JRPCEngineV2.create({ middleware: [makeRequestMiddleware()] }); + const provider = providerFromEngine(engine as JRPCEngineV2); + + const unhandled = vi.fn(); + process.on("unhandledRejection", unhandled); + + provider.send(makeRequest(), () => { + throw new Error("callback boom"); + }); + await new Promise((resolve) => setTimeout(resolve, 50)); + + process.off("unhandledRejection", unhandled); + expect(unhandled).toHaveBeenCalledTimes(1); + }); + + it("does not reject if the error callback throws", async () => { + const middleware: JRPCMiddlewareV2 = () => { + throw new Error("engine error"); + }; + const engine = JRPCEngineV2.create({ middleware: [middleware] }); + const provider = providerFromEngine(engine as JRPCEngineV2); + + const unhandled = vi.fn(); + process.on("unhandledRejection", unhandled); + + provider.send(makeRequest(), () => { + throw new Error("callback boom"); + }); + await new Promise((resolve) => setTimeout(resolve, 50)); + + process.off("unhandledRejection", unhandled); + expect(unhandled).toHaveBeenCalledTimes(1); + }); + + it.each([ + { label: "string", thrown: "string error", expectedCause: "string error" }, + { label: "null", thrown: null, expectedCause: null }, + { label: "undefined", thrown: undefined, expectedCause: null }, + { label: "number", thrown: 42, expectedCause: 42 }, + ])("serializes non-Error throw ($label) via send callback", async ({ thrown, expectedCause }) => { + const middleware: JRPCMiddlewareV2 = () => { + throw thrown; + }; + const engine = JRPCEngineV2.create({ middleware: [middleware] }); + const provider = providerFromEngine(engine as JRPCEngineV2); + + const { error, response } = await sendPromise(provider, makeRequest({ id: "x" })); + + expect(error).toBe(response.error); + expect(response.id).toBe("x"); + expect(response.jsonrpc).toBe("2.0"); + const rpcError = response.error as Record; + expect(rpcError).toBeDefined(); + expect(rpcError).toHaveProperty("code"); + expect((rpcError.data as Record).cause).toStrictEqual(expectedCause); + }); + }); + + describe("request", () => { + it("returns the result", async () => { + const engine = JRPCEngineV2.create({ middleware: [makeRequestMiddleware()] }); + const provider = providerFromEngine(engine as JRPCEngineV2); + + const result = await provider.request({ method: "test_method" }); + + expect(result).toBeNull(); + }); + + it("constructs a JRPC request with id and jsonrpc fields", async () => { + const observedRequests: JRPCRequest[] = []; + const middleware: JRPCMiddlewareV2 = ({ request }) => { + observedRequests.push(request); + return null; + }; + const engine = JRPCEngineV2.create({ middleware: [middleware] }); + const provider = providerFromEngine(engine as JRPCEngineV2); + + await provider.request({ method: "test_method", params: [1, 2] }); + + expect(observedRequests).toHaveLength(1); + expect(observedRequests[0]).toMatchObject({ + method: "test_method", + params: [1, 2], + jsonrpc: "2.0", + }); + expect(typeof observedRequests[0].id).toBe("number"); + }); + + it("passes params through to the engine", async () => { + const middleware: JRPCMiddlewareV2, Json> = ({ request }) => { + return request.params; + }; + const engine = JRPCEngineV2.create({ middleware: [middleware] }); + const provider = providerFromEngine(engine as JRPCEngineV2); + + const result = await provider.request({ method: "echo", params: ["a", "b"] }); + + expect(result).toStrictEqual(["a", "b"]); + }); + + it("propagates errors", async () => { + const middleware: JRPCMiddlewareV2 = () => { + throw new Error("request error"); + }; + const engine = JRPCEngineV2.create({ middleware: [middleware] }); + const provider = providerFromEngine(engine as JRPCEngineV2); + + await expect(provider.request({ method: "fail" })).rejects.toThrow("request error"); + }); + + it("generates unique ids for each request", async () => { + const observedIds: Set = new Set(); + const middleware: JRPCMiddlewareV2 = ({ request }) => { + observedIds.add(request.id as string); + return null; + }; + const engine = JRPCEngineV2.create({ middleware: [middleware] }); + const provider = providerFromEngine(engine as JRPCEngineV2); + + await Promise.all(Array.from({ length: 10 }, () => provider.request({ method: "test" }))); + + expect(observedIds.size).toBe(10); + }); + }); + }); + + describe("providerFromMiddleware", () => { + it("creates a provider backed by the given middleware", async () => { + const middleware: JRPCMiddlewareV2 = () => "middleware-result"; + const provider = providerFromMiddleware(middleware as JRPCMiddlewareV2); + + const result = await provider.sendAsync(makeRequest()); + + expect(result).toBe("middleware-result"); + }); + + it("returns a SafeEventEmitter instance", () => { + const provider = providerFromMiddleware(makeRequestMiddleware() as JRPCMiddlewareV2); + + expect(provider).toBeInstanceOf(SafeEventEmitter); + }); + + it("supports request method", async () => { + const middleware: JRPCMiddlewareV2 = ({ request }) => { + return `echo:${request.method}`; + }; + const provider = providerFromMiddleware(middleware as JRPCMiddlewareV2); + + const result = await provider.request({ method: "hello" }); + + expect(result).toBe("echo:hello"); + }); + + it("propagates middleware errors via sendAsync", async () => { + const middleware: JRPCMiddlewareV2 = () => { + throw new Error("middleware-error"); + }; + const provider = providerFromMiddleware(middleware as JRPCMiddlewareV2); + + await expect(provider.sendAsync(makeRequest())).rejects.toThrow("middleware-error"); + }); + + it("propagates middleware errors via send callback", async () => { + const testError = new Error("middleware-error"); + const middleware: JRPCMiddlewareV2 = () => { + throw testError; + }; + const provider = providerFromMiddleware(middleware as JRPCMiddlewareV2); + + const { error, response } = await sendPromise(provider, makeRequest()); + + expect(error).toMatchObject({ code: -32603, message: "middleware-error" }); + expect(error).toBe(response.error); + }); + + it("works with a middleware that uses context", async () => { + const middleware1: JRPCMiddlewareV2 = ({ context, next }) => { + context.set("value", 42); + return next(); + }; + const middleware2: JRPCMiddlewareV2 = ({ context }) => { + return context.get("value") as Json; + }; + const engine = JRPCEngineV2.create({ middleware: [middleware1, middleware2] }); + const provider = providerFromEngine(engine as JRPCEngineV2); + + const result = await provider.sendAsync(makeRequest()); + + expect(result).toBe(42); + }); + }); + + describe("providerAsMiddleware", () => { + it("converts a provider into a V2 middleware", () => { + const engine = JRPCEngineV2.create({ middleware: [makeRequestMiddleware()] }); + const provider = providerFromEngine(engine as JRPCEngineV2); + + const middleware = providerAsMiddleware(provider); + + expect(typeof middleware).toBe("function"); + }); + + it("delegates requests to the provider", async () => { + const innerMiddleware: JRPCMiddlewareV2 = () => "provider-result"; + const innerEngine = JRPCEngineV2.create({ middleware: [innerMiddleware] }); + const provider = providerFromEngine(innerEngine as JRPCEngineV2); + + const outerEngine = JRPCEngineV2.create({ + middleware: [providerAsMiddleware(provider)], + }); + + const result = await outerEngine.handle(makeRequest()); + + expect(result).toBe("provider-result"); + }); + + it("propagates errors from the provider", async () => { + const innerMiddleware: JRPCMiddlewareV2 = () => { + throw new Error("inner error"); + }; + const innerEngine = JRPCEngineV2.create({ middleware: [innerMiddleware] }); + const provider = providerFromEngine(innerEngine as JRPCEngineV2); + + const outerEngine = JRPCEngineV2.create({ + middleware: [providerAsMiddleware(provider)], + }); + + await expect(outerEngine.handle(makeRequest())).rejects.toThrow("inner error"); + }); + + it("can be composed in a middleware chain", async () => { + const innerMiddleware: JRPCMiddlewareV2 = ({ request }) => { + return `inner:${request.method}`; + }; + const innerEngine = JRPCEngineV2.create({ middleware: [innerMiddleware] }); + const provider = providerFromEngine(innerEngine as JRPCEngineV2); + + const outerMiddleware = vi.fn(({ next }) => next()); + const outerEngine = JRPCEngineV2.create({ + middleware: [outerMiddleware, providerAsMiddleware(provider)], + }); + + const result = await outerEngine.handle(makeRequest({ method: "my_method" })); + + expect(result).toBe("inner:my_method"); + expect(outerMiddleware).toHaveBeenCalledTimes(1); + }); + + it("round-trips: engine -> provider -> middleware -> engine", async () => { + const originalMiddleware: JRPCMiddlewareV2 = () => "round-trip"; + const engine1 = JRPCEngineV2.create({ middleware: [originalMiddleware] }); + const provider = providerFromEngine(engine1 as JRPCEngineV2); + const wrappedMiddleware = providerAsMiddleware(provider); + + const engine2 = JRPCEngineV2.create({ middleware: [wrappedMiddleware] }); + + const result = await engine2.handle(makeRequest()); + + expect(result).toBe("round-trip"); + }); + + it("round-trips: middleware -> provider -> middleware", async () => { + const originalMiddleware: JRPCMiddlewareV2 = () => "full-circle"; + const provider = providerFromMiddleware(originalMiddleware); + const wrappedMiddleware = providerAsMiddleware(provider); + + const engine = JRPCEngineV2.create({ middleware: [wrappedMiddleware] }); + + const result = await engine.handle(makeRequest()); + + expect(result).toBe("full-circle"); + }); + + it("deep-clones the request so frozen nested params are isolated from the original", async () => { + let receivedParams: unknown; + const innerMiddleware: JRPCMiddlewareV2, Json> = ({ request }) => { + receivedParams = request.params; + return request.params; + }; + const innerEngine = JRPCEngineV2.create({ middleware: [innerMiddleware] }); + const provider = providerFromEngine(innerEngine as JRPCEngineV2); + + const outerEngine = JRPCEngineV2.create({ + middleware: [providerAsMiddleware(provider)], + }); + + const originalParams = [{ nested: "value" }]; + const req = makeRequest({ params: originalParams }); + + await outerEngine.handle(req); + + expect(receivedParams).toStrictEqual(originalParams); + expect(receivedParams).not.toBe(originalParams); + expect((receivedParams as Record[])[0]).not.toBe(originalParams[0]); + }); + }); +});