diff --git a/src/main/presenter/deepchatAgentPresenter/compactionService.ts b/src/main/presenter/deepchatAgentPresenter/compactionService.ts index aa107c373..708e3a764 100644 --- a/src/main/presenter/deepchatAgentPresenter/compactionService.ts +++ b/src/main/presenter/deepchatAgentPresenter/compactionService.ts @@ -21,6 +21,25 @@ const SAFETY_MARGIN = 1.2 const SUMMARIZATION_OVERHEAD_TOKENS = 4096 const SUMMARY_OUTPUT_TOKENS_CAP = 2048 +const createAbortError = (): Error => { + if (typeof DOMException !== 'undefined') { + return new DOMException('Aborted', 'AbortError') + } + + const error = new Error('Aborted') + error.name = 'AbortError' + return error +} + +const throwIfAbortRequested = (signal?: AbortSignal): void => { + if (signal?.aborted) { + throw createAbortError() + } +} + +const isAbortError = (error: unknown): boolean => + error instanceof Error && (error.name === 'AbortError' || error.name === 'CanceledError') + export type ModelSpec = { providerId: string modelId: string @@ -215,8 +234,11 @@ export class CompactionService { supportsVision: boolean preserveInterleavedReasoning: boolean newUserContent: string | SendMessageInput + signal?: AbortSignal }): Promise { + throwIfAbortRequested(params.signal) const settings = await this.getCompactionSettings(params.sessionId) + throwIfAbortRequested(params.signal) if (!settings.enabled) { return null } @@ -245,8 +267,11 @@ export class CompactionService { reserveTokens: number supportsVision: boolean preserveInterleavedReasoning: boolean + signal?: AbortSignal }): Promise { + throwIfAbortRequested(params.signal) const settings = await this.getCompactionSettings(params.sessionId) + throwIfAbortRequested(params.signal) if (!settings.enabled) { return null } @@ -279,14 +304,19 @@ export class CompactionService { }) } - async applyCompaction(intent: CompactionIntent): Promise { + async applyCompaction( + intent: CompactionIntent, + signal?: AbortSignal + ): Promise { try { + throwIfAbortRequested(signal) const nextSummary = await this.generateRollingSummary({ sessionId: intent.sessionId, previousSummary: intent.previousState.summaryText, summaryBlocks: intent.summaryBlocks, currentModel: intent.currentModel, - reserveTokens: intent.reserveTokens + reserveTokens: intent.reserveTokens, + signal }) const updatedState: SessionSummaryState = { @@ -313,6 +343,9 @@ export class CompactionService { summaryState: compareAndSet.currentState } } catch (error) { + if (signal?.aborted || isAbortError(error)) { + throw error + } console.warn(`[CompactionService] Failed to compact session ${intent.sessionId}:`, error) return { succeeded: false, @@ -496,9 +529,12 @@ export class CompactionService { summaryBlocks: string[] currentModel: ModelSpec reserveTokens: number + signal?: AbortSignal }): Promise { + throwIfAbortRequested(params.signal) const currentModel = params.currentModel const assistantModel = await this.getAssistantModelSpec(params.sessionId, currentModel) + throwIfAbortRequested(params.signal) const previousSummaryTokens = approximateTokenSize(params.previousSummary || '') const blockTokens = params.summaryBlocks.reduce( (total, block) => total + approximateTokenSize(block), @@ -515,7 +551,8 @@ export class CompactionService { return await this.summarizeBlocks(params.summaryBlocks, { previousSummary: params.previousSummary, model: preferredModel, - reserveTokens: params.reserveTokens + reserveTokens: params.reserveTokens, + signal: params.signal }) } @@ -525,8 +562,10 @@ export class CompactionService { previousSummary: string | null model: ModelSpec reserveTokens: number + signal?: AbortSignal } ): Promise { + throwIfAbortRequested(options.signal) const normalizedBlocks = blocks.map((block) => block.trim()).filter(Boolean) if (normalizedBlocks.length === 0) { const normalizedPrevious = options.previousSummary?.trim() @@ -546,7 +585,8 @@ export class CompactionService { options.model, options.reserveTokens, options.previousSummary, - normalizedBlocks.join('\n\n') + normalizedBlocks.join('\n\n'), + options.signal ) } @@ -569,7 +609,8 @@ export class CompactionService { options.model, options.reserveTokens, options.previousSummary, - joinedSplitBlocks + joinedSplitBlocks, + options.signal ) } @@ -596,16 +637,20 @@ export class CompactionService { previousSummary: string | null model: ModelSpec reserveTokens: number + signal?: AbortSignal } ): Promise { + throwIfAbortRequested(options.signal) const chunkSummaries: string[] = [] for (const chunk of chunkGroups) { + throwIfAbortRequested(options.signal) chunkSummaries.push( await this.generateSummaryText( options.model, options.reserveTokens, null, - chunk.join('\n\n') + chunk.join('\n\n'), + options.signal ) ) } @@ -701,9 +746,17 @@ export class CompactionService { model: ModelSpec, reserveTokens: number, previousSummary: string | null, - spanText: string + spanText: string, + signal?: AbortSignal ): Promise { + throwIfAbortRequested(signal) const prompt = this.buildSummaryPrompt(previousSummary, spanText) + if (signal) { + await this.llmProviderPresenter.executeWithRateLimit(model.providerId, { signal }) + } else { + await this.llmProviderPresenter.executeWithRateLimit(model.providerId) + } + throwIfAbortRequested(signal) const response = await this.llmProviderPresenter.generateText( model.providerId, prompt, diff --git a/src/main/presenter/deepchatAgentPresenter/index.ts b/src/main/presenter/deepchatAgentPresenter/index.ts index 06dead6fa..54c69daac 100644 --- a/src/main/presenter/deepchatAgentPresenter/index.ts +++ b/src/main/presenter/deepchatAgentPresenter/index.ts @@ -15,7 +15,12 @@ import type { } from '@shared/types/agent-interface' import type { MCPToolCall, MCPToolResponse } from '@shared/types/core/mcp' import type { ChatMessage } from '@shared/types/core/chat-message' -import type { IConfigPresenter, ILlmProviderPresenter, ModelConfig } from '@shared/presenter' +import type { + IConfigPresenter, + ILlmProviderPresenter, + ModelConfig, + RateLimitQueueSnapshot +} from '@shared/presenter' import type { MCPToolDefinition } from '@shared/types/core/mcp' import type { IToolPresenter } from '@shared/types/presenters/tool.presenter' import type { ReasoningPortrait } from '@shared/types/model-db' @@ -114,6 +119,8 @@ const isReasoningEffort = (value: unknown): value is 'minimal' | 'low' | 'medium const isVerbosity = (value: unknown): value is 'low' | 'medium' | 'high' => value === 'low' || value === 'medium' || value === 'high' +const RATE_LIMIT_STREAM_MESSAGE_PREFIX = '__rate_limit__:' + const createAbortError = (): Error => { if (typeof DOMException !== 'undefined') { return new DOMException('Aborted', 'AbortError') @@ -382,12 +389,16 @@ export class DeepChatAgentPresenter implements IAgentImplementation { ) this.setSessionStatus(sessionId, 'generating') + const preStreamAbortController = this.ensureSessionAbortController(sessionId) + const preStreamAbortSignal = preStreamAbortController.signal let consumedPendingQueueItem = false let userMessageId: string | null = null let assistantMessageId: string | null = null try { + this.throwIfAbortRequested(preStreamAbortSignal) const generationSettings = await this.getEffectiveSessionGenerationSettings(sessionId) + this.throwIfAbortRequested(preStreamAbortSignal) const interleavedReasoning = this.resolveInterleavedReasoningConfig( state.providerId, state.modelId, @@ -395,11 +406,13 @@ export class DeepChatAgentPresenter implements IAgentImplementation { ) const maxTokens = generationSettings.maxTokens const tools = await this.loadToolDefinitionsForSession(sessionId, projectDir) + this.throwIfAbortRequested(preStreamAbortSignal) const baseSystemPrompt = await this.buildSystemPromptWithSkills( sessionId, generationSettings.systemPrompt, tools ) + this.throwIfAbortRequested(preStreamAbortSignal) const historyRecords = this.messageStore .getMessages(sessionId) .filter((message) => message.status === 'sent') @@ -420,7 +433,8 @@ export class DeepChatAgentPresenter implements IAgentImplementation { reserveTokens: maxTokens, supportsVision, preserveInterleavedReasoning: interleavedReasoning.preserveReasoningContent, - newUserContent: normalizedInput + newUserContent: normalizedInput, + signal: preStreamAbortSignal }) let summaryState: SessionSummaryState @@ -443,7 +457,8 @@ export class DeepChatAgentPresenter implements IAgentImplementation { }) summaryState = await this.applyCompactionIntent(sessionId, compactionIntent, { compactionMessageId, - startedExternally: true + startedExternally: true, + signal: preStreamAbortSignal }) } else { summaryState = this.sessionStore.getSummaryState(sessionId) @@ -456,6 +471,7 @@ export class DeepChatAgentPresenter implements IAgentImplementation { if (!userMessageId) { throw new Error('Failed to create user message.') } + this.throwIfAbortRequested(preStreamAbortSignal) this.emitMessageRefresh(sessionId, userMessageId) this.dispatchHook('UserPromptSubmit', { @@ -485,6 +501,7 @@ export class DeepChatAgentPresenter implements IAgentImplementation { const assistantOrderSeq = this.messageStore.getNextOrderSeq(sessionId) assistantMessageId = this.messageStore.createAssistantMessage(sessionId, assistantOrderSeq) + this.throwIfAbortRequested(preStreamAbortSignal) if (context?.pendingQueueItemId) { this.pendingInputCoordinator.consumeQueuedInput(sessionId, context.pendingQueueItemId) @@ -524,6 +541,27 @@ export class DeepChatAgentPresenter implements IAgentImplementation { console.warn('[DeepChatAgent] failed to release claimed queue input:', releaseError) } } + if (this.isAbortError(err) || preStreamAbortSignal.aborted) { + if (userMessageId) { + this.emitMessageRefresh(sessionId, userMessageId) + } + if (assistantMessageId) { + const existingAssistant = this.messageStore.getMessage(assistantMessageId) + const blocks = buildTerminalErrorBlocks( + existingAssistant ? this.parseAssistantBlocks(existingAssistant.content) : [], + 'common.error.userCanceledGeneration' + ) + this.messageStore.setMessageError(assistantMessageId, blocks) + this.emitMessageRefresh(sessionId, assistantMessageId) + } + this.dispatchTerminalHooks(sessionId, state, { + status: 'aborted', + stopReason: 'user_stop', + errorMessage: 'common.error.userCanceledGeneration' + }) + this.setSessionStatus(sessionId, 'idle') + return + } const errorMessage = err instanceof Error ? err.message : String(err) if (assistantMessageId) { const existingAssistant = this.messageStore.getMessage(assistantMessageId) @@ -549,6 +587,8 @@ export class DeepChatAgentPresenter implements IAgentImplementation { error: { message: errorMessage } }) this.setSessionStatus(sessionId, 'error') + } finally { + this.clearSessionAbortController(sessionId, preStreamAbortController) } } @@ -1038,6 +1078,33 @@ export class DeepChatAgentPresenter implements IAgentImplementation { ) } + private ensureSessionAbortController(sessionId: string): AbortController { + const activeGeneration = this.activeGenerations.get(sessionId) + if (activeGeneration) { + return activeGeneration.abortController + } + + const existing = this.abortControllers.get(sessionId) + if (existing) { + existing.abort() + } + + const controller = new AbortController() + this.abortControllers.set(sessionId, controller) + return controller + } + + private clearSessionAbortController(sessionId: string, controller?: AbortController): void { + const current = this.abortControllers.get(sessionId) + if (!current) { + return + } + if (controller && current !== controller) { + return + } + this.abortControllers.delete(sessionId) + } + private buildDeferredToolAbortKey(sessionId: string, toolCallId: string): string { return `${sessionId}:${toolCallId}` } @@ -1342,6 +1409,7 @@ export class DeepChatAgentPresenter implements IAgentImplementation { } const traceEnabled = this.configPresenter.getSetting('traceDebugEnabled') === true + const llmProviderPresenter = this.llmProviderPresenter const pendingInputCoordinator = this.pendingInputCoordinator const injectSteerInputsIntoRequest = this.injectSteerInputsIntoRequest.bind(this) const persistMessageTrace = this.persistMessageTrace.bind(this) @@ -1374,6 +1442,9 @@ export class DeepChatAgentPresenter implements IAgentImplementation { const abortController = new AbortController() const activeGeneration = this.registerActiveGeneration(sessionId, messageId, abortController) + const rateLimitMessageId = this.buildRateLimitStreamMessageId(activeGeneration.runId) + const emitRateLimitWaitingMessage = this.emitRateLimitWaitingMessage.bind(this) + const clearRateLimitWaitingMessage = this.clearRateLimitWaitingMessage.bind(this) try { this.dispatchHook('SessionStart', { @@ -1407,8 +1478,24 @@ export class DeepChatAgentPresenter implements IAgentImplementation { ) let didConsumeSteerBatch = false + let queuedForRateLimit = false try { + await llmProviderPresenter.executeWithRateLimit(state.providerId, { + signal: abortController.signal, + onQueued: (snapshot) => { + queuedForRateLimit = true + emitRateLimitWaitingMessage(sessionId, rateLimitMessageId, snapshot) + } + }) + if (queuedForRateLimit) { + clearRateLimitWaitingMessage(sessionId, rateLimitMessageId) + queuedForRateLimit = false + } + if (abortController.signal.aborted) { + throw createAbortError() + } + for await (const event of provider.coreStream( injectedMessages, requestModelId, @@ -1428,6 +1515,9 @@ export class DeepChatAgentPresenter implements IAgentImplementation { pendingInputCoordinator.consumeClaimedSteerBatch(sessionId) } } catch (error) { + if (queuedForRateLimit) { + clearRateLimitWaitingMessage(sessionId, rateLimitMessageId) + } if (!didConsumeSteerBatch && claimedSteerBatch.length > 0) { pendingInputCoordinator.releaseClaimedInputs(sessionId) } @@ -1669,6 +1759,47 @@ export class DeepChatAgentPresenter implements IAgentImplementation { return this.activeGenerations.get(sessionId)?.runId === runId } + private buildRateLimitStreamMessageId(runId: string): string { + return `${RATE_LIMIT_STREAM_MESSAGE_PREFIX}${runId}` + } + + private emitRateLimitWaitingMessage( + sessionId: string, + messageId: string, + snapshot: RateLimitQueueSnapshot + ): void { + const block: AssistantMessageBlock = { + type: 'action', + action_type: 'rate_limit', + content: '', + status: 'pending', + timestamp: Date.now(), + extra: { + providerId: snapshot.providerId, + qpsLimit: snapshot.qpsLimit, + currentQps: snapshot.currentQps, + queueLength: snapshot.queueLength, + estimatedWaitTime: snapshot.estimatedWaitTime + } + } + + eventBus.sendToRenderer(STREAM_EVENTS.RESPONSE, SendTarget.ALL_WINDOWS, { + conversationId: sessionId, + eventId: messageId, + messageId, + blocks: [block] + }) + } + + private clearRateLimitWaitingMessage(sessionId: string, messageId: string): void { + eventBus.sendToRenderer(STREAM_EVENTS.RESPONSE, SendTarget.ALL_WINDOWS, { + conversationId: sessionId, + eventId: messageId, + messageId, + blocks: [] + }) + } + private applyProcessResultStatus( sessionId: string, result: ProcessResult | null | undefined, @@ -1710,6 +1841,8 @@ export class DeepChatAgentPresenter implements IAgentImplementation { return false } this.resumingMessages.add(messageId) + let preStreamAbortController: AbortController | null = null + let preStreamAbortSignal: AbortSignal | undefined try { const state = this.runtimeState.get(sessionId) @@ -1718,7 +1851,11 @@ export class DeepChatAgentPresenter implements IAgentImplementation { } this.setSessionStatus(sessionId, 'generating') + preStreamAbortController = this.ensureSessionAbortController(sessionId) + preStreamAbortSignal = preStreamAbortController.signal + this.throwIfAbortRequested(preStreamAbortSignal) const generationSettings = await this.getEffectiveSessionGenerationSettings(sessionId) + this.throwIfAbortRequested(preStreamAbortSignal) const interleavedReasoning = this.resolveInterleavedReasoningConfig( state.providerId, state.modelId, @@ -1727,11 +1864,13 @@ export class DeepChatAgentPresenter implements IAgentImplementation { const maxTokens = generationSettings.maxTokens const projectDir = this.resolveProjectDir(sessionId) const tools = await this.loadToolDefinitionsForSession(sessionId, projectDir) + this.throwIfAbortRequested(preStreamAbortSignal) const baseSystemPrompt = await this.buildSystemPromptWithSkills( sessionId, generationSettings.systemPrompt, tools ) + this.throwIfAbortRequested(preStreamAbortSignal) const summaryState = await this.resolveCompactionStateForResumeTurn({ sessionId, messageId, @@ -1741,8 +1880,10 @@ export class DeepChatAgentPresenter implements IAgentImplementation { contextLength: generationSettings.contextLength, reserveTokens: maxTokens, supportsVision: this.supportsVision(state.providerId, state.modelId), - preserveInterleavedReasoning: interleavedReasoning.preserveReasoningContent + preserveInterleavedReasoning: interleavedReasoning.preserveReasoningContent, + signal: preStreamAbortSignal }) + this.throwIfAbortRequested(preStreamAbortSignal) const systemPrompt = appendSummarySection(baseSystemPrompt, summaryState.summaryText) let resumeContext = buildResumeContext( sessionId, @@ -1794,6 +1935,7 @@ export class DeepChatAgentPresenter implements IAgentImplementation { } } + this.throwIfAbortRequested(preStreamAbortSignal) const { runId, result } = await this.runStreamForMessage({ sessionId, messageId, @@ -1814,6 +1956,21 @@ export class DeepChatAgentPresenter implements IAgentImplementation { return true } catch (error) { console.error('[DeepChatAgent] resumeAssistantMessage error:', error) + if (this.isAbortError(error) || preStreamAbortSignal?.aborted) { + const blocks = buildTerminalErrorBlocks( + initialBlocks, + 'common.error.userCanceledGeneration' + ) + this.messageStore.setMessageError(messageId, blocks) + this.emitMessageRefresh(sessionId, messageId) + this.dispatchTerminalHooks(sessionId, this.runtimeState.get(sessionId), { + status: 'aborted', + stopReason: 'user_stop', + errorMessage: 'common.error.userCanceledGeneration' + }) + this.setSessionStatus(sessionId, 'idle') + return false + } const errorMessage = error instanceof Error ? error.message : String(error) const blocks = buildTerminalErrorBlocks(initialBlocks, errorMessage) this.messageStore.setMessageError(messageId, blocks) @@ -1821,6 +1978,7 @@ export class DeepChatAgentPresenter implements IAgentImplementation { this.setSessionStatus(sessionId, 'error') throw error } finally { + this.clearSessionAbortController(sessionId, preStreamAbortController ?? undefined) this.resumingMessages.delete(messageId) } } @@ -3207,6 +3365,9 @@ export class DeepChatAgentPresenter implements IAgentImplementation { visionModel.modelId, visionModel.providerId ) + await this.llmProviderPresenter.executeWithRateLimit(visionModel.providerId, { + signal: abortSignal + }) const response = await this.llmProviderPresenter.generateCompletionStandalone( visionModel.providerId, messages, @@ -3411,9 +3572,10 @@ export class DeepChatAgentPresenter implements IAgentImplementation { reserveTokens: number supportsVision: boolean preserveInterleavedReasoning: boolean + signal?: AbortSignal }): Promise { const intent = await this.compactionService.prepareForResumeTurn(params) - return await this.applyCompactionIntent(params.sessionId, intent) + return await this.applyCompactionIntent(params.sessionId, intent, { signal: params.signal }) } private async applyCompactionIntent( @@ -3422,6 +3584,7 @@ export class DeepChatAgentPresenter implements IAgentImplementation { options?: { compactionMessageId?: string startedExternally?: boolean + signal?: AbortSignal } ): Promise { if (!intent) { @@ -3446,7 +3609,20 @@ export class DeepChatAgentPresenter implements IAgentImplementation { }) } - const result = await this.compactionService.applyCompaction(intent) + let result: Awaited> + try { + result = await this.compactionService.applyCompaction(intent, options?.signal) + } catch (error) { + if (this.isAbortError(error) || options?.signal?.aborted) { + this.messageStore.deleteMessage(compactionMessageId) + this.emitMessageRefresh(sessionId, compactionMessageId) + this.emitCompactionState( + sessionId, + this.summaryStateToCompactionState(intent.previousState) + ) + } + throw error + } if (result.succeeded) { this.messageStore.updateCompactionMessage( compactionMessageId, diff --git a/src/main/presenter/deepchatAgentPresenter/process.ts b/src/main/presenter/deepchatAgentPresenter/process.ts index 2e3088b9b..6b341fabd 100644 --- a/src/main/presenter/deepchatAgentPresenter/process.ts +++ b/src/main/presenter/deepchatAgentPresenter/process.ts @@ -18,6 +18,10 @@ const CONTEXT_WINDOW_ERROR_PATTERNS = [ const USER_CANCELED_GENERATION_ERROR = 'common.error.userCanceledGeneration' const NO_MODEL_RESPONSE_ERROR = 'common.error.noModelResponse' +function isAbortError(error: unknown): boolean { + return error instanceof Error && (error.name === 'AbortError' || error.name === 'CanceledError') +} + function isContextWindowErrorMessage(message: string): boolean { const normalized = message.toLowerCase() return CONTEXT_WINDOW_ERROR_PATTERNS.some((pattern) => normalized.includes(pattern)) @@ -268,6 +272,15 @@ export async function processStream(params: ProcessParams): Promise ({ - generateCompletionStandalone: (providerId, messages, modelId, temperature, maxTokens) => + executeWithRateLimit: (providerId, options) => + this.llmproviderPresenter.executeWithRateLimit(providerId, options), + generateCompletionStandalone: ( + providerId, + messages, + modelId, + temperature, + maxTokens, + options + ) => this.llmproviderPresenter.generateCompletionStandalone( providerId, messages, modelId, temperature, - maxTokens + maxTokens, + options ) }), createSettingsWindow: () => this.windowPresenter.createSettingsWindow(), diff --git a/src/main/presenter/llmProviderPresenter/index.ts b/src/main/presenter/llmProviderPresenter/index.ts index 8cd0ca95c..20c756658 100644 --- a/src/main/presenter/llmProviderPresenter/index.ts +++ b/src/main/presenter/llmProviderPresenter/index.ts @@ -12,6 +12,7 @@ import { IConfigPresenter, ISQLitePresenter, AcpConfigState, + RateLimitQueueSnapshot, AcpWorkdirInfo, AcpDebugRequest, AcpDebugRunResult @@ -203,6 +204,16 @@ export class LLMProviderPresenter implements ILlmProviderPresenter { return this.rateLimitManager.getAllProviderRateLimitStatus() } + async executeWithRateLimit( + providerId: string, + options?: { + signal?: AbortSignal + onQueued?: (snapshot: RateLimitQueueSnapshot) => void + } + ): Promise { + await this.rateLimitManager.executeWithRateLimit(providerId, options) + } + isGenerating(eventId: string): boolean { return this.activeStreams.has(eventId) } diff --git a/src/main/presenter/llmProviderPresenter/managers/rateLimitManager.ts b/src/main/presenter/llmProviderPresenter/managers/rateLimitManager.ts index 029f02900..e15d24da1 100644 --- a/src/main/presenter/llmProviderPresenter/managers/rateLimitManager.ts +++ b/src/main/presenter/llmProviderPresenter/managers/rateLimitManager.ts @@ -1,7 +1,23 @@ import { RATE_LIMIT_EVENTS } from '@/events' import { eventBus, SendTarget } from '@/eventbus' import { IConfigPresenter, LLM_PROVIDER } from '@shared/presenter' -import { ProviderRateLimitState, QueueItem, RateLimitConfig } from '../types' +import { + ExecuteWithRateLimitOptions, + ProviderRateLimitState, + QueueItem, + RateLimitConfig, + RateLimitQueueSnapshot +} from '../types' + +const createAbortError = (): Error => { + if (typeof DOMException !== 'undefined') { + return new DOMException('Aborted', 'AbortError') + } + + const error = new Error('Aborted') + error.name = 'AbortError' + return error +} export class RateLimitManager { private readonly providerRateLimitStates: Map = new Map() @@ -97,8 +113,14 @@ export class RateLimitManager { return status } - async executeWithRateLimit(providerId: string): Promise { + async executeWithRateLimit( + providerId: string, + options?: ExecuteWithRateLimitOptions + ): Promise { const state = this.getOrCreateRateLimitState(providerId) + if (options?.signal?.aborted) { + throw createAbortError() + } if (!state.config.enabled) { this.recordRequest(providerId) return Promise.resolve() @@ -108,14 +130,27 @@ export class RateLimitManager { return Promise.resolve() } return new Promise((resolve, reject) => { + let settled = false + let abortCleanup: (() => void) | null = null + const settle = (callback: () => void) => { + if (settled) { + return + } + settled = true + abortCleanup?.() + abortCleanup = null + callback() + } + const queueItem: QueueItem = { id: `${providerId}-${Date.now()}-${Math.random()}`, timestamp: Date.now(), - resolve, - reject + resolve: () => settle(resolve), + reject: (error) => settle(() => reject(error)) } state.queue.push(queueItem) + const snapshot = this.buildQueueSnapshot(providerId, state) console.log( `[RateLimitManager] Request queued for ${providerId}, queue length: ${state.queue.length}` ) @@ -124,6 +159,29 @@ export class RateLimitManager { queueLength: state.queue.length, requestId: queueItem.id }) + try { + options?.onQueued?.(snapshot) + } catch (error) { + console.warn(`[RateLimitManager] onQueued callback failed for ${providerId}:`, error) + } + + const signal = options?.signal + if (signal) { + const onAbort = () => { + const removed = this.removeQueueItem(providerId, queueItem.id) + if (removed) { + console.log(`[RateLimitManager] Request aborted while queued for ${providerId}`) + } + queueItem.reject(createAbortError()) + } + signal.addEventListener('abort', onAbort, { once: true }) + abortCleanup = () => signal.removeEventListener('abort', onAbort) + if (signal.aborted) { + onAbort() + return + } + } + this.processRateLimitQueue(providerId) }) } @@ -281,6 +339,39 @@ export class RateLimitManager { return state?.queue.length || 0 } + private removeQueueItem(providerId: string, queueItemId: string): boolean { + const state = this.providerRateLimitStates.get(providerId) + if (!state) { + return false + } + + const index = state.queue.findIndex((item) => item.id === queueItemId) + if (index === -1) { + return false + } + + state.queue.splice(index, 1) + return true + } + + private buildQueueSnapshot( + providerId: string, + state: ProviderRateLimitState + ): RateLimitQueueSnapshot { + const intervalMs = (1 / state.config.qpsLimit) * 1000 + const nextAllowedTime = state.lastRequestTime + intervalMs + const baseWaitTime = Math.max(0, nextAllowedTime - Date.now()) + const additionalQueuedIntervals = Math.max(0, state.queue.length - 1) * intervalMs + + return { + providerId, + qpsLimit: state.config.qpsLimit, + currentQps: this.getCurrentQps(providerId), + queueLength: state.queue.length, + estimatedWaitTime: Math.max(0, baseWaitTime + additionalQueuedIntervals) + } + } + private getLastRequestTime(providerId: string): number { const state = this.providerRateLimitStates.get(providerId) return state?.lastRequestTime || 0 diff --git a/src/main/presenter/llmProviderPresenter/types.ts b/src/main/presenter/llmProviderPresenter/types.ts index 946470fc5..5a51e864d 100644 --- a/src/main/presenter/llmProviderPresenter/types.ts +++ b/src/main/presenter/llmProviderPresenter/types.ts @@ -5,6 +5,19 @@ export interface RateLimitConfig { enabled: boolean } +export interface RateLimitQueueSnapshot { + providerId: string + qpsLimit: number + currentQps: number + queueLength: number + estimatedWaitTime: number +} + +export interface ExecuteWithRateLimitOptions { + signal?: AbortSignal + onQueued?: (snapshot: RateLimitQueueSnapshot) => void +} + export interface QueueItem { id: string timestamp: number diff --git a/src/main/presenter/toolPresenter/agentTools/agentToolManager.ts b/src/main/presenter/toolPresenter/agentTools/agentToolManager.ts index 0fecb7f09..21a3b9e15 100644 --- a/src/main/presenter/toolPresenter/agentTools/agentToolManager.ts +++ b/src/main/presenter/toolPresenter/agentTools/agentToolManager.ts @@ -72,6 +72,25 @@ interface AgentToolManagerOptions { runtimePort: AgentToolRuntimePort } +const createAbortError = (): Error => { + if (typeof DOMException !== 'undefined') { + return new DOMException('Aborted', 'AbortError') + } + + const error = new Error('Aborted') + error.name = 'AbortError' + return error +} + +const throwIfAbortRequested = (signal?: AbortSignal): void => { + if (signal?.aborted) { + throw createAbortError() + } +} + +const isAbortError = (error: unknown): boolean => + error instanceof Error && (error.name === 'AbortError' || error.name === 'CanceledError') + export class AgentToolManager { private static readonly YO_BROWSER_TOOL_NAME_SET = new Set(YO_BROWSER_TOOL_NAMES) private agentWorkspacePath: string | null @@ -414,7 +433,7 @@ export class AgentToolManager { if (!this.fileSystemHandler) { throw new Error(`FileSystem handler not initialized for tool: ${toolName}`) } - return await this.callFileSystemTool(toolName, args, conversationId) + return await this.callFileSystemTool(toolName, args, conversationId, options) } // Route to Skill tools @@ -691,7 +710,10 @@ export class AgentToolManager { private async callFileSystemTool( toolName: string, args: Record, - conversationId?: string + conversationId?: string, + options?: { + signal?: AbortSignal + } ): Promise { // Handle process tool separately if (this.isProcessTool(toolName)) { @@ -798,7 +820,12 @@ export class AgentToolManager { if (this.isImageMimeType(mimeType)) { return { - content: await this.readImageWithVisionFallback(validPath, mimeType, conversationId) + content: await this.readImageWithVisionFallback( + validPath, + mimeType, + conversationId, + options?.signal + ) } } @@ -1194,14 +1221,17 @@ export class AgentToolManager { private async readImageWithVisionFallback( filePath: string, mimeType: string, - conversationId?: string + conversationId?: string, + signal?: AbortSignal ): Promise { + throwIfAbortRequested(signal) const fileBuffer = await fs.promises.readFile(filePath) + throwIfAbortRequested(signal) const metadata = this.buildImageMetadataBlock(filePath, mimeType, fileBuffer.length) let visionTarget: Awaited> try { - visionTarget = await this.resolveVisionTargetForConversation(conversationId) + visionTarget = await this.resolveVisionTargetForConversation(conversationId, signal) } catch (error) { logger.warn('[AgentToolManager] Failed to resolve vision target for image read:', { conversationId, @@ -1216,6 +1246,7 @@ export class AgentToolManager { } try { + throwIfAbortRequested(signal) const dataUrl = `data:${mimeType};base64,${fileBuffer.toString('base64')}` const messages: ChatMessage[] = [ { @@ -1237,13 +1268,29 @@ export class AgentToolManager { visionTarget.modelId, visionTarget.providerId ) - const response = await this.getLlmProviderPresenter().generateCompletionStandalone( - visionTarget.providerId, - messages, - visionTarget.modelId, - modelConfig?.temperature ?? 0.2, - modelConfig?.maxTokens ?? 1200 - ) + const llmProviderPresenter = this.getLlmProviderPresenter() + if (signal) { + await llmProviderPresenter.executeWithRateLimit(visionTarget.providerId, { signal }) + } else { + await llmProviderPresenter.executeWithRateLimit(visionTarget.providerId) + } + throwIfAbortRequested(signal) + const response = signal + ? await llmProviderPresenter.generateCompletionStandalone( + visionTarget.providerId, + messages, + visionTarget.modelId, + modelConfig?.temperature ?? 0.2, + modelConfig?.maxTokens ?? 1200, + { signal } + ) + : await llmProviderPresenter.generateCompletionStandalone( + visionTarget.providerId, + messages, + visionTarget.modelId, + modelConfig?.temperature ?? 0.2, + modelConfig?.maxTokens ?? 1200 + ) const normalized = (response || '').trim() if (!normalized) { @@ -1251,12 +1298,15 @@ export class AgentToolManager { } return normalized } catch (error) { + if (isAbortError(error)) { + throw error + } const message = error instanceof Error ? error.message : String(error) return `${metadata}\n\nVision analysis failed, downgraded to metadata.\nerror: ${message}` } } - private async resolveVisionTargetForConversation(conversationId?: string) { + private async resolveVisionTargetForConversation(conversationId?: string, signal?: AbortSignal) { if (!conversationId) { return null } @@ -1268,6 +1318,7 @@ export class AgentToolManager { modelId: sessionInfo?.modelId, agentId: sessionInfo?.agentId, configPresenter: this.configPresenter, + signal, logLabel: `read:${conversationId}` }) } catch (error) { diff --git a/src/main/presenter/toolPresenter/runtimePorts.ts b/src/main/presenter/toolPresenter/runtimePorts.ts index 78646ab38..12f52266e 100644 --- a/src/main/presenter/toolPresenter/runtimePorts.ts +++ b/src/main/presenter/toolPresenter/runtimePorts.ts @@ -61,7 +61,10 @@ export interface AgentToolRuntimePort { getSkillPresenter(): ISkillPresenter getYoBrowserToolHandler(): IYoBrowserPresenter['toolHandler'] getFilePresenter(): Pick - getLlmProviderPresenter(): Pick + getLlmProviderPresenter(): Pick< + ILlmProviderPresenter, + 'executeWithRateLimit' | 'generateCompletionStandalone' + > createSettingsWindow(): ReturnType sendToWindow( windowId: number, diff --git a/src/renderer/src/components/chat/MessageList.vue b/src/renderer/src/components/chat/MessageList.vue index b07b97317..9bf94f334 100644 --- a/src/renderer/src/components/chat/MessageList.vue +++ b/src/renderer/src/components/chat/MessageList.vue @@ -43,6 +43,14 @@ @copy-image="handleCopyImage" /> +
+ +
@@ -51,10 +59,12 @@ import { computed } from 'vue' import { useI18n } from 'vue-i18n' import MessageItemAssistant from '@/components/message/MessageItemAssistant.vue' +import MessageBlockAction from '@/components/message/MessageBlockAction.vue' import MessageItemUser from '@/components/message/MessageItemUser.vue' import { useMessageCapture } from '@/composables/message/useMessageCapture' import { type DisplayAssistantMessage, + type DisplayAssistantMessageBlock, isCompactionMessageItem, type DisplayUserMessage, type DisplayMessage, @@ -64,11 +74,17 @@ import { const props = withDefaults( defineProps<{ messages: MessageListItem[] + conversationId?: string + ephemeralRateLimitBlock?: DisplayAssistantMessageBlock | null + ephemeralRateLimitMessageId?: string | null isGenerating?: boolean traceMessageIds?: string[] isReadOnly?: boolean }>(), { + conversationId: '', + ephemeralRateLimitBlock: null, + ephemeralRateLimitMessageId: null, isGenerating: false, traceMessageIds: () => [], isReadOnly: false diff --git a/src/renderer/src/components/message/MessageBlockAction.vue b/src/renderer/src/components/message/MessageBlockAction.vue index 9bfc5a383..58601a5a1 100644 --- a/src/renderer/src/components/message/MessageBlockAction.vue +++ b/src/renderer/src/components/message/MessageBlockAction.vue @@ -1,7 +1,5 @@