diff --git a/sdks/typescript/packages/client/src/apply/__tests__/default.text-message.test.ts b/sdks/typescript/packages/client/src/apply/__tests__/default.text-message.test.ts index 581f418df..cd2b156e0 100644 --- a/sdks/typescript/packages/client/src/apply/__tests__/default.text-message.test.ts +++ b/sdks/typescript/packages/client/src/apply/__tests__/default.text-message.test.ts @@ -1,6 +1,6 @@ import { Subject } from "rxjs"; import { toArray } from "rxjs/operators"; -import { firstValueFrom } from "rxjs"; +import { firstValueFrom, of } from "rxjs"; import { BaseEvent, EventType, @@ -9,6 +9,7 @@ import { TextMessageContentEvent, TextMessageEndEvent, RunAgentInput, + RunFinishedEvent, } from "@ag-ui/core"; import { defaultApplyEvents } from "../default"; import { AbstractAgent } from "@/agent"; @@ -188,4 +189,59 @@ describe("defaultApplyEvents with text messages", () => { // Verify no additional updates after either TEXT_MESSAGE_END expect(stateUpdates.length).toBe(4); }); + + it("should emit a messages snapshot when the run finishes", async () => { + const initialState: RunAgentInput = { + messages: [], + state: {}, + threadId: "test-thread", + runId: "test-run", + tools: [], + context: [], + }; + + const subscriber = { + onMessagesSnapshotEvent: jest.fn(), + onEvent: jest.fn(), + }; + + const events: BaseEvent[] = [ + { + type: EventType.TEXT_MESSAGE_START, + messageId: "msg-1", + role: "assistant", + } as TextMessageStartEvent, + { + type: EventType.TEXT_MESSAGE_CONTENT, + messageId: "msg-1", + delta: "Hello world!", + } as TextMessageContentEvent, + { + type: EventType.TEXT_MESSAGE_END, + messageId: "msg-1", + } as TextMessageEndEvent, + { + type: EventType.RUN_FINISHED, + threadId: "test-thread", + runId: "test-run", + } as RunFinishedEvent, + ]; + + const result$ = defaultApplyEvents(initialState, of(...events), FAKE_AGENT, [subscriber as any]); + const mutations = await firstValueFrom(result$.pipe(toArray())); + + expect(subscriber.onMessagesSnapshotEvent).toHaveBeenCalledTimes(1); + const snapshotArgs = subscriber.onMessagesSnapshotEvent.mock.calls[0][0]; + expect(snapshotArgs.event.type).toBe(EventType.MESSAGES_SNAPSHOT); + expect(snapshotArgs.event.messages).toHaveLength(1); + expect(snapshotArgs.event.messages?.[0]?.content).toBe("Hello world!"); + + const eventTypes = subscriber.onEvent.mock.calls.map((call) => call[0].event.type); + expect(eventTypes[eventTypes.length - 2]).toBe(EventType.MESSAGES_SNAPSHOT); + expect(eventTypes[eventTypes.length - 1]).toBe(EventType.RUN_FINISHED); + + const finalMutation = mutations[mutations.length - 1]; + expect(finalMutation.messages).toHaveLength(1); + expect(finalMutation.messages?.[0]?.content).toBe("Hello world!"); + }); }); diff --git a/sdks/typescript/packages/client/src/apply/default.ts b/sdks/typescript/packages/client/src/apply/default.ts index 8f720c7a0..c7a566165 100644 --- a/sdks/typescript/packages/client/src/apply/default.ts +++ b/sdks/typescript/packages/client/src/apply/default.ts @@ -69,8 +69,59 @@ export const defaultApplyEvents = ( return EMPTY; }; + const emitMessagesSnapshot = async () => { + const snapshotEvent: MessagesSnapshotEvent = { + type: EventType.MESSAGES_SNAPSHOT, + messages: structuredClone_(messages), + }; + + const snapshotOnEventMutation = await runSubscribersWithMutation( + subscribers, + messages, + state, + (subscriber, messages, state) => + subscriber.onEvent?.({ + event: snapshotEvent, + agent, + input, + messages, + state, + }), + ); + applyMutation(snapshotOnEventMutation); + + if (snapshotOnEventMutation.stopPropagation === true) { + return; + } + + snapshotEvent.messages = structuredClone_(messages); + + const snapshotMutation = await runSubscribersWithMutation( + subscribers, + messages, + state, + (subscriber, messages, state) => + subscriber.onMessagesSnapshotEvent?.({ + event: snapshotEvent, + messages, + state, + agent, + input, + }), + ); + applyMutation(snapshotMutation); + + if (snapshotMutation.stopPropagation !== true && snapshotMutation.messages === undefined) { + applyMutation({ messages: structuredClone_(messages) }); + } + }; + return events$.pipe( concatMap(async (event) => { + if (event.type === EventType.RUN_FINISHED) { + await emitMessagesSnapshot(); + } + const mutation = await runSubscribersWithMutation( subscribers, messages,