Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 73 additions & 20 deletions packages/typescript/ai-client/src/chat-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ export class ChatClient {
private currentStreamId: string | null = null
private currentMessageId: string | null = null
private postStreamActions: Array<() => Promise<void>> = []
// Track pending client tool executions to await them before stream finalization
private pendingToolExecutions: Map<string, Promise<void>> = new Map()
// Flag to deduplicate continuation checks during action draining
private continuationPending = false

private callbacksRef: {
current: {
Expand Down Expand Up @@ -127,31 +131,41 @@ export class ChatClient {
)
}
},
onToolCall: async (args: {
onToolCall: (args: {
toolCallId: string
toolName: string
input: any
}) => {
// Handle client-side tool execution automatically
const clientTool = this.clientToolsRef.current.get(args.toolName)
if (clientTool?.execute) {
try {
const output = await clientTool.execute(args.input)
await this.addToolResult({
toolCallId: args.toolCallId,
tool: args.toolName,
output,
state: 'output-available',
})
} catch (error: any) {
await this.addToolResult({
toolCallId: args.toolCallId,
tool: args.toolName,
output: null,
state: 'output-error',
errorText: error.message,
})
}
const executeFunc = clientTool?.execute
if (executeFunc) {
// Create and track the execution promise
const executionPromise = (async () => {
try {
const output = await executeFunc(args.input)
await this.addToolResult({
toolCallId: args.toolCallId,
tool: args.toolName,
output,
state: 'output-available',
})
} catch (error: any) {
await this.addToolResult({
toolCallId: args.toolCallId,
tool: args.toolName,
output: null,
state: 'output-error',
errorText: error.message,
})
} finally {
// Remove from pending when complete
this.pendingToolExecutions.delete(args.toolCallId)
}
})()

// Track the pending execution
this.pendingToolExecutions.set(args.toolCallId, executionPromise)
}
},
onApprovalRequest: (args: {
Expand Down Expand Up @@ -221,6 +235,12 @@ export class ChatClient {
await new Promise((resolve) => setTimeout(resolve, 0))
}

// Wait for all pending tool executions to complete before finalizing
// This ensures client tools finish before we check for continuation
if (this.pendingToolExecutions.size > 0) {
await Promise.all(this.pendingToolExecutions.values())
}

// Finalize the stream
this.processor.finalizeStream()

Expand Down Expand Up @@ -288,9 +308,17 @@ export class ChatClient {
* Stream a response from the LLM
*/
private async streamResponse(): Promise<void> {
// Guard against concurrent streams - if already loading, skip
if (this.isLoading) {
return
}

this.setIsLoading(true)
this.setError(undefined)
this.abortController = new AbortController()
// Reset pending tool executions for the new stream
this.pendingToolExecutions.clear()
let streamCompletedSuccessfully = false

try {
// Get model messages for the LLM
Expand All @@ -313,6 +341,7 @@ export class ChatClient {
)

await this.processStream(stream)
streamCompletedSuccessfully = true
} catch (err) {
if (err instanceof Error) {
if (err.name === 'AbortError') {
Expand All @@ -327,6 +356,20 @@ export class ChatClient {

// Drain any actions that were queued while the stream was in progress
await this.drainPostStreamActions()

// Continue conversation if the stream ended with a tool result (server tool completed)
if (streamCompletedSuccessfully) {
const messages = this.processor.getMessages()
const lastPart = messages.at(-1)?.parts.at(-1)

if (lastPart?.type === 'tool-result' && this.shouldAutoSend()) {
try {
await this.checkForContinuation()
} catch (error) {
console.error('Failed to continue flow after tool result:', error)
}
}
}
}
}

Expand Down Expand Up @@ -470,8 +513,18 @@ export class ChatClient {
* Check if we should continue the flow and do so if needed
*/
private async checkForContinuation(): Promise<void> {
// Prevent duplicate continuation attempts
if (this.continuationPending || this.isLoading) {
return
}

if (this.shouldAutoSend()) {
await this.streamResponse()
this.continuationPending = true
try {
await this.streamResponse()
} finally {
this.continuationPending = false
}
}
}

Expand Down
11 changes: 10 additions & 1 deletion packages/typescript/ai/src/activities/chat/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,7 @@ class TextEngine<
const clientToolResults = new Map<string, any>()

for (const message of this.messages) {
// todo remove any and fix this
// Check for UIMessage format (parts array)
if (message.role === 'assistant' && (message as any).parts) {
const parts = (message as any).parts
for (const part of parts) {
Expand All @@ -733,6 +733,15 @@ class TextEngine<
}
}
}

// Check for ModelMessage format (toolCalls array with approval info)
if (message.role === 'assistant' && message.toolCalls) {
for (const toolCall of message.toolCalls) {
if (toolCall.approval) {
approvals.set(toolCall.approval.id, toolCall.approval.approved)
}
}
}
}

return { approvals, clientToolResults }
Expand Down
8 changes: 8 additions & 0 deletions packages/typescript/ai/src/activities/chat/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,14 @@ export function uiMessageToModelMessages(
name: p.name,
arguments: p.arguments,
},
// Include approval info if tool was approved/denied (for server to know the decision)
...(p.state === 'approval-responded' &&
p.approval?.approved !== undefined && {
approval: {
id: p.approval.id,
approved: p.approval.approved,
},
}),
}))
: undefined

Expand Down
5 changes: 5 additions & 0 deletions packages/typescript/ai/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ export interface ToolCall {
name: string
arguments: string // JSON string
}
/** Approval info for tools requiring user approval (included in messages sent back to server) */
approval?: {
id: string
approved: boolean
}
}

// ============================================================================
Expand Down
9 changes: 9 additions & 0 deletions packages/typescript/smoke-tests/adapters/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,16 @@
"author": "",
"license": "MIT",
"type": "module",
"exports": {
".": {
"import": "./src/llm-simulator.ts",
"types": "./src/llm-simulator.ts"
}
},
"scripts": {
"start": "tsx src/cli.ts",
"test": "vitest run",
"test:watch": "vitest",
"typecheck": "tsc --noEmit"
},
"dependencies": {
Expand All @@ -24,6 +32,7 @@
"dotenv": "^17.2.3",
"tsx": "^4.20.6",
"typescript": "5.9.3",
"vitest": "^4.0.14",
"zod": "^4.2.0"
}
}
Loading
Loading