Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 0 additions & 18 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions src/jrpc/v2/compatibility-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,23 @@ export function propagateToRequest(req: Record<string, unknown>, context: Middle
});
}

/**
* Copies non-JSON-RPC string properties from the context to the request.
*
* Clone the original request object and propagate the context to the cloned request.
*
* **ATTN:** Only string properties are copied.
*
* @param req - The request to propagate the context to.
* @param context - The context to propagate from.
* @returns The mutable cloned request.
*/
export function propagateToMutableRequest(req: Record<string, unknown>, context: MiddlewareContext): Record<string, unknown> {
const clonedRequest = deepClone(req);
propagateToRequest(clonedRequest, context);
return clonedRequest;
}

/**
* Deserialize the error property for a thrown error, merging in the cause where possible.
*
Expand Down
7 changes: 7 additions & 0 deletions src/jrpc/v2/index.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
export { getUniqueId, isNotification, isRequest } from "../../utils/jrpc";
export { asLegacyMiddleware } from "./asLegacyMiddleware";
export { deepClone, fromLegacyRequest, makeContext, propagateToContext, propagateToMutableRequest, propagateToRequest } from "./compatibility-utils";
export { createScaffoldMiddleware as createScaffoldMiddlewareV2 } from "./createScaffoldMiddleware";
export { JRPCEngineV2 } from "./jrpcEngineV2";
export { JRPCServer } from "./jrpcServer";
export { createEngineStreamV2 } from "./messageStream";
export { MiddlewareContext } from "./MiddlewareContext";
export {
providerAsMiddleware as providerAsMiddlewareV2,
providerFromEngine as providerFromEngineV2,
providerFromMiddleware as providerFromMiddlewareV2,
} from "./providerUtils";
export type {
ContextConstraint,
EmptyContext,
Expand Down
70 changes: 70 additions & 0 deletions src/jrpc/v2/messageStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import log from "loglevel";
import { Duplex } from "readable-stream";

import { isRequest } from "../../utils/jrpc";
import { rpcErrors } from "../errors";
import { JRPCRequest } from "../interfaces";
import { SafeEventEmitter } from "../safeEventEmitter";
import { JRPCEngineV2 } from "./jrpcEngineV2";

/**
* Creates a Duplex object stream for an engine (JRPCEngineV2) + a separate notification emitter.
*
* Replaces V1's createEngineStream by decoupling notification forwarding from
* the engine itself. Notifications are routed through a SafeEventEmitter that
* pushes onto the same stream, so the engine no longer needs to be an EventEmitter.
*/
export function createEngineStreamV2({ engine, notificationEmitter }: { engine: JRPCEngineV2; notificationEmitter?: SafeEventEmitter }): Duplex {
let stream: Duplex | undefined = undefined;

function noop() {
// noop
}

function handleRequest(req: JRPCRequest<unknown>) {
return engine
.handle(req)
.then((res): undefined => {
if (res !== undefined && isRequest(req)) {
stream?.push({
id: req.id,
jsonrpc: "2.0",
result: res,
});
}
return undefined;
})
.catch((err: unknown) => {
if (isRequest(req)) {
const message = err instanceof Error ? err.message : "Internal JSON-RPC error";
stream?.push({
id: req.id,
jsonrpc: "2.0",
error: rpcErrors.internal({ message }),
});
}
log.error(err);
});
}

function write(req: JRPCRequest<unknown>, _encoding: BufferEncoding, cb: (error?: Error | null) => void) {
return handleRequest(req).finally(() => {
cb();
});
}

stream = new Duplex({ objectMode: true, read: noop, write });

if (notificationEmitter) {
const onNotification = (message: unknown) => {
stream?.push(message);
};

notificationEmitter.on("notification", onNotification);
stream?.once("close", () => {
notificationEmitter.removeListener("notification", onNotification);
});
}

return stream;
}
85 changes: 85 additions & 0 deletions src/jrpc/v2/providerUtils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import { getUniqueId } from "../../utils";
import { serializeJrpcError } from "../errors";
import { JRPCParams, JRPCRequest, JRPCResponse, Json, RequestArguments } from "../interfaces";
import { ProviderEvents, SafeEventEmitterProvider } from "../jrpcEngine";
import { SafeEventEmitter } from "../safeEventEmitter";
import { deepClone, propagateToRequest } from "./compatibility-utils";
import { JRPCEngineV2 } from "./jrpcEngineV2";
import type { JRPCMiddlewareV2 } from "./v2interfaces";

