From 3d932e074321c64bc44848b881f1129a0f1160a0 Mon Sep 17 00:00:00 2001 From: lwin Date: Thu, 26 Feb 2026 17:58:26 +0800 Subject: [PATCH 1/6] feat: V2 Engine/Middleware to provider utils --- package-lock.json | 21 -- src/jrpc/v2/providerUtils.ts | 72 +++++++ test/v2/providerUtils.test.ts | 372 ++++++++++++++++++++++++++++++++++ 3 files changed, 444 insertions(+), 21 deletions(-) create mode 100644 src/jrpc/v2/providerUtils.ts create mode 100644 test/v2/providerUtils.test.ts diff --git a/package-lock.json b/package-lock.json index 3b1891fb..13ec7646 100644 --- a/package-lock.json +++ b/package-lock.json @@ -109,7 +109,6 @@ "integrity": "sha512-CGOfOJqWjg2qW/Mb6zNsDm+u5vFQ8DxXfbM09z69p5Z6+mE1ikP2jUXw+j42Pf1XTYED2Rni5f95npYeuwMDQA==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@babel/code-frame": "^7.29.0", "@babel/generator": "^7.29.0", @@ -1690,7 +1689,6 @@ "resolved": "https://registry.npmjs.org/@babel/runtime/-/runtime-7.28.6.tgz", "integrity": "sha512-05WQkdpL9COIMz4LjTxGpPNCdlpyimKppYNoJ5Di5EUObifl8t4tuLuUBBZEpoLYOmfvIWrsp9fCl0HoPRVTdA==", "license": "MIT", - "peer": true, "engines": { "node": ">=6.9.0" } @@ -3267,7 +3265,6 @@ "integrity": "sha512-DhGl4xMVFGVIyMwswXeyzdL4uXD5OGILGX5N8Y+f6W7LhC1Ze2poSNrkF/fedpVDHEEZ+PHFW0vL14I+mm8K3Q==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@octokit/auth-token": "^6.0.0", "@octokit/graphql": "^9.0.3", @@ -4603,7 +4600,6 @@ "integrity": "sha512-FXx2pKgId/WyYo2jXw63kk7/+TY7u7AziEJxJAnSFzHlqTAS3Ync6SvgYAN/k4/PQpnnVuzoMuVnByKK2qp0ag==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@types/estree": "*", "@types/json-schema": "*" @@ -4792,7 +4788,6 @@ "integrity": "sha512-4z2nCSBfVIMnbuu8uinj+f0o4qOeggYJLbjpPHka3KH1om7e+H9yLKTYgksTaHcGco+NClhhY2vyO3HsMH1RGw==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@typescript-eslint/scope-manager": "8.55.0", "@typescript-eslint/types": "8.55.0", @@ -5699,7 +5694,6 @@ "integrity": "sha512-NZyJarBfL7nWwIq+FDL6Zp/yHEhePMNnnJ0y3qfieCrmNvYct8uvtiV41UvlSe6apAfk0fY1FbWx+NwfmpvtTg==", "dev": true, "license": "MIT", - "peer": true, "bin": { "acorn": "bin/acorn" }, @@ -5759,7 +5753,6 @@ "integrity": "sha512-j3fVLgvTo527anyYyJOGTYJbG+vnnQYvE0m5mmkc1TK+nxAppkCLMIL0aZ4dblVCNoGShhm+kzE4ZUykBoMg4g==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "fast-deep-equal": "^3.1.1", "fast-json-stable-stringify": "^2.0.0", @@ -6544,7 +6537,6 @@ } ], "license": "MIT", - "peer": true, "dependencies": { "baseline-browser-mapping": "^2.9.0", "caniuse-lite": "^1.0.30001759", @@ -7993,7 +7985,6 @@ "integrity": "sha512-LEyamqS7W5HB3ujJyvi0HQK/dtVINZvd5mAAp9eT5S/ujByGjiZLCzPcHVzuXbpJDJF/cxwHlfceVUDZ2lnSTw==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@eslint-community/eslint-utils": "^4.8.0", "@eslint-community/regexpp": "^4.12.1", @@ -8054,7 +8045,6 @@ "integrity": "sha512-82GZUjRS0p/jganf6q1rEO25VSoHH0hKPCTrgillPjdI/3bgBhAE1QzHrHTizjpRvy6pGAvKjDJtk2pF9NDq8w==", "dev": true, "license": "MIT", - "peer": true, "bin": { "eslint-config-prettier": "bin/cli.js" }, @@ -8181,7 +8171,6 @@ "integrity": "sha512-whOE1HFo/qJDyX4SnXzP4N6zOWn79WhnCUY/iDR0mPfQZO8wcYE4JClzI2oZrhBnnMUCBCHZhO6VQyoBU95mZA==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@rtsao/scc": "^1.1.0", "array-includes": "^3.1.9", @@ -11301,7 +11290,6 @@ "integrity": "sha512-E3ZJh4J3S9KfwdjZhe2afj6R9lGIN5Pher1pF39UGrXRqq/VDaGVIGN13BjHd2u8B61hArAGOnso7nBOouW3TQ==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@babel/parser": "^7.29.0", "@babel/types": "^7.29.0", @@ -12513,7 +12501,6 @@ "integrity": "sha512-UOnG6LftzbdaHZcKoPFtOcCKztrQ57WkHDeRD9t/PTQtmT0NHSeWWepj6pS0z/N7+08BHFDQVUrfmfMRcZwbMg==", "dev": true, "license": "MIT", - "peer": true, "bin": { "prettier": "bin/prettier.cjs" }, @@ -13205,7 +13192,6 @@ "integrity": "sha512-oQL6lgK3e2QZeQ7gcgIkS2YZPg5slw37hYufJ3edKlfQSGGm8ICoxswK15ntSzF/a8+h7ekRy7k7oWc3BQ7y8A==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@types/estree": "1.0.8" }, @@ -13385,7 +13371,6 @@ "integrity": "sha512-B/gBuNg5SiMTrPkC+A2+cW0RszwxYmn6VYxB/inlBStS5nx6xHIt/ehKRhIMhqusl7a8LjQoZnjCs5vhwxOQ1g==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "fast-deep-equal": "^3.1.3", "fast-uri": "^3.0.1", @@ -14497,7 +14482,6 @@ "integrity": "sha512-5C1sg4USs1lfG0GFb2RLXsdpXqBSEhAaA/0kPL01wxzpMqLILNxIxIOKiILz+cdg/pLnOUxFYOR5yhHU666wbw==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "esbuild": "~0.27.0", "get-tsconfig": "^4.7.5" @@ -14640,7 +14624,6 @@ "integrity": "sha512-jl1vZzPDinLr9eUt3J/t7V6FgNEw9QjvBPdysz9KfQDD41fQrC2Y4vKQdiaUpFT4bXlb1RHhLpp8wtm6M5TgSw==", "dev": true, "license": "Apache-2.0", - "peer": true, "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" @@ -14786,7 +14769,6 @@ "dev": true, "hasInstallScript": true, "license": "MIT", - "peer": true, "dependencies": { "napi-postinstall": "^0.3.0" }, @@ -14951,7 +14933,6 @@ "integrity": "sha512-w+N7Hifpc3gRjZ63vYBXA56dvvRlNWRczTdmCBBa+CotUzAPf5b7YMdMR/8CQoeYE5LX3W4wj6RYTgonm1b9DA==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "esbuild": "^0.27.0", "fdir": "^6.5.0", @@ -15027,7 +15008,6 @@ "integrity": "sha512-hOQuK7h0FGKgBAas7v0mSAsnvrIgAvWmRFjmzpJ7SwFHH3g1k2u37JtYwOwmEKhK6ZO3v9ggDBBm0La1LCK4uQ==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@vitest/expect": "4.0.18", "@vitest/mocker": "4.0.18", @@ -15120,7 +15100,6 @@ "integrity": "sha512-Gdj3X74CLJJ8zy4URmK42W7wTZUJrqL+z8nyGEr4dTN0kb3nVs+ZvjbTOqRYPD7qX4tUmwyHL9Q9K6T1seW6Yw==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@types/eslint-scope": "^3.7.7", "@types/estree": "^1.0.8", diff --git a/src/jrpc/v2/providerUtils.ts b/src/jrpc/v2/providerUtils.ts new file mode 100644 index 00000000..9d04a369 --- /dev/null +++ b/src/jrpc/v2/providerUtils.ts @@ -0,0 +1,72 @@ +import { JRPCParams, JRPCRequest, JRPCResponse, Json, RequestArguments } from "../interfaces"; +import { ProviderEvents, SafeEventEmitterProvider } from "../jrpcEngine"; +import { SafeEventEmitter } from "../safeEventEmitter"; +import { JRPCEngineV2 } from "./jrpcEngineV2"; +import type { JRPCMiddlewareV2 } from "./v2interfaces"; + +/** + * Create a {@link SafeEventEmitterProvider} from a {@link JRPCEngineV2}. + * + * Unlike the V1 counterpart, the V2 engine throws errors directly rather than + * wrapping them in response objects, so `sendAsync` simply propagates thrown errors. + * Notification forwarding is not supported since {@link JRPCEngineV2} is not an event emitter. + * + * @param engine - The V2 JSON-RPC engine. + * @returns A provider backed by the engine. + */ +export function providerFromEngine(engine: JRPCEngineV2): SafeEventEmitterProvider { + const provider: SafeEventEmitterProvider = new SafeEventEmitter() as SafeEventEmitterProvider; + + provider.sendAsync = async (req: JRPCRequest) => { + const result = await engine.handle(req as JRPCRequest); + return result as U; + }; + + provider.send = (req: JRPCRequest, callback: (error: unknown, providerRes: JRPCResponse) => void) => { + if (typeof callback !== "function") { + throw new Error('Must provide callback to "send" method.'); + } + engine + .handle(req as JRPCRequest) + // The callback is called in a setTimeout to ensure that it's being called only once. + // Ref: https://github.com/eslint-community/eslint-plugin-promise/blob/main/docs/rules/no-callback-in-promise.md + .then((result) => setTimeout(() => callback(null, { id: req.id, jsonrpc: "2.0", result: result as U }), 0)) + .catch((error) => setTimeout(() => callback(error, { id: req.id, jsonrpc: "2.0", error }), 0)); + }; + + provider.request = async (args: RequestArguments) => { + const req: JRPCRequest = { + ...args, + id: Math.random().toString(36).slice(2), + jsonrpc: "2.0", + }; + const res = await provider.sendAsync(req); + return res as U; + }; + + return provider; +} + +/** + * Create a {@link SafeEventEmitterProvider} from one or more V2 middleware. + * + * @param middleware - The V2 middleware to back the provider. + * @returns A provider backed by an engine composed of the given middleware. + */ +export function providerFromMiddleware(middleware: JRPCMiddlewareV2): SafeEventEmitterProvider { + const engine = JRPCEngineV2.create({ middleware: [middleware] }); + return providerFromEngine(engine as JRPCEngineV2); +} + +/** + * Convert a {@link SafeEventEmitterProvider} into a V2 middleware. + * The middleware delegates all requests to the provider's `sendAsync` method. + * + * @param provider - The provider to wrap as middleware. + * @returns A V2 middleware that forwards requests to the provider. + */ +export function providerAsMiddleware(provider: SafeEventEmitterProvider): JRPCMiddlewareV2 { + return async ({ request }) => { + return (await provider.sendAsync(request)) as Json; + }; +} diff --git a/test/v2/providerUtils.test.ts b/test/v2/providerUtils.test.ts new file mode 100644 index 00000000..1a8ee563 --- /dev/null +++ b/test/v2/providerUtils.test.ts @@ -0,0 +1,372 @@ +import { describe, expect, it, vi } from "vitest"; + +import { JRPCParams, JRPCRequest, JRPCResponse, Json } from "../../src"; +import { SafeEventEmitter } from "../../src/jrpc/safeEventEmitter"; +import { JRPCEngineV2 } from "../../src/jrpc/v2/jrpcEngineV2"; +import { providerAsMiddleware, providerFromEngine, providerFromMiddleware } from "../../src/jrpc/v2/providerUtils"; +import type { JRPCMiddlewareV2 } from "../../src/jrpc/v2/v2interfaces"; +import { makeRequest, makeRequestMiddleware } from "../utils"; + +/** + * Wraps `provider.send` in a promise for easier testing. + */ +function sendPromise(provider: ReturnType, req: JRPCRequest): Promise<{ error: unknown; response: JRPCResponse }> { + return new Promise((resolve) => { + provider.send(req, (error, response) => { + resolve({ error, response }); + }); + }); +} + +describe("providerUtils", () => { + describe("providerFromEngine", () => { + it("returns a SafeEventEmitter instance", () => { + const engine = JRPCEngineV2.create({ middleware: [makeRequestMiddleware()] }); + const provider = providerFromEngine(engine as JRPCEngineV2); + + expect(provider).toBeInstanceOf(SafeEventEmitter); + }); + + it("has sendAsync, send, and request methods", () => { + const engine = JRPCEngineV2.create({ middleware: [makeRequestMiddleware()] }); + const provider = providerFromEngine(engine as JRPCEngineV2); + + expect(typeof provider.sendAsync).toBe("function"); + expect(typeof provider.send).toBe("function"); + expect(typeof provider.request).toBe("function"); + }); + + describe("sendAsync", () => { + it("returns the result from the engine", async () => { + const engine = JRPCEngineV2.create({ + middleware: [makeRequestMiddleware()], + }); + const provider = providerFromEngine(engine as JRPCEngineV2); + + const result = await provider.sendAsync(makeRequest()); + + expect(result).toBeNull(); + }); + + it("returns complex results from the engine", async () => { + const middleware: JRPCMiddlewareV2 = () => ({ foo: "bar", nums: [1, 2, 3] }); + const engine = JRPCEngineV2.create({ middleware: [middleware] }); + const provider = providerFromEngine(engine as JRPCEngineV2); + + const result = await provider.sendAsync(makeRequest()); + + expect(result).toStrictEqual({ foo: "bar", nums: [1, 2, 3] }); + }); + + it("propagates errors thrown by the engine", async () => { + const middleware: JRPCMiddlewareV2 = () => { + throw new Error("engine error"); + }; + const engine = JRPCEngineV2.create({ middleware: [middleware] }); + const provider = providerFromEngine(engine as JRPCEngineV2); + + await expect(provider.sendAsync(makeRequest())).rejects.toThrow("engine error"); + }); + + it("works with async middleware", async () => { + const middleware: JRPCMiddlewareV2 = async () => { + return "async-result"; + }; + const engine = JRPCEngineV2.create({ middleware: [middleware] }); + const provider = providerFromEngine(engine as JRPCEngineV2); + + const result = await provider.sendAsync(makeRequest()); + + expect(result).toBe("async-result"); + }); + + it("works with a middleware chain", async () => { + const middleware1: JRPCMiddlewareV2 = ({ context, next }) => { + context.set("step", 1); + return next(); + }; + const middleware2: JRPCMiddlewareV2 = ({ context }) => { + return `step-${context.get("step")}`; + }; + const engine = JRPCEngineV2.create({ middleware: [middleware1, middleware2] }); + const provider = providerFromEngine(engine as JRPCEngineV2); + + const result = await provider.sendAsync(makeRequest()); + + expect(result).toBe("step-1"); + }); + }); + + describe("send", () => { + it("throws if callback is not a function", () => { + const engine = JRPCEngineV2.create({ middleware: [makeRequestMiddleware()] }); + const provider = providerFromEngine(engine as JRPCEngineV2); + + expect(() => { + // @ts-expect-error - Testing invalid callback + provider.send(makeRequest(), "not a function"); + }).toThrow('Must provide callback to "send" method.'); + }); + + it("calls the callback with the result on success", async () => { + const engine = JRPCEngineV2.create({ middleware: [makeRequestMiddleware()] }); + const provider = providerFromEngine(engine as JRPCEngineV2); + + const req = makeRequest({ id: "42" }); + const { error, response } = await sendPromise(provider, req); + + expect(error).toBeNull(); + expect(response).toStrictEqual({ + id: "42", + jsonrpc: "2.0", + result: null, + }); + }); + + it("calls the callback with the error on failure", async () => { + const testError = new Error("send error"); + const middleware: JRPCMiddlewareV2 = () => { + throw testError; + }; + const engine = JRPCEngineV2.create({ middleware: [middleware] }); + const provider = providerFromEngine(engine as JRPCEngineV2); + + const req = makeRequest({ id: "42" }); + const { error, response } = await sendPromise(provider, req); + + expect(error).toBe(testError); + expect(response.id).toBe("42"); + expect(response.jsonrpc).toBe("2.0"); + expect(response.error).toBe(testError); + }); + + it("constructs a proper JRPCResponse shape", async () => { + const middleware: JRPCMiddlewareV2 = () => "hello"; + const engine = JRPCEngineV2.create({ middleware: [middleware] }); + const provider = providerFromEngine(engine as JRPCEngineV2); + + const req = makeRequest({ id: "99" }); + const { response } = await sendPromise(provider, req); + + expect(response).toHaveProperty("id", "99"); + expect(response).toHaveProperty("jsonrpc", "2.0"); + expect(response).toHaveProperty("result", "hello"); + }); + }); + + describe("request", () => { + it("returns the result", async () => { + const engine = JRPCEngineV2.create({ middleware: [makeRequestMiddleware()] }); + const provider = providerFromEngine(engine as JRPCEngineV2); + + const result = await provider.request({ method: "test_method" }); + + expect(result).toBeNull(); + }); + + it("constructs a JRPC request with id and jsonrpc fields", async () => { + const observedRequests: JRPCRequest[] = []; + const middleware: JRPCMiddlewareV2 = ({ request }) => { + observedRequests.push(request); + return null; + }; + const engine = JRPCEngineV2.create({ middleware: [middleware] }); + const provider = providerFromEngine(engine as JRPCEngineV2); + + await provider.request({ method: "test_method", params: [1, 2] }); + + expect(observedRequests).toHaveLength(1); + expect(observedRequests[0]).toMatchObject({ + method: "test_method", + params: [1, 2], + jsonrpc: "2.0", + }); + expect(typeof observedRequests[0].id).toBe("string"); + expect((observedRequests[0].id as string).length).toBeGreaterThan(0); + }); + + it("passes params through to the engine", async () => { + const middleware: JRPCMiddlewareV2, Json> = ({ request }) => { + return request.params; + }; + const engine = JRPCEngineV2.create({ middleware: [middleware] }); + const provider = providerFromEngine(engine as JRPCEngineV2); + + const result = await provider.request({ method: "echo", params: ["a", "b"] }); + + expect(result).toStrictEqual(["a", "b"]); + }); + + it("propagates errors", async () => { + const middleware: JRPCMiddlewareV2 = () => { + throw new Error("request error"); + }; + const engine = JRPCEngineV2.create({ middleware: [middleware] }); + const provider = providerFromEngine(engine as JRPCEngineV2); + + await expect(provider.request({ method: "fail" })).rejects.toThrow("request error"); + }); + + it("generates unique ids for each request", async () => { + const observedIds: Set = new Set(); + const middleware: JRPCMiddlewareV2 = ({ request }) => { + observedIds.add(request.id as string); + return null; + }; + const engine = JRPCEngineV2.create({ middleware: [middleware] }); + const provider = providerFromEngine(engine as JRPCEngineV2); + + await Promise.all(Array.from({ length: 10 }, () => provider.request({ method: "test" }))); + + expect(observedIds.size).toBe(10); + }); + }); + }); + + describe("providerFromMiddleware", () => { + it("creates a provider backed by the given middleware", async () => { + const middleware: JRPCMiddlewareV2 = () => "middleware-result"; + const provider = providerFromMiddleware(middleware as JRPCMiddlewareV2); + + const result = await provider.sendAsync(makeRequest()); + + expect(result).toBe("middleware-result"); + }); + + it("returns a SafeEventEmitter instance", () => { + const provider = providerFromMiddleware(makeRequestMiddleware() as JRPCMiddlewareV2); + + expect(provider).toBeInstanceOf(SafeEventEmitter); + }); + + it("supports request method", async () => { + const middleware: JRPCMiddlewareV2 = ({ request }) => { + return `echo:${request.method}`; + }; + const provider = providerFromMiddleware(middleware as JRPCMiddlewareV2); + + const result = await provider.request({ method: "hello" }); + + expect(result).toBe("echo:hello"); + }); + + it("propagates middleware errors via sendAsync", async () => { + const middleware: JRPCMiddlewareV2 = () => { + throw new Error("middleware-error"); + }; + const provider = providerFromMiddleware(middleware as JRPCMiddlewareV2); + + await expect(provider.sendAsync(makeRequest())).rejects.toThrow("middleware-error"); + }); + + it("propagates middleware errors via send callback", async () => { + const testError = new Error("middleware-error"); + const middleware: JRPCMiddlewareV2 = () => { + throw testError; + }; + const provider = providerFromMiddleware(middleware as JRPCMiddlewareV2); + + const { error } = await sendPromise(provider, makeRequest()); + + expect(error).toBe(testError); + }); + + it("works with a middleware that uses context", async () => { + const middleware1: JRPCMiddlewareV2 = ({ context, next }) => { + context.set("value", 42); + return next(); + }; + const middleware2: JRPCMiddlewareV2 = ({ context }) => { + return context.get("value") as Json; + }; + const engine = JRPCEngineV2.create({ middleware: [middleware1, middleware2] }); + const provider = providerFromEngine(engine as JRPCEngineV2); + + const result = await provider.sendAsync(makeRequest()); + + expect(result).toBe(42); + }); + }); + + describe("providerAsMiddleware", () => { + it("converts a provider into a V2 middleware", () => { + const engine = JRPCEngineV2.create({ middleware: [makeRequestMiddleware()] }); + const provider = providerFromEngine(engine as JRPCEngineV2); + + const middleware = providerAsMiddleware(provider); + + expect(typeof middleware).toBe("function"); + }); + + it("delegates requests to the provider", async () => { + const innerMiddleware: JRPCMiddlewareV2 = () => "provider-result"; + const innerEngine = JRPCEngineV2.create({ middleware: [innerMiddleware] }); + const provider = providerFromEngine(innerEngine as JRPCEngineV2); + + const outerEngine = JRPCEngineV2.create({ + middleware: [providerAsMiddleware(provider)], + }); + + const result = await outerEngine.handle(makeRequest()); + + expect(result).toBe("provider-result"); + }); + + it("propagates errors from the provider", async () => { + const innerMiddleware: JRPCMiddlewareV2 = () => { + throw new Error("inner error"); + }; + const innerEngine = JRPCEngineV2.create({ middleware: [innerMiddleware] }); + const provider = providerFromEngine(innerEngine as JRPCEngineV2); + + const outerEngine = JRPCEngineV2.create({ + middleware: [providerAsMiddleware(provider)], + }); + + await expect(outerEngine.handle(makeRequest())).rejects.toThrow("inner error"); + }); + + it("can be composed in a middleware chain", async () => { + const innerMiddleware: JRPCMiddlewareV2 = ({ request }) => { + return `inner:${request.method}`; + }; + const innerEngine = JRPCEngineV2.create({ middleware: [innerMiddleware] }); + const provider = providerFromEngine(innerEngine as JRPCEngineV2); + + const outerMiddleware = vi.fn(({ next }) => next()); + const outerEngine = JRPCEngineV2.create({ + middleware: [outerMiddleware, providerAsMiddleware(provider)], + }); + + const result = await outerEngine.handle(makeRequest({ method: "my_method" })); + + expect(result).toBe("inner:my_method"); + expect(outerMiddleware).toHaveBeenCalledTimes(1); + }); + + it("round-trips: engine -> provider -> middleware -> engine", async () => { + const originalMiddleware: JRPCMiddlewareV2 = () => "round-trip"; + const engine1 = JRPCEngineV2.create({ middleware: [originalMiddleware] }); + const provider = providerFromEngine(engine1 as JRPCEngineV2); + const wrappedMiddleware = providerAsMiddleware(provider); + + const engine2 = JRPCEngineV2.create({ middleware: [wrappedMiddleware] }); + + const result = await engine2.handle(makeRequest()); + + expect(result).toBe("round-trip"); + }); + + it("round-trips: middleware -> provider -> middleware", async () => { + const originalMiddleware: JRPCMiddlewareV2 = () => "full-circle"; + const provider = providerFromMiddleware(originalMiddleware); + const wrappedMiddleware = providerAsMiddleware(provider); + + const engine = JRPCEngineV2.create({ middleware: [wrappedMiddleware] }); + + const result = await engine.handle(makeRequest()); + + expect(result).toBe("full-circle"); + }); + }); +}); From b34ab4d1b30e21926aed3a742fbd7da1012cc144 Mon Sep 17 00:00:00 2001 From: lwin Date: Thu, 26 Feb 2026 17:58:57 +0800 Subject: [PATCH 2/6] feat: duplex message stream with JRPCV2 engine --- src/jrpc/v2/index.ts | 7 ++++ src/jrpc/v2/messageStream.ts | 64 ++++++++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+) create mode 100644 src/jrpc/v2/messageStream.ts diff --git a/src/jrpc/v2/index.ts b/src/jrpc/v2/index.ts index 242c9d0c..b55115b8 100644 --- a/src/jrpc/v2/index.ts +++ b/src/jrpc/v2/index.ts @@ -1,9 +1,16 @@ export { getUniqueId, isNotification, isRequest } from "../../utils/jrpc"; export { asLegacyMiddleware } from "./asLegacyMiddleware"; +export { deepClone, fromLegacyRequest, makeContext, propagateToContext, propagateToRequest } from "./compatibility-utils"; export { createScaffoldMiddleware as createScaffoldMiddlewareV2 } from "./createScaffoldMiddleware"; export { JRPCEngineV2 } from "./jrpcEngineV2"; export { JRPCServer } from "./jrpcServer"; +export { createEngineStreamV2 } from "./messageStream"; export { MiddlewareContext } from "./MiddlewareContext"; +export { + providerAsMiddleware as providerAsMiddlewareV2, + providerFromEngine as providerFromEngineV2, + providerFromMiddleware as providerFromMiddlewareV2, +} from "./providerUtils"; export type { ContextConstraint, EmptyContext, diff --git a/src/jrpc/v2/messageStream.ts b/src/jrpc/v2/messageStream.ts new file mode 100644 index 00000000..369730f8 --- /dev/null +++ b/src/jrpc/v2/messageStream.ts @@ -0,0 +1,64 @@ +import { Duplex } from "readable-stream"; + +import { isRequest } from "../../utils/jrpc"; +import { rpcErrors } from "../errors"; +import { JRPCRequest, Json } from "../interfaces"; +import { SafeEventEmitter } from "../safeEventEmitter"; +import { JRPCEngineV2 } from "./jrpcEngineV2"; + +/** + * Creates a Duplex object stream for an engine (JRPCEngineV2) + a separate notification emitter. + * + * Replaces V1's createEngineStream by decoupling notification forwarding from + * the engine itself. Notifications are routed through a SafeEventEmitter that + * pushes onto the same stream, so the engine no longer needs to be an EventEmitter. + */ +export function createEngineStreamV2({ engine, notificationEmitter }: { engine: JRPCEngineV2; notificationEmitter?: SafeEventEmitter }): Duplex { + let stream: Duplex | undefined = undefined; + + function noop() { + // noop + } + + function write(req: JRPCRequest, _encoding: BufferEncoding, cb: () => void) { + engine + .handle(req) + .then((res: Json | void): void => { + if (res !== undefined && isRequest(req)) { + stream?.push({ + id: req.id, + jsonrpc: "2.0", + result: res, + }); + } + return undefined; + }) + .catch((err: unknown) => { + if (isRequest(req)) { + const message = err instanceof Error ? err.message : "Internal JSON-RPC error"; + stream?.push({ + id: req.id, + jsonrpc: "2.0", + error: rpcErrors.internal({ message }), + }); + } + }); + + cb(); + } + + stream = new Duplex({ objectMode: true, read: noop, write }); + + if (notificationEmitter) { + const onNotification = (message: unknown) => { + stream?.push(message); + }; + + notificationEmitter.on("notification", onNotification); + stream?.once("close", () => { + notificationEmitter.removeListener("notification", onNotification); + }); + } + + return stream; +} From 12117b9c686ca5f734db4be319c17b44df9fc939 Mon Sep 17 00:00:00 2001 From: lwin Date: Thu, 26 Feb 2026 18:13:43 +0800 Subject: [PATCH 3/6] fix: updated tests --- src/jrpc/v2/messageStream.ts | 6 +- src/jrpc/v2/providerUtils.ts | 23 ++- test/v2/messageStream.test.ts | 291 ++++++++++++++++++++++++++++++++++ test/v2/providerUtils.test.ts | 86 +++++++++- 4 files changed, 393 insertions(+), 13 deletions(-) create mode 100644 test/v2/messageStream.test.ts diff --git a/src/jrpc/v2/messageStream.ts b/src/jrpc/v2/messageStream.ts index 369730f8..545f66a7 100644 --- a/src/jrpc/v2/messageStream.ts +++ b/src/jrpc/v2/messageStream.ts @@ -1,3 +1,4 @@ +import log from "loglevel"; import { Duplex } from "readable-stream"; import { isRequest } from "../../utils/jrpc"; @@ -20,7 +21,7 @@ export function createEngineStreamV2({ engine, notificationEmitter }: { engine: // noop } - function write(req: JRPCRequest, _encoding: BufferEncoding, cb: () => void) { + function write(req: JRPCRequest, _encoding: BufferEncoding) { engine .handle(req) .then((res: Json | void): void => { @@ -42,9 +43,8 @@ export function createEngineStreamV2({ engine, notificationEmitter }: { engine: error: rpcErrors.internal({ message }), }); } + log.error(err); }); - - cb(); } stream = new Duplex({ objectMode: true, read: noop, write }); diff --git a/src/jrpc/v2/providerUtils.ts b/src/jrpc/v2/providerUtils.ts index 9d04a369..85ed4036 100644 --- a/src/jrpc/v2/providerUtils.ts +++ b/src/jrpc/v2/providerUtils.ts @@ -1,6 +1,9 @@ +import { getUniqueId } from "../../utils"; +import { serializeJrpcError } from "../errors"; import { JRPCParams, JRPCRequest, JRPCResponse, Json, RequestArguments } from "../interfaces"; import { ProviderEvents, SafeEventEmitterProvider } from "../jrpcEngine"; import { SafeEventEmitter } from "../safeEventEmitter"; +import { deepClone, propagateToRequest } from "./compatibility-utils"; import { JRPCEngineV2 } from "./jrpcEngineV2"; import type { JRPCMiddlewareV2 } from "./v2interfaces"; @@ -28,16 +31,20 @@ export function providerFromEngine(engine: JRPCEngineV2): SafeEventEmitterProvid } engine .handle(req as JRPCRequest) - // The callback is called in a setTimeout to ensure that it's being called only once. - // Ref: https://github.com/eslint-community/eslint-plugin-promise/blob/main/docs/rules/no-callback-in-promise.md - .then((result) => setTimeout(() => callback(null, { id: req.id, jsonrpc: "2.0", result: result as U }), 0)) - .catch((error) => setTimeout(() => callback(error, { id: req.id, jsonrpc: "2.0", error }), 0)); + .then((result) => callback(null, { id: req.id, jsonrpc: "2.0", result: result as U })) + .catch((error) => { + const serializedError = serializeJrpcError(error, { + shouldIncludeStack: false, + shouldPreserveMessage: true, + }); + callback(error, { id: req.id, jsonrpc: "2.0", error: serializedError }); + }); }; provider.request = async (args: RequestArguments) => { const req: JRPCRequest = { ...args, - id: Math.random().toString(36).slice(2), + id: getUniqueId(), jsonrpc: "2.0", }; const res = await provider.sendAsync(req); @@ -66,7 +73,9 @@ export function providerFromMiddleware(middleware: JRPCMiddlewareV2): SafeEventE * @returns A V2 middleware that forwards requests to the provider. */ export function providerAsMiddleware(provider: SafeEventEmitterProvider): JRPCMiddlewareV2 { - return async ({ request }) => { - return (await provider.sendAsync(request)) as Json; + return async ({ request, context }) => { + const providerRequest = deepClone(request); + propagateToRequest(providerRequest, context); + return (await provider.sendAsync(providerRequest)) as Json; }; } diff --git a/test/v2/messageStream.test.ts b/test/v2/messageStream.test.ts new file mode 100644 index 00000000..6ad4f150 --- /dev/null +++ b/test/v2/messageStream.test.ts @@ -0,0 +1,291 @@ +import { Duplex } from "readable-stream"; +import { beforeEach, describe, expect, it, Mock, vi } from "vitest"; + +import type { SafeEventEmitter } from "../../src/jrpc/safeEventEmitter"; +import type { JRPCEngineV2 } from "../../src/jrpc/v2/jrpcEngineV2"; +import { createEngineStreamV2 } from "../../src/jrpc/v2/messageStream"; + +vi.mock("readable-stream", () => { + function MockDuplex(this: Record, opts: Record) { + Object.assign(this, (MockDuplex as unknown as { _instance: Record })._instance); + (MockDuplex as unknown as { _capturedOpts: Record })._capturedOpts = opts; + } + return { Duplex: MockDuplex }; +}); +vi.mock("loglevel", () => ({ default: { error: vi.fn() } })); + +function flushPromises(): Promise { + return new Promise((resolve) => setTimeout(resolve, 0)); +} + +function makeRequest(overrides: Record = {}) { + return { jsonrpc: "2.0", id: "1", method: "test", params: [] as unknown[], ...overrides }; +} + +function makeNotification(overrides: Record = {}) { + return { jsonrpc: "2.0", method: "test", params: [] as unknown[], ...overrides }; +} + +describe("createEngineStreamV2", () => { + let mockStream: Record; + let capturedWrite: (req: Record, encoding: string) => void; + + function createMockEngine(handleImpl: Mock = vi.fn().mockResolvedValue(null)) { + return { handle: handleImpl } as unknown as JRPCEngineV2; + } + + function createMockEmitter() { + return { + on: vi.fn(), + removeListener: vi.fn(), + } as unknown as SafeEventEmitter; + } + + function getCapturedOpts(): Record { + return (Duplex as unknown as { _capturedOpts: Record })._capturedOpts; + } + + function setup(engine: JRPCEngineV2, notificationEmitter?: SafeEventEmitter) { + const stream = createEngineStreamV2({ engine, notificationEmitter }); + capturedWrite = getCapturedOpts().write as typeof capturedWrite; + return stream; + } + + beforeEach(() => { + mockStream = { + push: vi.fn(), + once: vi.fn(), + }; + + (Duplex as unknown as { _instance: Record })._instance = mockStream; + }); + + it("creates a Duplex in objectMode with read and write", () => { + setup(createMockEngine()); + + const opts = getCapturedOpts(); + expect(opts.objectMode).toBe(true); + expect(typeof opts.read).toBe("function"); + expect(typeof opts.write).toBe("function"); + }); + + it("returns the created stream", () => { + const result = setup(createMockEngine()); + expect(result).toHaveProperty("push", mockStream.push); + expect(result).toHaveProperty("once", mockStream.once); + }); + + describe("request handling", () => { + it("pushes a success response for a request", async () => { + const engine = createMockEngine(vi.fn().mockResolvedValue("hello")); + setup(engine); + + capturedWrite(makeRequest({ id: "42" }), "utf-8"); + await flushPromises(); + + expect(engine.handle).toHaveBeenCalledOnce(); + expect(mockStream.push).toHaveBeenCalledWith({ + id: "42", + jsonrpc: "2.0", + result: "hello", + }); + }); + + it("pushes a null result when engine returns null", async () => { + setup(createMockEngine()); + + capturedWrite(makeRequest({ id: "1" }), "utf-8"); + await flushPromises(); + + expect(mockStream.push).toHaveBeenCalledWith({ + id: "1", + jsonrpc: "2.0", + result: null, + }); + }); + + it("pushes complex result objects", async () => { + setup(createMockEngine(vi.fn().mockResolvedValue({ data: [1, 2, 3] }))); + + capturedWrite(makeRequest({ id: "99" }), "utf-8"); + await flushPromises(); + + expect(mockStream.push).toHaveBeenCalledWith({ + id: "99", + jsonrpc: "2.0", + result: { data: [1, 2, 3] }, + }); + }); + + it("preserves the request id in the response", async () => { + setup(createMockEngine(vi.fn().mockResolvedValue("ok"))); + + capturedWrite(makeRequest({ id: "abc-123" }), "utf-8"); + await flushPromises(); + + expect(mockStream.push).toHaveBeenCalledWith(expect.objectContaining({ id: "abc-123" })); + }); + + it("handles multiple sequential requests", async () => { + let counter = 0; + setup(createMockEngine(vi.fn().mockImplementation(() => Promise.resolve(++counter)))); + + capturedWrite(makeRequest({ id: "a" }), "utf-8"); + capturedWrite(makeRequest({ id: "b" }), "utf-8"); + capturedWrite(makeRequest({ id: "c" }), "utf-8"); + await flushPromises(); + + expect(mockStream.push).toHaveBeenCalledTimes(3); + expect(mockStream.push).toHaveBeenCalledWith(expect.objectContaining({ id: "a" })); + expect(mockStream.push).toHaveBeenCalledWith(expect.objectContaining({ id: "b" })); + expect(mockStream.push).toHaveBeenCalledWith(expect.objectContaining({ id: "c" })); + }); + }); + + describe("error handling", () => { + it("pushes an error response when the engine rejects", async () => { + setup(createMockEngine(vi.fn().mockRejectedValue(new Error("engine failure")))); + + capturedWrite(makeRequest({ id: "err-1" }), "utf-8"); + await flushPromises(); + + expect(mockStream.push).toHaveBeenCalledExactlyOnceWith( + expect.objectContaining({ + id: "err-1", + jsonrpc: "2.0", + error: expect.objectContaining({ message: expect.stringContaining("engine failure") }), + }) + ); + }); + + it("includes the error message in the error response", async () => { + setup(createMockEngine(vi.fn().mockRejectedValue(new Error("specific failure")))); + + capturedWrite(makeRequest({ id: "err-2" }), "utf-8"); + await flushPromises(); + + const pushed = mockStream.push.mock.calls[0][0] as Record; + expect((pushed.error as Record).message).toContain("specific failure"); + }); + + it("uses a fallback message for non-Error throws", async () => { + setup(createMockEngine(vi.fn().mockRejectedValue("string error"))); + + capturedWrite(makeRequest({ id: "err-3" }), "utf-8"); + await flushPromises(); + + const pushed = mockStream.push.mock.calls[0][0] as Record; + expect((pushed.error as Record).message).toContain("Internal JSON-RPC error"); + }); + }); + + describe("notification handling", () => { + it("does not push a response for notifications (no id)", async () => { + setup(createMockEngine(vi.fn().mockResolvedValue(undefined))); + + capturedWrite(makeNotification(), "utf-8"); + await flushPromises(); + + expect(mockStream.push).not.toHaveBeenCalled(); + }); + + it("does not push an error response for notification failures", async () => { + setup(createMockEngine(vi.fn().mockRejectedValue(new Error("notification error")))); + + capturedWrite(makeNotification(), "utf-8"); + await flushPromises(); + + expect(mockStream.push).not.toHaveBeenCalled(); + }); + }); + + describe("notificationEmitter", () => { + it("registers a notification listener on the emitter", () => { + const emitter = createMockEmitter(); + setup(createMockEngine(), emitter); + + expect(emitter.on).toHaveBeenCalledWith("notification", expect.any(Function)); + }); + + it("pushes emitted notifications onto the stream", () => { + const emitter = createMockEmitter(); + setup(createMockEngine(), emitter); + + const handler = vi.mocked(emitter.on).mock.calls.find(([event]) => event === "notification")![1] as (msg: unknown) => void; + const notification = { jsonrpc: "2.0", method: "eth_subscription", params: { result: "0x1" } }; + handler(notification); + + expect(mockStream.push).toHaveBeenCalledWith(notification); + }); + + it("pushes multiple notifications", () => { + const emitter = createMockEmitter(); + setup(createMockEngine(), emitter); + + const handler = vi.mocked(emitter.on).mock.calls.find(([event]) => event === "notification")![1] as (msg: unknown) => void; + handler({ type: "a" }); + handler({ type: "b" }); + handler({ type: "c" }); + + expect(mockStream.push).toHaveBeenCalledTimes(3); + }); + + it("registers a close listener to clean up", () => { + const emitter = createMockEmitter(); + setup(createMockEngine(), emitter); + + expect(mockStream.once).toHaveBeenCalledWith("close", expect.any(Function)); + }); + + it("removes the notification listener when the stream closes", () => { + const emitter = createMockEmitter(); + setup(createMockEngine(), emitter); + + const notificationHandler = vi.mocked(emitter.on).mock.calls.find(([event]) => event === "notification")![1]; + const closeHandler = mockStream.once.mock.calls.find(([event]) => event === "close")![1] as () => void; + closeHandler(); + + expect(emitter.removeListener).toHaveBeenCalledWith("notification", notificationHandler); + }); + + it("does not interfere with request/response flow", async () => { + const emitter = createMockEmitter(); + const engine = createMockEngine(vi.fn().mockResolvedValue("result")); + setup(engine, emitter); + + const handler = vi.mocked(emitter.on).mock.calls.find(([event]) => event === "notification")![1] as (msg: unknown) => void; + handler({ method: "notif" }); + + capturedWrite(makeRequest({ id: "x" }), "utf-8"); + await flushPromises(); + + expect(mockStream.push).toHaveBeenCalledTimes(2); + expect(mockStream.push).toHaveBeenCalledWith({ method: "notif" }); + expect(mockStream.push).toHaveBeenCalledWith({ + id: "x", + jsonrpc: "2.0", + result: "result", + }); + }); + }); + + describe("without notificationEmitter", () => { + it("does not register a close listener on the stream", () => { + setup(createMockEngine()); + expect(mockStream.once).not.toHaveBeenCalled(); + }); + + it("handles requests normally", async () => { + setup(createMockEngine(vi.fn().mockResolvedValue("no-emitter"))); + + capturedWrite(makeRequest({ id: "1" }), "utf-8"); + await flushPromises(); + + expect(mockStream.push).toHaveBeenCalledWith({ + id: "1", + jsonrpc: "2.0", + result: "no-emitter", + }); + }); + }); +}); diff --git a/test/v2/providerUtils.test.ts b/test/v2/providerUtils.test.ts index 1a8ee563..22dc6ed1 100644 --- a/test/v2/providerUtils.test.ts +++ b/test/v2/providerUtils.test.ts @@ -137,7 +137,7 @@ describe("providerUtils", () => { expect(error).toBe(testError); expect(response.id).toBe("42"); expect(response.jsonrpc).toBe("2.0"); - expect(response.error).toBe(testError); + expect(response.error).toMatchObject({ message: "send error" }); }); it("constructs a proper JRPCResponse shape", async () => { @@ -152,6 +152,64 @@ describe("providerUtils", () => { expect(response).toHaveProperty("jsonrpc", "2.0"); expect(response).toHaveProperty("result", "hello"); }); + + it("does not reject if the success callback throws", async () => { + const engine = JRPCEngineV2.create({ middleware: [makeRequestMiddleware()] }); + const provider = providerFromEngine(engine as JRPCEngineV2); + + const unhandled = vi.fn(); + process.on("unhandledRejection", unhandled); + + provider.send(makeRequest(), () => { + throw new Error("callback boom"); + }); + await new Promise((resolve) => setTimeout(resolve, 50)); + + process.off("unhandledRejection", unhandled); + expect(unhandled).toHaveBeenCalledTimes(1); + }); + + it("does not reject if the error callback throws", async () => { + const middleware: JRPCMiddlewareV2 = () => { + throw new Error("engine error"); + }; + const engine = JRPCEngineV2.create({ middleware: [middleware] }); + const provider = providerFromEngine(engine as JRPCEngineV2); + + const unhandled = vi.fn(); + process.on("unhandledRejection", unhandled); + + provider.send(makeRequest(), () => { + throw new Error("callback boom"); + }); + await new Promise((resolve) => setTimeout(resolve, 50)); + + process.off("unhandledRejection", unhandled); + expect(unhandled).toHaveBeenCalledTimes(1); + }); + + it.each([ + { label: "string", thrown: "string error", expectedCause: "string error" }, + { label: "null", thrown: null, expectedCause: null }, + { label: "undefined", thrown: undefined, expectedCause: null }, + { label: "number", thrown: 42, expectedCause: 42 }, + ])("serializes non-Error throw ($label) via send callback", async ({ thrown, expectedCause }) => { + const middleware: JRPCMiddlewareV2 = () => { + throw thrown; + }; + const engine = JRPCEngineV2.create({ middleware: [middleware] }); + const provider = providerFromEngine(engine as JRPCEngineV2); + + const { error, response } = await sendPromise(provider, makeRequest({ id: "x" })); + + expect(error).toBe(thrown); + expect(response.id).toBe("x"); + expect(response.jsonrpc).toBe("2.0"); + const rpcError = response.error as Record; + expect(rpcError).toBeDefined(); + expect(rpcError).toHaveProperty("code"); + expect((rpcError.data as Record).cause).toStrictEqual(expectedCause); + }); }); describe("request", () => { @@ -181,8 +239,7 @@ describe("providerUtils", () => { params: [1, 2], jsonrpc: "2.0", }); - expect(typeof observedRequests[0].id).toBe("string"); - expect((observedRequests[0].id as string).length).toBeGreaterThan(0); + expect(typeof observedRequests[0].id).toBe("number"); }); it("passes params through to the engine", async () => { @@ -368,5 +425,28 @@ describe("providerUtils", () => { expect(result).toBe("full-circle"); }); + + it("deep-clones the request so frozen nested params are isolated from the original", async () => { + let receivedParams: unknown; + const innerMiddleware: JRPCMiddlewareV2, Json> = ({ request }) => { + receivedParams = request.params; + return request.params; + }; + const innerEngine = JRPCEngineV2.create({ middleware: [innerMiddleware] }); + const provider = providerFromEngine(innerEngine as JRPCEngineV2); + + const outerEngine = JRPCEngineV2.create({ + middleware: [providerAsMiddleware(provider)], + }); + + const originalParams = [{ nested: "value" }]; + const req = makeRequest({ params: originalParams }); + + await outerEngine.handle(req); + + expect(receivedParams).toStrictEqual(originalParams); + expect(receivedParams).not.toBe(originalParams); + expect((receivedParams as Record[])[0]).not.toBe(originalParams[0]); + }); }); }); From 74a9c0fd7f279615a60a3a60139409a0e8759f8b Mon Sep 17 00:00:00 2001 From: lwin Date: Thu, 26 Feb 2026 20:01:17 +0800 Subject: [PATCH 4/6] fix: added missing callback after stream push --- src/jrpc/v2/messageStream.ts | 4 +++- test/v2/messageStream.test.ts | 41 ++++++++++++++++++++++------------- 2 files changed, 29 insertions(+), 16 deletions(-) diff --git a/src/jrpc/v2/messageStream.ts b/src/jrpc/v2/messageStream.ts index 545f66a7..a4744249 100644 --- a/src/jrpc/v2/messageStream.ts +++ b/src/jrpc/v2/messageStream.ts @@ -21,7 +21,7 @@ export function createEngineStreamV2({ engine, notificationEmitter }: { engine: // noop } - function write(req: JRPCRequest, _encoding: BufferEncoding) { + function write(req: JRPCRequest, _encoding: BufferEncoding, cb: (error?: Error | null) => void) { engine .handle(req) .then((res: Json | void): void => { @@ -32,6 +32,7 @@ export function createEngineStreamV2({ engine, notificationEmitter }: { engine: result: res, }); } + cb(); return undefined; }) .catch((err: unknown) => { @@ -44,6 +45,7 @@ export function createEngineStreamV2({ engine, notificationEmitter }: { engine: }); } log.error(err); + cb(err as Error | null); }); } diff --git a/test/v2/messageStream.test.ts b/test/v2/messageStream.test.ts index 6ad4f150..930757ca 100644 --- a/test/v2/messageStream.test.ts +++ b/test/v2/messageStream.test.ts @@ -28,7 +28,7 @@ function makeNotification(overrides: Record = {}) { describe("createEngineStreamV2", () => { let mockStream: Record; - let capturedWrite: (req: Record, encoding: string) => void; + let capturedWrite: (req: Record, encoding: string, cb: (error?: Error | null) => void) => void; function createMockEngine(handleImpl: Mock = vi.fn().mockResolvedValue(null)) { return { handle: handleImpl } as unknown as JRPCEngineV2; @@ -75,15 +75,26 @@ describe("createEngineStreamV2", () => { expect(result).toHaveProperty("once", mockStream.once); }); + it("invokes the write callback synchronously to signal backpressure release", () => { + setup(createMockEngine()); + + const cb = vi.fn(); + capturedWrite(makeRequest(), "utf-8", cb); + + expect(cb).toHaveBeenCalledOnce(); + }); + describe("request handling", () => { it("pushes a success response for a request", async () => { const engine = createMockEngine(vi.fn().mockResolvedValue("hello")); setup(engine); - capturedWrite(makeRequest({ id: "42" }), "utf-8"); + const cb = vi.fn(); + capturedWrite(makeRequest({ id: "42" }), "utf-8", cb); await flushPromises(); expect(engine.handle).toHaveBeenCalledOnce(); + expect(cb).toHaveBeenCalledOnce(); expect(mockStream.push).toHaveBeenCalledWith({ id: "42", jsonrpc: "2.0", @@ -94,7 +105,7 @@ describe("createEngineStreamV2", () => { it("pushes a null result when engine returns null", async () => { setup(createMockEngine()); - capturedWrite(makeRequest({ id: "1" }), "utf-8"); + capturedWrite(makeRequest({ id: "1" }), "utf-8", vi.fn()); await flushPromises(); expect(mockStream.push).toHaveBeenCalledWith({ @@ -107,7 +118,7 @@ describe("createEngineStreamV2", () => { it("pushes complex result objects", async () => { setup(createMockEngine(vi.fn().mockResolvedValue({ data: [1, 2, 3] }))); - capturedWrite(makeRequest({ id: "99" }), "utf-8"); + capturedWrite(makeRequest({ id: "99" }), "utf-8", vi.fn()); await flushPromises(); expect(mockStream.push).toHaveBeenCalledWith({ @@ -120,7 +131,7 @@ describe("createEngineStreamV2", () => { it("preserves the request id in the response", async () => { setup(createMockEngine(vi.fn().mockResolvedValue("ok"))); - capturedWrite(makeRequest({ id: "abc-123" }), "utf-8"); + capturedWrite(makeRequest({ id: "abc-123" }), "utf-8", vi.fn()); await flushPromises(); expect(mockStream.push).toHaveBeenCalledWith(expect.objectContaining({ id: "abc-123" })); @@ -130,9 +141,9 @@ describe("createEngineStreamV2", () => { let counter = 0; setup(createMockEngine(vi.fn().mockImplementation(() => Promise.resolve(++counter)))); - capturedWrite(makeRequest({ id: "a" }), "utf-8"); - capturedWrite(makeRequest({ id: "b" }), "utf-8"); - capturedWrite(makeRequest({ id: "c" }), "utf-8"); + capturedWrite(makeRequest({ id: "a" }), "utf-8", vi.fn()); + capturedWrite(makeRequest({ id: "b" }), "utf-8", vi.fn()); + capturedWrite(makeRequest({ id: "c" }), "utf-8", vi.fn()); await flushPromises(); expect(mockStream.push).toHaveBeenCalledTimes(3); @@ -146,7 +157,7 @@ describe("createEngineStreamV2", () => { it("pushes an error response when the engine rejects", async () => { setup(createMockEngine(vi.fn().mockRejectedValue(new Error("engine failure")))); - capturedWrite(makeRequest({ id: "err-1" }), "utf-8"); + capturedWrite(makeRequest({ id: "err-1" }), "utf-8", vi.fn()); await flushPromises(); expect(mockStream.push).toHaveBeenCalledExactlyOnceWith( @@ -161,7 +172,7 @@ describe("createEngineStreamV2", () => { it("includes the error message in the error response", async () => { setup(createMockEngine(vi.fn().mockRejectedValue(new Error("specific failure")))); - capturedWrite(makeRequest({ id: "err-2" }), "utf-8"); + capturedWrite(makeRequest({ id: "err-2" }), "utf-8", vi.fn()); await flushPromises(); const pushed = mockStream.push.mock.calls[0][0] as Record; @@ -171,7 +182,7 @@ describe("createEngineStreamV2", () => { it("uses a fallback message for non-Error throws", async () => { setup(createMockEngine(vi.fn().mockRejectedValue("string error"))); - capturedWrite(makeRequest({ id: "err-3" }), "utf-8"); + capturedWrite(makeRequest({ id: "err-3" }), "utf-8", vi.fn()); await flushPromises(); const pushed = mockStream.push.mock.calls[0][0] as Record; @@ -183,7 +194,7 @@ describe("createEngineStreamV2", () => { it("does not push a response for notifications (no id)", async () => { setup(createMockEngine(vi.fn().mockResolvedValue(undefined))); - capturedWrite(makeNotification(), "utf-8"); + capturedWrite(makeNotification(), "utf-8", vi.fn()); await flushPromises(); expect(mockStream.push).not.toHaveBeenCalled(); @@ -192,7 +203,7 @@ describe("createEngineStreamV2", () => { it("does not push an error response for notification failures", async () => { setup(createMockEngine(vi.fn().mockRejectedValue(new Error("notification error")))); - capturedWrite(makeNotification(), "utf-8"); + capturedWrite(makeNotification(), "utf-8", vi.fn()); await flushPromises(); expect(mockStream.push).not.toHaveBeenCalled(); @@ -256,7 +267,7 @@ describe("createEngineStreamV2", () => { const handler = vi.mocked(emitter.on).mock.calls.find(([event]) => event === "notification")![1] as (msg: unknown) => void; handler({ method: "notif" }); - capturedWrite(makeRequest({ id: "x" }), "utf-8"); + capturedWrite(makeRequest({ id: "x" }), "utf-8", vi.fn()); await flushPromises(); expect(mockStream.push).toHaveBeenCalledTimes(2); @@ -278,7 +289,7 @@ describe("createEngineStreamV2", () => { it("handles requests normally", async () => { setup(createMockEngine(vi.fn().mockResolvedValue("no-emitter"))); - capturedWrite(makeRequest({ id: "1" }), "utf-8"); + capturedWrite(makeRequest({ id: "1" }), "utf-8", vi.fn()); await flushPromises(); expect(mockStream.push).toHaveBeenCalledWith({ From 964e3f2adfc16f5db807160a20b564500ec31fb8 Mon Sep 17 00:00:00 2001 From: lwin Date: Thu, 26 Feb 2026 20:28:38 +0800 Subject: [PATCH 5/6] feat: added util func to propagate the forzen request to mutable one --- src/jrpc/v2/compatibility-utils.ts | 17 ++++++ src/jrpc/v2/index.ts | 2 +- test/v2/compatibility-utils.test.ts | 84 ++++++++++++++++++++++++++++- 3 files changed, 100 insertions(+), 3 deletions(-) diff --git a/src/jrpc/v2/compatibility-utils.ts b/src/jrpc/v2/compatibility-utils.ts index b4e289b3..0a00c809 100644 --- a/src/jrpc/v2/compatibility-utils.ts +++ b/src/jrpc/v2/compatibility-utils.ts @@ -97,6 +97,23 @@ export function propagateToRequest(req: Record, context: Middle }); } +/** + * Copies non-JSON-RPC string properties from the context to the request. + * + * Clone the original request object and propagate the context to the cloned request. + * + * **ATTN:** Only string properties are copied. + * + * @param req - The request to propagate the context to. + * @param context - The context to propagate from. + * @returns The mutable cloned request. + */ +export function propagateToMutableRequest(req: Record, context: MiddlewareContext): Record { + const clonedRequest = deepClone(req); + propagateToRequest(clonedRequest, context); + return clonedRequest; +} + /** * Deserialize the error property for a thrown error, merging in the cause where possible. * diff --git a/src/jrpc/v2/index.ts b/src/jrpc/v2/index.ts index b55115b8..0c6c8460 100644 --- a/src/jrpc/v2/index.ts +++ b/src/jrpc/v2/index.ts @@ -1,6 +1,6 @@ export { getUniqueId, isNotification, isRequest } from "../../utils/jrpc"; export { asLegacyMiddleware } from "./asLegacyMiddleware"; -export { deepClone, fromLegacyRequest, makeContext, propagateToContext, propagateToRequest } from "./compatibility-utils"; +export { deepClone, fromLegacyRequest, makeContext, propagateToContext, propagateToMutableRequest, propagateToRequest } from "./compatibility-utils"; export { createScaffoldMiddleware as createScaffoldMiddlewareV2 } from "./createScaffoldMiddleware"; export { JRPCEngineV2 } from "./jrpcEngineV2"; export { JRPCServer } from "./jrpcServer"; diff --git a/test/v2/compatibility-utils.test.ts b/test/v2/compatibility-utils.test.ts index 48f1483d..f9931e7c 100644 --- a/test/v2/compatibility-utils.test.ts +++ b/test/v2/compatibility-utils.test.ts @@ -1,7 +1,6 @@ import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; -import { Json, JRPCParams, JRPCRequest } from "../../src"; -import { stringify } from "../../src/utils/jrpc"; +import { JRPCParams, JRPCRequest, Json } from "../../src"; import { JsonRpcError } from "../../src/jrpc/errors"; import { deepClone, @@ -9,9 +8,11 @@ import { fromLegacyRequest, makeContext, propagateToContext, + propagateToMutableRequest, propagateToRequest, } from "../../src/jrpc/v2/compatibility-utils"; import { MiddlewareContext } from "../../src/jrpc/v2/MiddlewareContext"; +import { stringify } from "../../src/utils/jrpc"; const jsonrpc = "2.0" as const; @@ -340,6 +341,85 @@ describe("compatibility-utils", () => { }); }); + describe("propagateToMutableRequest", () => { + it("returns a mutable object from a frozen request", () => { + const request = Object.freeze({ + jsonrpc, + method: "test_method", + params: Object.freeze([1, 2, 3]), + id: 1, + }); + const context = new MiddlewareContext>(); + context.set("extraProp", "value"); + + const result = propagateToMutableRequest(request, context); + + expect(Object.isFrozen(result)).toBe(false); + result.newKey = "can assign"; + expect(result.newKey).toBe("can assign"); + }); + + it("does not mutate the original frozen request", () => { + const request = Object.freeze({ + jsonrpc, + method: "test_method", + params: Object.freeze([1, 2, 3]), + id: 1, + }); + const context = new MiddlewareContext>(); + context.set("extraProp", "value"); + + propagateToMutableRequest(request, context); + + expect(request).toStrictEqual({ + jsonrpc, + method: "test_method", + params: [1, 2, 3], + id: 1, + }); + expect("extraProp" in request).toBe(false); + }); + + it("propagates context properties onto the mutable clone of a frozen request", () => { + const request = Object.freeze({ + jsonrpc, + method: "test_method", + params: Object.freeze([1]), + id: 42, + }); + const context = new MiddlewareContext>(); + context.set("extraProp", "value"); + context.set("anotherProp", { nested: true }); + + const result = propagateToMutableRequest(request, context); + + expect(result).toStrictEqual({ + jsonrpc, + method: "test_method", + params: [1], + id: 42, + extraProp: "value", + anotherProp: { nested: true }, + }); + }); + + it("produces deeply mutable params from a frozen request", () => { + const request = Object.freeze({ + jsonrpc, + method: "test_method", + params: Object.freeze([Object.freeze({ a: 1 }), 2]), + id: 1, + }); + const context = new MiddlewareContext(); + + const result = propagateToMutableRequest(request, context); + + expect(Object.isFrozen(result.params)).toBe(false); + (result.params as unknown[])[0] = "replaced"; + expect((result.params as unknown[])[0]).toBe("replaced"); + }); + }); + describe("deserializeError", () => { // Requires some special handling due to the possible existence or // non-existence of Error.isError From b01101ba9bb60a1241900d7e80ad11b6f0c60c49 Mon Sep 17 00:00:00 2001 From: lwin Date: Thu, 26 Feb 2026 21:06:07 +0800 Subject: [PATCH 6/6] fix: fixed stream write callback --- src/jrpc/v2/messageStream.ts | 16 ++++++++++------ src/jrpc/v2/providerUtils.ts | 24 ++++++++++++++---------- test/v2/messageStream.test.ts | 10 ++++++++++ test/v2/providerUtils.test.ts | 11 ++++++----- 4 files changed, 40 insertions(+), 21 deletions(-) diff --git a/src/jrpc/v2/messageStream.ts b/src/jrpc/v2/messageStream.ts index a4744249..296e96a5 100644 --- a/src/jrpc/v2/messageStream.ts +++ b/src/jrpc/v2/messageStream.ts @@ -3,7 +3,7 @@ import { Duplex } from "readable-stream"; import { isRequest } from "../../utils/jrpc"; import { rpcErrors } from "../errors"; -import { JRPCRequest, Json } from "../interfaces"; +import { JRPCRequest } from "../interfaces"; import { SafeEventEmitter } from "../safeEventEmitter"; import { JRPCEngineV2 } from "./jrpcEngineV2"; @@ -21,10 +21,10 @@ export function createEngineStreamV2({ engine, notificationEmitter }: { engine: // noop } - function write(req: JRPCRequest, _encoding: BufferEncoding, cb: (error?: Error | null) => void) { - engine + function handleRequest(req: JRPCRequest) { + return engine .handle(req) - .then((res: Json | void): void => { + .then((res): undefined => { if (res !== undefined && isRequest(req)) { stream?.push({ id: req.id, @@ -32,7 +32,6 @@ export function createEngineStreamV2({ engine, notificationEmitter }: { engine: result: res, }); } - cb(); return undefined; }) .catch((err: unknown) => { @@ -45,10 +44,15 @@ export function createEngineStreamV2({ engine, notificationEmitter }: { engine: }); } log.error(err); - cb(err as Error | null); }); } + function write(req: JRPCRequest, _encoding: BufferEncoding, cb: (error?: Error | null) => void) { + return handleRequest(req).finally(() => { + cb(); + }); + } + stream = new Duplex({ objectMode: true, read: noop, write }); if (notificationEmitter) { diff --git a/src/jrpc/v2/providerUtils.ts b/src/jrpc/v2/providerUtils.ts index 85ed4036..6e7b489e 100644 --- a/src/jrpc/v2/providerUtils.ts +++ b/src/jrpc/v2/providerUtils.ts @@ -25,20 +25,24 @@ export function providerFromEngine(engine: JRPCEngineV2): SafeEventEmitterProvid return result as U; }; + async function handleWithCallback(req: JRPCRequest, callback: (error: unknown, providerRes: JRPCResponse) => void) { + try { + const result = await engine.handle(req as JRPCRequest); + callback(null, { id: req.id, jsonrpc: "2.0", result: result as U }); + } catch (error) { + const serializedError = serializeJrpcError(error, { + shouldIncludeStack: false, + shouldPreserveMessage: true, + }); + callback(serializedError, { id: req.id, jsonrpc: "2.0", error: serializedError }); + } + } + provider.send = (req: JRPCRequest, callback: (error: unknown, providerRes: JRPCResponse) => void) => { if (typeof callback !== "function") { throw new Error('Must provide callback to "send" method.'); } - engine - .handle(req as JRPCRequest) - .then((result) => callback(null, { id: req.id, jsonrpc: "2.0", result: result as U })) - .catch((error) => { - const serializedError = serializeJrpcError(error, { - shouldIncludeStack: false, - shouldPreserveMessage: true, - }); - callback(error, { id: req.id, jsonrpc: "2.0", error: serializedError }); - }); + handleWithCallback(req, callback); }; provider.request = async (args: RequestArguments) => { diff --git a/test/v2/messageStream.test.ts b/test/v2/messageStream.test.ts index 5495cb9f..ddbef89e 100644 --- a/test/v2/messageStream.test.ts +++ b/test/v2/messageStream.test.ts @@ -180,6 +180,16 @@ describe("createEngineStreamV2", () => { expect((pushed.error as Record).message).toContain("specific failure"); }); + it("calls cb() without an error to keep the stream alive", async () => { + setup(createMockEngine(vi.fn().mockRejectedValue(new Error("engine failure")))); + + const cb = vi.fn(); + capturedWrite(makeRequest({ id: "err-alive" }), "utf-8", cb); + await flushPromises(); + + expect(cb).toHaveBeenCalledOnce(); + }); + it("uses a fallback message for non-Error throws", async () => { setup(createMockEngine(vi.fn().mockRejectedValue("string error"))); diff --git a/test/v2/providerUtils.test.ts b/test/v2/providerUtils.test.ts index 22dc6ed1..5e8fc3aa 100644 --- a/test/v2/providerUtils.test.ts +++ b/test/v2/providerUtils.test.ts @@ -134,10 +134,10 @@ describe("providerUtils", () => { const req = makeRequest({ id: "42" }); const { error, response } = await sendPromise(provider, req); - expect(error).toBe(testError); + expect(error).toMatchObject({ code: -32603, message: "send error" }); + expect(error).toBe(response.error); expect(response.id).toBe("42"); expect(response.jsonrpc).toBe("2.0"); - expect(response.error).toMatchObject({ message: "send error" }); }); it("constructs a proper JRPCResponse shape", async () => { @@ -202,7 +202,7 @@ describe("providerUtils", () => { const { error, response } = await sendPromise(provider, makeRequest({ id: "x" })); - expect(error).toBe(thrown); + expect(error).toBe(response.error); expect(response.id).toBe("x"); expect(response.jsonrpc).toBe("2.0"); const rpcError = response.error as Record; @@ -323,9 +323,10 @@ describe("providerUtils", () => { }; const provider = providerFromMiddleware(middleware as JRPCMiddlewareV2); - const { error } = await sendPromise(provider, makeRequest()); + const { error, response } = await sendPromise(provider, makeRequest()); - expect(error).toBe(testError); + expect(error).toMatchObject({ code: -32603, message: "middleware-error" }); + expect(error).toBe(response.error); }); it("works with a middleware that uses context", async () => {