Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion openai-agents/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ Each scenario's README describes how to start its Worker and run its scenarios b
| [`multi-agent`](./src/multi-agent) | A planner agent fans out concurrent web searches and a writer agent synthesizes a final report. |
| [`stateful-conversation`](./src/stateful-conversation) | A long-running, multi-turn Workflow driven by Updates and Queries, with triage handoffs and `continueAsNew` to bound history. |
| [`nexus-tools`](./src/nexus-tools) | Expose a [Nexus](https://docs.temporal.io/nexus) Operation as an agent tool with `nexusOperationAsTool`. |
| [`streaming`](./src/streaming) | Run an agent in streaming mode over a Workflow Stream and subscribe to the model stream events live from an external client. |

## Feature support

Expand All @@ -58,5 +59,5 @@ Any OpenAI Agents SDK `ModelProvider` can drive the model Activity. The provider
| Child Workflows | Supported | Plugin config propagates to children |
| Local Activities | Supported | Set `useLocalActivity: true` in `modelParams` |
| Model override per run | Supported | `runConfig.model` accepts a string model name |
| Streaming | Not supported | Use `runner.run()` |
| Streaming | Supported (experimental) | `run(agent, input, { stream: true })` over a Workflow Stream; see [`streaming`](./src/streaming) |
| Voice agents | Not supported | |
1 change: 1 addition & 0 deletions openai-agents/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"@temporalio/nexus": "^1.18.0",
"@temporalio/openai-agents": "^1.18.0",
"@temporalio/worker": "^1.18.0",
"@temporalio/workflow-streams": "^1.18.0",
"@temporalio/workflow": "^1.18.0",
"@openai/agents-core": "^0.11.6",
"@openai/agents-openai": "^0.11.6",
Expand Down
36 changes: 36 additions & 0 deletions openai-agents/src/streaming/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# OpenAI Agents: Streaming

Demonstrates the streaming API of the Temporal OpenAI Agents integration. The Workflow hosts a
[Workflow Stream](https://github.com/temporalio/sdk-typescript/tree/main/contrib/workflow-streams)
and runs an agent in streaming mode with `runner.run(agent, input, { stream: true })`. As the model
responds, the streaming model Activity publishes each raw model stream event to the Workflow Stream
topic, and an external client subscribes to that topic to print the deltas live.

The Workflow (`src/streaming/workflows.ts`) constructs `new WorkflowStream()` at the top, then
iterates the `StreamedRunResult` to drive the run to completion and returns the final text. The
Worker configures `modelParams.streamingTopic` (and a `streamingBatchInterval`) on the
`OpenAIAgentsPlugin` so the streaming Activity knows which topic to publish to. The client
(`src/streaming/client.ts`) subscribes from outside the Workflow with
`WorkflowStreamClient.create(client, workflowId).topic(streamingTopic).subscribe()`.

## Run

Run these from the `openai-agents/` root (run `npm install` there once first).

```bash
# In one terminal, start the Worker (requires a local Temporal server and OPENAI_API_KEY):
OPENAI_API_KEY=sk-... npx ts-node src/streaming/worker.ts

# In another terminal, start the streaming client:
npx ts-node src/streaming/client.ts

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The client needs OPENAI_API_KEY too (client.ts: 14-17), prefix with OPENAI_API_KEY=sk-... ?

```

## Test

```bash
npx mocha --exit --require ts-node/register --require source-map-support/register "src/streaming/mocha/*.test.ts"
```

The test runs a real Worker against `TestWorkflowEnvironment` with a scripted streaming fake model,
so no `OPENAI_API_KEY` is required. It asserts that an external `WorkflowStreamClient` subscriber
receives the streamed events in order and that the Workflow completes with the final text.
58 changes: 58 additions & 0 deletions openai-agents/src/streaming/client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import { Connection, Client } from '@temporalio/client';
import { OpenAIAgentsPlugin } from '@temporalio/openai-agents';
import { OpenAIProvider } from '@openai/agents-openai';
import { WorkflowStreamClient } from '@temporalio/workflow-streams/client';
import { streamingChat, streamingTopic } from './workflows';
import { nanoid } from 'nanoid';

interface ModelStreamEvent {
type?: string;
delta?: string;
}

async function run() {
const apiKey = process.env.OPENAI_API_KEY;
if (!apiKey) {
throw new Error('OPENAI_API_KEY environment variable is required');
}

const connection = await Connection.connect();
const client = new Client({
connection,
plugins: [
new OpenAIAgentsPlugin({
modelProvider: new OpenAIProvider({ apiKey }),
modelParams: { streamingTopic },
}),
],
});

const taskQueue = 'openai-agents-streaming';
const workflowId = 'openai-agents-streaming-' + nanoid();

const handle = await client.workflow.start(streamingChat, {
taskQueue,
workflowId,
args: ['Write a short poem about durable execution.'],
});
console.log(`Started workflow ${handle.workflowId}`);

const streamClient = WorkflowStreamClient.create(client, workflowId);
const subscriber = (async () => {
for await (const item of streamClient.topic<ModelStreamEvent>(streamingTopic).subscribe()) {
if (item.data.type === 'output_text_delta' && item.data.delta) {
process.stdout.write(item.data.delta);
}
}
})();

const result = await handle.result();
await subscriber;
console.log('\n---');
console.log(result);
}

run().catch((err) => {
console.error(err);
process.exit(1);
});
53 changes: 53 additions & 0 deletions openai-agents/src/streaming/mocha/fake-model.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import {
type AgentOutputItem,
type Model,
type ModelProvider,
type ModelRequest,
type ModelResponse,
type StreamEvent,
} from '@openai/agents-core';

export function streamingTextEvents(text: string): StreamEvent[] {
const output: AgentOutputItem[] = [
{
type: 'message',
id: 'msg_fake_stream_001',
role: 'assistant',
content: [{ type: 'output_text', text }],
status: 'completed',
},
];
return [
{ type: 'output_text_delta', delta: text },
{
type: 'response_done',
response: {
id: 'resp_fake_stream_001',
usage: { requests: 1, inputTokens: 10, outputTokens: text.length, totalTokens: 10 + text.length },
output,
},
},
] as StreamEvent[];
}

export class StreamingFakeModel implements Model {
constructor(private readonly events: StreamEvent[]) {}
async getResponse(_request: ModelRequest): Promise<ModelResponse> {
throw new Error('StreamingFakeModel only supports getStreamedResponse');
}
async *getStreamedResponse(_request: ModelRequest): AsyncIterable<StreamEvent> {
for (const event of this.events) {
yield event;
}
}
}

export class StreamingFakeModelProvider implements ModelProvider {
private readonly model: StreamingFakeModel;
constructor(events: StreamEvent[]) {
this.model = new StreamingFakeModel(events);
}
getModel(_name?: string): Model {
return this.model;
}
}
100 changes: 100 additions & 0 deletions openai-agents/src/streaming/mocha/workflows.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import { TestWorkflowEnvironment } from '@temporalio/testing';
import { after, before, describe, it } from 'mocha';
import { Worker } from '@temporalio/worker';
import { Client } from '@temporalio/client';
import { OpenAIAgentsPlugin } from '@temporalio/openai-agents';
import { WorkflowStreamClient } from '@temporalio/workflow-streams/client';
import assert from 'assert';
import { StreamingFakeModelProvider, streamingTextEvents } from './fake-model';
import { streamingChat, streamingTopic } from '../workflows';

interface ModelStreamEvent {
type?: string;
delta?: string;
}

describe('openai-agents/streaming workflow scenarios', function () {
this.timeout(30_000);

let testEnv: TestWorkflowEnvironment;

before(async () => {
testEnv = await TestWorkflowEnvironment.createLocal();
});

after(async () => {
await testEnv?.teardown();
});

it('streamingChat: external subscriber receives the streamed events in order', async () => {
const taskQueue = 'test-streaming';
const events = streamingTextEvents('Hello streamed world');

const worker = await Worker.create({
connection: testEnv.nativeConnection,
taskQueue,
workflowsPath: require.resolve('../workflows'),
plugins: [
new OpenAIAgentsPlugin({
modelProvider: new StreamingFakeModelProvider(events),
modelParams: { streamingTopic, streamingBatchInterval: '50 milliseconds' },
}),
],
bundlerOptions: {
webpackConfigHook: (config) => ({
...config,
resolve: {
...config.resolve,
conditionNames: ['require', 'browser', 'default'],
},
}),
},
});

// The client carries streamingTopic to the Workflow via the config header.
const client = new Client({
connection: testEnv.connection,
plugins: [
new OpenAIAgentsPlugin({
modelProvider: new StreamingFakeModelProvider(events),
modelParams: { streamingTopic },
}),
],
});

const workflowId = 'test-streaming-' + Date.now();
const result = await worker.runUntil(async () => {
const handle = await client.workflow.start(streamingChat, {
taskQueue,
workflowId,
args: ['Hi'],
});

const received: ModelStreamEvent[] = [];
const streamClient = WorkflowStreamClient.create(client, workflowId);
const gen = streamClient.topic<ModelStreamEvent>(streamingTopic).subscribe(0, { pollCooldown: 0 });
const collect = (async () => {
for await (const item of gen) {
received.push(item.data);
if (received.length >= events.length) {
await gen.return();
break;
}
}
})();

const finalOutput = await handle.result();
await collect;

assert.strictEqual(received.length, events.length);
assert.deepStrictEqual(
received.map((e) => e.type),
events.map((e) => e.type),
);
assert.strictEqual(received[0]!.delta, 'Hello streamed world');
return finalOutput;
});

assert.strictEqual(result, 'Hello streamed world');
});
});
43 changes: 43 additions & 0 deletions openai-agents/src/streaming/worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { NativeConnection, Worker } from '@temporalio/worker';
import { OpenAIAgentsPlugin } from '@temporalio/openai-agents';
import { OpenAIProvider } from '@openai/agents-openai';
import { streamingTopic } from './workflows';

async function run() {
const apiKey = process.env.OPENAI_API_KEY;
if (!apiKey) {
throw new Error('OPENAI_API_KEY environment variable is required');
}

const connection = await NativeConnection.connect({ address: 'localhost:7233' });
try {
const worker = await Worker.create({
connection,
taskQueue: 'openai-agents-streaming',
workflowsPath: require.resolve('./workflows'),
plugins: [
new OpenAIAgentsPlugin({
modelProvider: new OpenAIProvider({ apiKey }),
modelParams: { streamingTopic, streamingBatchInterval: '200 milliseconds' },

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The worker is setting streamingBatchInterval here but the client just passes { streamingTopic } - I think that means the interval doesn't actually get set (worker-side plugin modelParams don't enter the sandbox?). Can you double check the examples for this?

}),
],
bundlerOptions: {
webpackConfigHook: (config) => ({
...config,
resolve: {
...config.resolve,
conditionNames: ['require', 'browser', 'default'],
},
}),
},
});
await worker.run();
} finally {
await connection.close();
}
}

run().catch((err) => {
console.error(err);
process.exit(1);
});
17 changes: 17 additions & 0 deletions openai-agents/src/streaming/workflows.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { Agent } from '@openai/agents-core';
import { TemporalOpenAIRunner } from '@temporalio/openai-agents/workflow';
import { WorkflowStream } from '@temporalio/workflow-streams/workflow';

export const streamingTopic = 'model-stream';

// @@@SNIPSTART typescript-openai-agents-streaming-workflow
export async function streamingChat(prompt: string): Promise<string> {
new WorkflowStream();
const agent = new Agent({ name: 'StreamingAgent', instructions: 'You are a helpful assistant.' });
const result = await new TemporalOpenAIRunner().run(agent, prompt, { stream: true });
// The external client is the event consumer; the Workflow only drives the run to completion.
for await (const _event of result);
await result.completed;
return result.finalOutput ?? '';
}
// @@@SNIPEND
Loading