diff --git a/openai-agents/README.md b/openai-agents/README.md index 0adea03a..b504fce9 100644 --- a/openai-agents/README.md +++ b/openai-agents/README.md @@ -33,6 +33,7 @@ Each scenario's README describes how to start its Worker and run its scenarios b | [`multi-agent`](./src/multi-agent) | A planner agent fans out concurrent web searches and a writer agent synthesizes a final report. | | [`stateful-conversation`](./src/stateful-conversation) | A long-running, multi-turn Workflow driven by Updates and Queries, with triage handoffs and `continueAsNew` to bound history. | | [`nexus-tools`](./src/nexus-tools) | Expose a [Nexus](https://docs.temporal.io/nexus) Operation as an agent tool with `nexusOperationAsTool`. | +| [`streaming`](./src/streaming) | Run an agent in streaming mode over a Workflow Stream and subscribe to the model stream events live from an external client. | ## Feature support @@ -58,5 +59,5 @@ Any OpenAI Agents SDK `ModelProvider` can drive the model Activity. The provider | Child Workflows | Supported | Plugin config propagates to children | | Local Activities | Supported | Set `useLocalActivity: true` in `modelParams` | | Model override per run | Supported | `runConfig.model` accepts a string model name | -| Streaming | Not supported | Use `runner.run()` | +| Streaming | Supported (experimental) | `run(agent, input, { stream: true })` over a Workflow Stream; see [`streaming`](./src/streaming) | | Voice agents | Not supported | | diff --git a/openai-agents/package.json b/openai-agents/package.json index 05117461..6d8de08a 100644 --- a/openai-agents/package.json +++ b/openai-agents/package.json @@ -16,6 +16,7 @@ "@temporalio/nexus": "^1.18.0", "@temporalio/openai-agents": "^1.18.0", "@temporalio/worker": "^1.18.0", + "@temporalio/workflow-streams": "^1.18.0", "@temporalio/workflow": "^1.18.0", "@openai/agents-core": "^0.11.6", "@openai/agents-openai": "^0.11.6", diff --git a/openai-agents/src/streaming/README.md b/openai-agents/src/streaming/README.md new file mode 100644 index 00000000..c518ad56 --- /dev/null +++ b/openai-agents/src/streaming/README.md @@ -0,0 +1,36 @@ +# OpenAI Agents: Streaming + +Demonstrates the streaming API of the Temporal OpenAI Agents integration. The Workflow hosts a +[Workflow Stream](https://github.com/temporalio/sdk-typescript/tree/main/contrib/workflow-streams) +and runs an agent in streaming mode with `runner.run(agent, input, { stream: true })`. As the model +responds, the streaming model Activity publishes each raw model stream event to the Workflow Stream +topic, and an external client subscribes to that topic to print the deltas live. + +The Workflow (`src/streaming/workflows.ts`) constructs `new WorkflowStream()` at the top, then +iterates the `StreamedRunResult` to drive the run to completion and returns the final text. The +Worker configures `modelParams.streamingTopic` (and a `streamingBatchInterval`) on the +`OpenAIAgentsPlugin` so the streaming Activity knows which topic to publish to. The client +(`src/streaming/client.ts`) subscribes from outside the Workflow with +`WorkflowStreamClient.create(client, workflowId).topic(streamingTopic).subscribe()`. + +## Run + +Run these from the `openai-agents/` root (run `npm install` there once first). + +```bash +# In one terminal, start the Worker (requires a local Temporal server and OPENAI_API_KEY): +OPENAI_API_KEY=sk-... npx ts-node src/streaming/worker.ts + +# In another terminal, start the streaming client: +npx ts-node src/streaming/client.ts +``` + +## Test + +```bash +npx mocha --exit --require ts-node/register --require source-map-support/register "src/streaming/mocha/*.test.ts" +``` + +The test runs a real Worker against `TestWorkflowEnvironment` with a scripted streaming fake model, +so no `OPENAI_API_KEY` is required. It asserts that an external `WorkflowStreamClient` subscriber +receives the streamed events in order and that the Workflow completes with the final text. diff --git a/openai-agents/src/streaming/client.ts b/openai-agents/src/streaming/client.ts new file mode 100644 index 00000000..737d3ded --- /dev/null +++ b/openai-agents/src/streaming/client.ts @@ -0,0 +1,58 @@ +import { Connection, Client } from '@temporalio/client'; +import { OpenAIAgentsPlugin } from '@temporalio/openai-agents'; +import { OpenAIProvider } from '@openai/agents-openai'; +import { WorkflowStreamClient } from '@temporalio/workflow-streams/client'; +import { streamingChat, streamingTopic } from './workflows'; +import { nanoid } from 'nanoid'; + +interface ModelStreamEvent { + type?: string; + delta?: string; +} + +async function run() { + const apiKey = process.env.OPENAI_API_KEY; + if (!apiKey) { + throw new Error('OPENAI_API_KEY environment variable is required'); + } + + const connection = await Connection.connect(); + const client = new Client({ + connection, + plugins: [ + new OpenAIAgentsPlugin({ + modelProvider: new OpenAIProvider({ apiKey }), + modelParams: { streamingTopic }, + }), + ], + }); + + const taskQueue = 'openai-agents-streaming'; + const workflowId = 'openai-agents-streaming-' + nanoid(); + + const handle = await client.workflow.start(streamingChat, { + taskQueue, + workflowId, + args: ['Write a short poem about durable execution.'], + }); + console.log(`Started workflow ${handle.workflowId}`); + + const streamClient = WorkflowStreamClient.create(client, workflowId); + const subscriber = (async () => { + for await (const item of streamClient.topic(streamingTopic).subscribe()) { + if (item.data.type === 'output_text_delta' && item.data.delta) { + process.stdout.write(item.data.delta); + } + } + })(); + + const result = await handle.result(); + await subscriber; + console.log('\n---'); + console.log(result); +} + +run().catch((err) => { + console.error(err); + process.exit(1); +}); diff --git a/openai-agents/src/streaming/mocha/fake-model.ts b/openai-agents/src/streaming/mocha/fake-model.ts new file mode 100644 index 00000000..a3918505 --- /dev/null +++ b/openai-agents/src/streaming/mocha/fake-model.ts @@ -0,0 +1,53 @@ +import { + type AgentOutputItem, + type Model, + type ModelProvider, + type ModelRequest, + type ModelResponse, + type StreamEvent, +} from '@openai/agents-core'; + +export function streamingTextEvents(text: string): StreamEvent[] { + const output: AgentOutputItem[] = [ + { + type: 'message', + id: 'msg_fake_stream_001', + role: 'assistant', + content: [{ type: 'output_text', text }], + status: 'completed', + }, + ]; + return [ + { type: 'output_text_delta', delta: text }, + { + type: 'response_done', + response: { + id: 'resp_fake_stream_001', + usage: { requests: 1, inputTokens: 10, outputTokens: text.length, totalTokens: 10 + text.length }, + output, + }, + }, + ] as StreamEvent[]; +} + +export class StreamingFakeModel implements Model { + constructor(private readonly events: StreamEvent[]) {} + async getResponse(_request: ModelRequest): Promise { + throw new Error('StreamingFakeModel only supports getStreamedResponse'); + } + async *getStreamedResponse(_request: ModelRequest): AsyncIterable { + for (const event of this.events) { + yield event; + } + } +} + +export class StreamingFakeModelProvider implements ModelProvider { + private readonly model: StreamingFakeModel; + constructor(events: StreamEvent[]) { + this.model = new StreamingFakeModel(events); + } + getModel(_name?: string): Model { + return this.model; + } +} diff --git a/openai-agents/src/streaming/mocha/workflows.test.ts b/openai-agents/src/streaming/mocha/workflows.test.ts new file mode 100644 index 00000000..3148f02f --- /dev/null +++ b/openai-agents/src/streaming/mocha/workflows.test.ts @@ -0,0 +1,100 @@ +import { TestWorkflowEnvironment } from '@temporalio/testing'; +import { after, before, describe, it } from 'mocha'; +import { Worker } from '@temporalio/worker'; +import { Client } from '@temporalio/client'; +import { OpenAIAgentsPlugin } from '@temporalio/openai-agents'; +import { WorkflowStreamClient } from '@temporalio/workflow-streams/client'; +import assert from 'assert'; +import { StreamingFakeModelProvider, streamingTextEvents } from './fake-model'; +import { streamingChat, streamingTopic } from '../workflows'; + +interface ModelStreamEvent { + type?: string; + delta?: string; +} + +describe('openai-agents/streaming workflow scenarios', function () { + this.timeout(30_000); + + let testEnv: TestWorkflowEnvironment; + + before(async () => { + testEnv = await TestWorkflowEnvironment.createLocal(); + }); + + after(async () => { + await testEnv?.teardown(); + }); + + it('streamingChat: external subscriber receives the streamed events in order', async () => { + const taskQueue = 'test-streaming'; + const events = streamingTextEvents('Hello streamed world'); + + const worker = await Worker.create({ + connection: testEnv.nativeConnection, + taskQueue, + workflowsPath: require.resolve('../workflows'), + plugins: [ + new OpenAIAgentsPlugin({ + modelProvider: new StreamingFakeModelProvider(events), + modelParams: { streamingTopic, streamingBatchInterval: '50 milliseconds' }, + }), + ], + bundlerOptions: { + webpackConfigHook: (config) => ({ + ...config, + resolve: { + ...config.resolve, + conditionNames: ['require', 'browser', 'default'], + }, + }), + }, + }); + + // The client carries streamingTopic to the Workflow via the config header. + const client = new Client({ + connection: testEnv.connection, + plugins: [ + new OpenAIAgentsPlugin({ + modelProvider: new StreamingFakeModelProvider(events), + modelParams: { streamingTopic }, + }), + ], + }); + + const workflowId = 'test-streaming-' + Date.now(); + const result = await worker.runUntil(async () => { + const handle = await client.workflow.start(streamingChat, { + taskQueue, + workflowId, + args: ['Hi'], + }); + + const received: ModelStreamEvent[] = []; + const streamClient = WorkflowStreamClient.create(client, workflowId); + const gen = streamClient.topic(streamingTopic).subscribe(0, { pollCooldown: 0 }); + const collect = (async () => { + for await (const item of gen) { + received.push(item.data); + if (received.length >= events.length) { + await gen.return(); + break; + } + } + })(); + + const finalOutput = await handle.result(); + await collect; + + assert.strictEqual(received.length, events.length); + assert.deepStrictEqual( + received.map((e) => e.type), + events.map((e) => e.type), + ); + assert.strictEqual(received[0]!.delta, 'Hello streamed world'); + return finalOutput; + }); + + assert.strictEqual(result, 'Hello streamed world'); + }); +}); diff --git a/openai-agents/src/streaming/worker.ts b/openai-agents/src/streaming/worker.ts new file mode 100644 index 00000000..98c7220b --- /dev/null +++ b/openai-agents/src/streaming/worker.ts @@ -0,0 +1,43 @@ +import { NativeConnection, Worker } from '@temporalio/worker'; +import { OpenAIAgentsPlugin } from '@temporalio/openai-agents'; +import { OpenAIProvider } from '@openai/agents-openai'; +import { streamingTopic } from './workflows'; + +async function run() { + const apiKey = process.env.OPENAI_API_KEY; + if (!apiKey) { + throw new Error('OPENAI_API_KEY environment variable is required'); + } + + const connection = await NativeConnection.connect({ address: 'localhost:7233' }); + try { + const worker = await Worker.create({ + connection, + taskQueue: 'openai-agents-streaming', + workflowsPath: require.resolve('./workflows'), + plugins: [ + new OpenAIAgentsPlugin({ + modelProvider: new OpenAIProvider({ apiKey }), + modelParams: { streamingTopic, streamingBatchInterval: '200 milliseconds' }, + }), + ], + bundlerOptions: { + webpackConfigHook: (config) => ({ + ...config, + resolve: { + ...config.resolve, + conditionNames: ['require', 'browser', 'default'], + }, + }), + }, + }); + await worker.run(); + } finally { + await connection.close(); + } +} + +run().catch((err) => { + console.error(err); + process.exit(1); +}); diff --git a/openai-agents/src/streaming/workflows.ts b/openai-agents/src/streaming/workflows.ts new file mode 100644 index 00000000..f796582b --- /dev/null +++ b/openai-agents/src/streaming/workflows.ts @@ -0,0 +1,17 @@ +import { Agent } from '@openai/agents-core'; +import { TemporalOpenAIRunner } from '@temporalio/openai-agents/workflow'; +import { WorkflowStream } from '@temporalio/workflow-streams/workflow'; + +export const streamingTopic = 'model-stream'; + +// @@@SNIPSTART typescript-openai-agents-streaming-workflow +export async function streamingChat(prompt: string): Promise { + new WorkflowStream(); + const agent = new Agent({ name: 'StreamingAgent', instructions: 'You are a helpful assistant.' }); + const result = await new TemporalOpenAIRunner().run(agent, prompt, { stream: true }); + // The external client is the event consumer; the Workflow only drives the run to completion. + for await (const _event of result); + await result.completed; + return result.finalOutput ?? ''; +} +// @@@SNIPEND