Add OpenAI Response WebSocket API #1093

Merged: toubatbrian merged 11 commits into main from brian/oai-response-ws on Mar 16, 2026
Conversation

toubatbrian (Contributor) commented Mar 3, 2026

Use with:

```ts
llm: openai.responses.LLM({ useWebsocket: true })
```

Fully tested with unit tests and e2e tests.

changeset-bot commented Mar 3, 2026

🦋 Changeset detected

Latest commit: 159e88a

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 21 packages:

| Name | Type |
| --- | --- |
| @livekit/agents-plugin-openai | Patch |
| @livekit/agents-plugin-anam | Patch |
| @livekit/agents-plugin-cartesia | Patch |
| @livekit/agents-plugin-deepgram | Patch |
| @livekit/agents-plugin-elevenlabs | Patch |
| @livekit/agents-plugin-google | Patch |
| @livekit/agents-plugin-inworld | Patch |
| @livekit/agents-plugin-neuphonic | Patch |
| @livekit/agents-plugin-resemble | Patch |
| @livekit/agents-plugin-rime | Patch |
| @livekit/agents-plugin-sarvam | Patch |
| @livekit/agents-plugin-xai | Patch |
| @livekit/agents | Patch |
| @livekit/agents-plugin-baseten | Patch |
| @livekit/agents-plugin-bey | Patch |
| @livekit/agents-plugin-hedra | Patch |
| @livekit/agents-plugin-lemonslice | Patch |
| @livekit/agents-plugin-livekit | Patch |
| @livekit/agents-plugin-phonic | Patch |
| @livekit/agents-plugin-silero | Patch |
| @livekit/agents-plugins-test | Patch |


@toubatbrian toubatbrian requested a review from a team March 3, 2026 01:47

@toubatbrian toubatbrian requested a review from lukasIO March 4, 2026 07:39
@davidzhao davidzhao requested a review from tinalenguyen March 9, 2026 18:04
tinalenguyen (Member) left a comment

left a small comment, also a few notes:

  • in Python we also implement the previous_response_id chaining logic for the HTTP client
  • we should add a check for store == false; in that case we always fall back to sending the entire chat context
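The store == false fallback described in the note above could be sketched as follows. The `TurnPlan` shape and `planTurn` function are illustrative stand-ins, not the plugin's actual API:

```typescript
// Hedged sketch of the fallback rule: when store === false the server keeps
// no response cache, so previous_response_id chaining cannot work and the
// entire chat context must always be sent. Names here are assumptions.
interface TurnPlan {
  previousResponseId?: string;
  items: string[];
}

function planTurn(
  store: boolean,
  prevResponseId: string | undefined,
  fullContext: string[],
  newItems: string[],
): TurnPlan {
  if (!store || prevResponseId === undefined) {
    // No server-side cache to chain against: send everything.
    return { items: fullContext };
  }
  // Otherwise chain onto the previous response and send only the new items.
  return { previousResponseId: prevResponseId, items: newItems };
}
```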


tinalenguyen (Member) left a comment

lgtm, i tested with the change devin suggested to receive LLM metrics

Co-authored-by: devin-ai-integration[bot] <158243242+devin-ai-integration[bot]@users.noreply.github.com>
@toubatbrian toubatbrian merged commit 92244c6 into main Mar 16, 2026
7 of 8 checks passed
@toubatbrian toubatbrian deleted the brian/oai-response-ws branch March 16, 2026 23:11
devin-ai-integration bot (Contributor) left a comment

Devin Review found 3 new potential issues.

View 17 additional findings in Devin Review.


Comment on lines +50 to +100

```ts
constructor(ws: WebSocket) {
  this.#ws = ws;

  ws.on('message', (data: Buffer) => {
    const current = this.#outputQueue[0];
    if (!current) return;

    let raw: unknown;
    try {
      raw = JSON.parse(data.toString());
    } catch {
      return;
    }

    // Validate and type-narrow with Zod at write time so readers always
    // receive a fully-typed WsServerEvent.
    const parsed = wsServerEventSchema.safeParse(raw);
    if (!parsed.success) return;

    const event = parsed.data;
    void current.write(event);

    // Close and dequeue on any terminal event.
    if (
      event.type === 'response.completed' ||
      event.type === 'response.failed' ||
      event.type === 'error'
    ) {
      void current.close();
      this.#outputQueue.shift();
    }
  });

  ws.on('close', () => {
    // If the WebSocket closes while requests are still in flight, synthesise
    // a typed error event so all readers can handle it cleanly.
    for (const current of this.#outputQueue) {
      if (!current.closed) {
        const closeError: WsServerEvent = {
          type: 'error',
          error: {
            code: 'websocket_closed',
            message: 'OpenAI Responses WebSocket closed unexpectedly',
          },
        };
        void current.write(closeError).finally(() => current.close());
      }
    }
    this.#outputQueue = [];
  });
}
```

🔴 No persistent error handler on WebSocket after connection — unhandled error can crash process

The ResponsesWebSocket constructor (plugins/openai/src/ws/llm.ts:50-100) only registers message and close handlers on the WebSocket. The connectWs helper registers a once('error') handler (plugins/openai/src/ws/llm.ts:628-637) which catches the first post-connection error (no-op since settled=true) and is then removed. For long-lived connections (up to 60 minutes per WS_MAX_SESSION_DURATION), any subsequent WebSocket error event would have no registered listener. In Node.js, an error event emitted on an EventEmitter with no listeners throws, which can crash the process.
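The crash mode is easy to reproduce with a plain EventEmitter, independent of the plugin. A minimal Node.js sketch:

```typescript
import { EventEmitter } from 'node:events';

const em = new EventEmitter();

// With no 'error' listener registered, emit('error') throws synchronously in
// Node.js; on a long-lived WebSocket this surfaces as an uncaught exception
// that can take down the whole process.
let threw = false;
try {
  em.emit('error', new Error('boom'));
} catch {
  threw = true;
}

// A no-op handler is enough to make the same emit harmless.
em.on('error', () => {});
em.emit('error', new Error('boom again')); // handled, nothing thrown

console.log(threw); // true
```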

Fix: Add a no-op error handler in the ResponsesWebSocket constructor

Add ws.on('error', () => {}); (or a handler that logs the error) in the ResponsesWebSocket constructor so that all WebSocket error events are caught. The existing close handler already handles cleanup.

Suggested change

```diff
 constructor(ws: WebSocket) {
   this.#ws = ws;

+  // Prevent unhandled 'error' events from crashing the process.
+  // The 'close' handler below takes care of notifying in-flight readers.
+  ws.on('error', () => {});
+
   ws.on('message', (data: Buffer) => {
```

Comment on lines +376 to +385

```ts
await this.#pool.withConnection(async (conn: ResponsesWebSocket) => {
  const needsRetry = await this.#runWithConn(conn, this.chatCtx, this.#prevResponseId);

  if (needsRetry) {
    // previous_response_id was evicted from the server-side cache.
    // Retry once on the same connection with the full context and no ID.
    retryable = true;
    await this.#runWithConn(conn, this.#fullChatCtx, undefined);
  }
});
```

🟡 ConnectionPool.withConnection removes (closes) the WebSocket on every non-connection error including non-retryable API errors

In WSLLMStream.run() (plugins/openai/src/ws/llm.ts:376), the response round-trip is wrapped in this.#pool.withConnection(...). The ConnectionPool.withConnection implementation (agents/src/connection_pool.ts:287-289) calls this.remove(conn) on any error thrown from the callback. This means that non-transient errors like APIStatusError with retryable: false (thrown by #handleResponseFailed at plugins/openai/src/ws/llm.ts:594 or the general error branch at plugins/openai/src/ws/llm.ts:531) will close and discard the healthy WebSocket connection. Each subsequent chat() call will then pay the cost of establishing a new WebSocket connection, negating the core benefit of the WebSocket pooling approach for scenarios like consecutive invalid-request errors.

Prompt for agents
In plugins/openai/src/ws/llm.ts, the run() method at line 372 uses this.#pool.withConnection() which always removes the connection on error (see agents/src/connection_pool.ts:287-289). For non-transient errors (e.g. APIStatusError with retryable: false from #handleResponseFailed), the WebSocket is still healthy and should be returned to the pool instead of being closed. Consider restructuring to use pool.get()/pool.put() manually instead of withConnection(), so that only connection-related errors (APIConnectionError) cause the connection to be removed, while application-level errors allow the connection to be returned to the pool via pool.put().
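The restructuring described in the prompt above might look like the following sketch. The `Pool` interface and `APIConnectionError` class here are illustrative stand-ins, not the actual ConnectionPool API:

```typescript
// Hedged sketch: only transport-level errors discard the connection;
// application-level API errors return the still-healthy socket to the pool.
class APIConnectionError extends Error {}

interface Pool<T> {
  get(): Promise<T>;
  put(conn: T): void; // return a healthy connection to the pool
  remove(conn: T): void; // close and discard a broken connection
}

async function runWithPooledConnection<T>(
  pool: Pool<T>,
  fn: (conn: T) => Promise<void>,
): Promise<void> {
  const conn = await pool.get();
  try {
    await fn(conn);
    pool.put(conn);
  } catch (err) {
    if (err instanceof APIConnectionError) {
      // Transport-level failure: the socket is unusable, discard it.
      pool.remove(conn);
    } else {
      // Application-level API error (e.g. invalid request): the socket is
      // still healthy, so keep it pooled for the next chat() call.
      pool.put(conn);
    }
    throw err;
  }
}
```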

Comment on lines +540 to +542

```ts
#handleResponseCreated(event: WsResponseCreatedEvent): void {
  this.#responseId = event.response.id;
  this.#llm._onResponseCreated(event.response.id, this.#fullChatCtx);
```

🟡 _onResponseCreated stores stale prevResponseId when response fails, leaving #pendingToolCalls from a different response

_onResponseCreated at plugins/openai/src/ws/llm.ts:540-542 updates #prevResponseId and #prevChatCtx as soon as response.created arrives, before the response completes. _setPendingToolCalls is only called in #handleResponseCompleted (plugins/openai/src/ws/llm.ts:577). If the response subsequently fails via response.failed, #prevResponseId is set to the failed response's ID, but #pendingToolCalls still holds stale data from a prior completed response. On the next chat() call (plugins/openai/src/ws/llm.ts:273-296), the diff logic uses #prevResponseId (pointing to the failed response) combined with #pendingToolCalls (from a different, earlier response). The #pendingToolCallsCompleted check at line 288 may incorrectly evaluate, either forcing an unnecessary fallback or (worse) approving an incremental send that omits required tool-call outputs. The retry path handles the former, but the latter could produce a server error requiring an extra round-trip.

Prompt for agents
In plugins/openai/src/ws/llm.ts, the _onResponseCreated (called at line 542) and _setPendingToolCalls (called at line 577) update WSLLM state at different points in the response lifecycle. If a response fails (response.failed), _onResponseCreated has already fired but _setPendingToolCalls has not, leaving stale pendingToolCalls from a prior response paired with the failed response's prevResponseId. To fix: either (1) clear #pendingToolCalls in _onResponseCreated (line 219-222 of WSLLM) so that each new response starts fresh, or (2) reset #prevResponseId and #prevChatCtx in #handleResponseFailed so the next turn does not attempt incremental optimization with a failed response ID.
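Option (2) from the prompt above can be sketched as follows; the class and field names are illustrative rather than the plugin's actual internals:

```typescript
// Hedged sketch: reset chaining state on response.failed so the next turn
// cannot pair a failed response ID with stale pending tool calls.
class ChainState {
  prevResponseId: string | undefined;
  pendingToolCalls: string[] = [];

  onResponseCreated(responseId: string): void {
    // Fires early in the lifecycle, before completion or failure is known.
    this.prevResponseId = responseId;
  }

  onResponseCompleted(toolCallIds: string[]): void {
    this.pendingToolCalls = toolCallIds;
  }

  onResponseFailed(): void {
    // Drop the failed ID and stale bookkeeping; the next chat() then falls
    // back to sending the full context instead of chaining incrementally.
    this.prevResponseId = undefined;
    this.pendingToolCalls = [];
  }

  canChain(): boolean {
    return this.prevResponseId !== undefined;
  }
}
```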

github-actions bot mentioned this pull request Mar 17, 2026