ACP events processed out of order with multiple JSON-RPC messages in a chunk #143

@anvilpete

Summary

ACP events processed out of order with multiple JSON-RPC messages in a chunk

Description

Following up on the now-fixed #128, there is still an ordering problem when two JSON-RPC messages arrive in the same chunk in ndJsonStream. The sequence is:

  • ndJsonStream enqueues NewSession response.
  • ndJsonStream immediately enqueues SessionNotification from the same chunk.
  • acp receive processes response.
  • handleResponse resolves the pending request promise.
  • The caller’s newSession(...).then(...) is scheduled, not run immediately.
  • receive processes the already-enqueued notification.
  • sessionUpdate pushes "SessionNotification".
  • The caller continuation finally pushes "NewSessionResponse".
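
The sequence above can be reproduced without the SDK. The following is a minimal sketch that assumes a simplified receive loop; `simulate`, `pending`, and `yieldAfterResponse` are hypothetical names for illustration, not the SDK's internals. It shows that resolving a promise only schedules the caller's `.then(...)` continuation, so a notification that is already queued gets handled first. The `yieldAfterResponse` flag illustrates one possible direction (waiting for a macrotask after a response), not a proposed or confirmed fix.

```typescript
// Hypothetical stand-ins for the messages the receive loop sees.
type Message = { kind: "response"; id: number } | { kind: "notification" };

async function simulate(yieldAfterResponse: boolean): Promise<string[]> {
  const events: string[] = [];
  const pending = new Map<number, () => void>();

  // Caller side: the continuation is attached with .then(),
  // like newSession(...).then(...) in the test.
  const request = new Promise<void>((resolve) => pending.set(1, resolve));
  const caller = request.then(() => {
    events.push("NewSessionResponse");
  });

  // Both messages are already queued, as when they arrive in one chunk.
  const queue: Message[] = [
    { kind: "response", id: 1 },
    { kind: "notification" },
  ];

  for (const message of queue) {
    if (message.kind === "response") {
      // Resolving only schedules the caller's continuation as a microtask.
      pending.get(message.id)?.();
      pending.delete(message.id);
      if (yieldAfterResponse) {
        // Assumption: waiting for a macrotask lets pending microtasks run first.
        await new Promise<void>((resolve) => setTimeout(resolve, 0));
      }
    } else {
      events.push("SessionNotification");
    }
  }

  await caller;
  return events;
}

// Without yielding: ["SessionNotification", "NewSessionResponse"]
simulate(false).then((events) => console.log(events));
// With a macrotask yield: ["NewSessionResponse", "SessionNotification"]
simulate(true).then((events) => console.log(events));
```

In the real connection the caller's continuation sits behind more than one promise hop, so a single microtask yield may not be enough; that is why the sketch uses `setTimeout` rather than `await Promise.resolve()`.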

#130 doesn't fix this because ndJsonStream enqueues both lines together, so there's no opportunity for the NewSession request promise to resolve before the SessionNotification is handled.
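
To illustrate why both lines end up enqueued together, here is a rough sketch of newline-delimited JSON decoding. `decodeChunk` is a hypothetical helper written for this example and is not ndJsonStream's actual implementation; the point is only that one chunk containing two complete lines yields two messages in the same synchronous step, with no asynchronous gap between them.

```typescript
// Hypothetical helper: splits buffered text into complete JSON lines.
function decodeChunk(
  buffer: string,
  chunk: string,
): { messages: unknown[]; rest: string } {
  const lines = (buffer + chunk).split("\n");
  // The last element is an incomplete line, or "" if the chunk ended with "\n".
  const rest = lines.pop() ?? "";
  const messages = lines
    .filter((line) => line.trim() !== "")
    .map((line) => JSON.parse(line) as unknown);
  return { messages, rest };
}

// One chunk carrying a response and a notification, as in the test below.
const chunk =
  JSON.stringify({ jsonrpc: "2.0", id: 0, result: { sessionId: "test-session" } }) +
  "\n" +
  JSON.stringify({ jsonrpc: "2.0", method: "session/update", params: {} }) +
  "\n";

const { messages, rest } = decodeChunk("", chunk);
// Both messages are available at once; nothing is left over in the buffer.
console.log(messages.length, rest === "");
```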

To reproduce, add the following test to acp.test.ts:

  it("processes notification after response when both arrive in the same chunk", async () => {
    const events: string[] = [];
    const {
      promise: sessionNotification,
      resolve: resolveSessionNotification,
    } = Promise.withResolvers<void>();

    class TestClient implements Client {
      async writeTextFile(
        _: WriteTextFileRequest,
      ): Promise<WriteTextFileResponse> {
        return {};
      }
      async readTextFile(
        _: ReadTextFileRequest,
      ): Promise<ReadTextFileResponse> {
        return { content: "test" };
      }
      async requestPermission(
        _: RequestPermissionRequest,
      ): Promise<RequestPermissionResponse> {
        return {
          outcome: {
            outcome: "selected",
            optionId: "allow",
          },
        };
      }
      async sessionUpdate(_params: SessionNotification): Promise<void> {
        // Record the session notification
        events.push("SessionNotification");
        resolveSessionNotification();
      }
    }

    const connection = new ClientSideConnection(
      () => new TestClient(),
      ndJsonStream(clientToAgent.writable, agentToClient.readable),
    );

    const newSessionResponse = connection
      .newSession({ cwd: "/test", mcpServers: [] })
      .then((result) => {
        // Record the new session response event
        events.push("NewSessionResponse");
        return result;
      });

    // Get the NewSessionRequest ID
    const requestReader = clientToAgent.readable.getReader();
    const { value: requestChunk } = await requestReader.read();
    requestReader.releaseLock();
    const { id: requestId } = JSON.parse(
      new TextDecoder().decode(requestChunk),
    );

    const sessionId = "test-session";
    const writer = agentToClient.writable.getWriter();
    await writer.write(
      new TextEncoder().encode(
        JSON.stringify({
          jsonrpc: "2.0",
          id: requestId,
          result: { sessionId },
        }) +
          "\n" +
          JSON.stringify({
            jsonrpc: "2.0",
            method: "session/update",
            params: {
              sessionId,
              update: {
                sessionUpdate: "available_commands_update",
                availableCommands: [],
              },
            },
          }) +
          "\n",
      ),
    );
    writer.releaseLock();

    await newSessionResponse;
    await sessionNotification;

    expect(events).toEqual(["NewSessionResponse", "SessionNotification"]);
  });

Expected Behavior:
NewSessionResponse is processed before SessionNotification.

Actual Behavior:
SessionNotification is processed out of order, before NewSessionResponse.

ACP Version

0.21.0