/**
* Create a {@link SafeEventEmitterProvider} from a {@link JRPCEngineV2}.
*
* Unlike the V1 counterpart, the V2 engine throws errors directly rather than
* wrapping them in response objects, so `sendAsync` simply propagates thrown errors.
* Notification forwarding is not supported since {@link JRPCEngineV2} is not an event emitter.
*
* @param engine - The V2 JSON-RPC engine.
* @returns A provider backed by the engine.
*/
export function providerFromEngine(engine: JRPCEngineV2): SafeEventEmitterProvider {
const provider: SafeEventEmitterProvider = new SafeEventEmitter<ProviderEvents>() as SafeEventEmitterProvider;

provider.sendAsync = async <T extends JRPCParams, U>(req: JRPCRequest<T>) => {
const result = await engine.handle(req as JRPCRequest);
return result as U;
};

async function handleWithCallback<T extends JRPCParams, U>(req: JRPCRequest<T>, callback: (error: unknown, providerRes: JRPCResponse<U>) => void) {
try {
const result = await engine.handle(req as JRPCRequest);
callback(null, { id: req.id, jsonrpc: "2.0", result: result as U });
} catch (error) {
const serializedError = serializeJrpcError(error, {
shouldIncludeStack: false,
shouldPreserveMessage: true,
});
callback(serializedError, { id: req.id, jsonrpc: "2.0", error: serializedError });
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Callback invoked twice when success callback throws

High Severity

handleWithCallback places both engine.handle() and the success callback(null, ...) call inside the same try block. If the callback itself throws after being invoked with a successful result, the catch block catches that error and calls the callback a second time with a serialized error response. This means the callback can be invoked twice — once with a success response and once with an error — violating the standard Node.js convention that callbacks are called exactly once. The V1 provider.send avoids this by passing the callback directly to the engine.

Fix in Cursor Fix in Web


provider.send = <T extends JRPCParams, U>(req: JRPCRequest<T>, callback: (error: unknown, providerRes: JRPCResponse<U>) => void) => {
if (typeof callback !== "function") {
throw new Error('Must provide callback to "send" method.');
}
handleWithCallback(req, callback);
};

provider.request = async <T extends JRPCParams, U>(args: RequestArguments<T>) => {
const req: JRPCRequest<JRPCParams> = {
...args,
id: getUniqueId(),
jsonrpc: "2.0",
};
const res = await provider.sendAsync(req);
return res as U;
};

return provider;
}

/**
* Create a {@link SafeEventEmitterProvider} from one or more V2 middleware.
*
* @param middleware - The V2 middleware to back the provider.
* @returns A provider backed by an engine composed of the given middleware.
*/
export function providerFromMiddleware(middleware: JRPCMiddlewareV2): SafeEventEmitterProvider {
const engine = JRPCEngineV2.create({ middleware: [middleware] });
return providerFromEngine(engine as JRPCEngineV2);
}

/**
* Convert a {@link SafeEventEmitterProvider} into a V2 middleware.
* The middleware delegates all requests to the provider's `sendAsync` method.
*
* @param provider - The provider to wrap as middleware.
* @returns A V2 middleware that forwards requests to the provider.
*/
export function providerAsMiddleware(provider: SafeEventEmitterProvider): JRPCMiddlewareV2<JRPCRequest, Json> {
return async ({ request, context }) => {
const providerRequest = deepClone(request);
propagateToRequest(providerRequest, context);
return (await provider.sendAsync(providerRequest)) as Json;
};
}
Loading
Loading