From cd3c559b44f2c7f040fcbc8eb48ad90d51a24f85 Mon Sep 17 00:00:00 2001 From: zerob13 Date: Wed, 1 Apr 2026 20:20:43 +0800 Subject: [PATCH 1/4] fix(agent): restore chat rate limiting --- .../presenter/deepchatAgentPresenter/index.ts | 70 +++++- .../deepchatAgentPresenter/process.ts | 13 ++ .../presenter/llmProviderPresenter/index.ts | 11 + .../managers/rateLimitManager.ts | 99 ++++++++- .../presenter/llmProviderPresenter/types.ts | 13 ++ src/renderer/src/stores/ui/message.ts | 8 +- src/shared/types/agent-interface.d.ts | 2 +- src/shared/types/presenters/index.d.ts | 1 + .../types/presenters/legacy.presenters.d.ts | 13 ++ .../presenters/llmprovider.presenter.d.ts | 15 ++ .../deepchatAgentPresenter.test.ts | 200 +++++++++++++++++- .../deepchatAgentPresenter/process.test.ts | 26 +++ .../rateLimitManager.test.ts | 137 ++++++++++++ test/renderer/stores/messageStore.test.ts | 51 +++++ 14 files changed, 649 insertions(+), 10 deletions(-) create mode 100644 test/main/presenter/llmProviderPresenter/rateLimitManager.test.ts diff --git a/src/main/presenter/deepchatAgentPresenter/index.ts b/src/main/presenter/deepchatAgentPresenter/index.ts index 06dead6fa..21543e065 100644 --- a/src/main/presenter/deepchatAgentPresenter/index.ts +++ b/src/main/presenter/deepchatAgentPresenter/index.ts @@ -15,7 +15,12 @@ import type { } from '@shared/types/agent-interface' import type { MCPToolCall, MCPToolResponse } from '@shared/types/core/mcp' import type { ChatMessage } from '@shared/types/core/chat-message' -import type { IConfigPresenter, ILlmProviderPresenter, ModelConfig } from '@shared/presenter' +import type { + IConfigPresenter, + ILlmProviderPresenter, + ModelConfig, + RateLimitQueueSnapshot +} from '@shared/presenter' import type { MCPToolDefinition } from '@shared/types/core/mcp' import type { IToolPresenter } from '@shared/types/presenters/tool.presenter' import type { ReasoningPortrait } from '@shared/types/model-db' @@ -114,6 +119,8 @@ const isReasoningEffort = (value: unknown): value is 'minimal' | 'low' | 'medium const isVerbosity = (value: unknown): value is 'low' | 'medium' | 'high' => value === 'low' || value === 'medium' || value === 'high' +const RATE_LIMIT_STREAM_MESSAGE_PREFIX = '__rate_limit__:' + const createAbortError = (): Error => { if (typeof DOMException !== 'undefined') { return new DOMException('Aborted', 'AbortError') @@ -1342,6 +1349,7 @@ export class DeepChatAgentPresenter implements IAgentImplementation { } const traceEnabled = this.configPresenter.getSetting('traceDebugEnabled') === true + const llmProviderPresenter = this.llmProviderPresenter const pendingInputCoordinator = this.pendingInputCoordinator const injectSteerInputsIntoRequest = this.injectSteerInputsIntoRequest.bind(this) const persistMessageTrace = this.persistMessageTrace.bind(this) @@ -1374,6 +1382,9 @@ export class DeepChatAgentPresenter implements IAgentImplementation { const abortController = new AbortController() const activeGeneration = this.registerActiveGeneration(sessionId, messageId, abortController) + const rateLimitMessageId = this.buildRateLimitStreamMessageId(activeGeneration.runId) + const emitRateLimitWaitingMessage = this.emitRateLimitWaitingMessage.bind(this) + const clearRateLimitWaitingMessage = this.clearRateLimitWaitingMessage.bind(this) try { this.dispatchHook('SessionStart', { @@ -1407,8 +1418,21 @@ export class DeepChatAgentPresenter implements IAgentImplementation { ) let didConsumeSteerBatch = false + let queuedForRateLimit = false try { + await 
llmProviderPresenter.executeWithRateLimit(state.providerId, { + signal: abortController.signal, + onQueued: (snapshot) => { + queuedForRateLimit = true + emitRateLimitWaitingMessage(sessionId, rateLimitMessageId, snapshot) + } + }) + if (queuedForRateLimit) { + clearRateLimitWaitingMessage(sessionId, rateLimitMessageId) + queuedForRateLimit = false + } + for await (const event of provider.coreStream( injectedMessages, requestModelId, @@ -1428,6 +1452,9 @@ export class DeepChatAgentPresenter implements IAgentImplementation { pendingInputCoordinator.consumeClaimedSteerBatch(sessionId) } } catch (error) { + if (queuedForRateLimit) { + clearRateLimitWaitingMessage(sessionId, rateLimitMessageId) + } if (!didConsumeSteerBatch && claimedSteerBatch.length > 0) { pendingInputCoordinator.releaseClaimedInputs(sessionId) } @@ -1669,6 +1696,47 @@ export class DeepChatAgentPresenter implements IAgentImplementation { return this.activeGenerations.get(sessionId)?.runId === runId } + private buildRateLimitStreamMessageId(runId: string): string { + return `${RATE_LIMIT_STREAM_MESSAGE_PREFIX}${runId}` + } + + private emitRateLimitWaitingMessage( + sessionId: string, + messageId: string, + snapshot: RateLimitQueueSnapshot + ): void { + const block: AssistantMessageBlock = { + type: 'action', + action_type: 'rate_limit', + content: '', + status: 'pending', + timestamp: Date.now(), + extra: { + providerId: snapshot.providerId, + qpsLimit: snapshot.qpsLimit, + currentQps: snapshot.currentQps, + queueLength: snapshot.queueLength, + estimatedWaitTime: snapshot.estimatedWaitTime + } + } + + eventBus.sendToRenderer(STREAM_EVENTS.RESPONSE, SendTarget.ALL_WINDOWS, { + conversationId: sessionId, + eventId: messageId, + messageId, + blocks: [block] + }) + } + + private clearRateLimitWaitingMessage(sessionId: string, messageId: string): void { + eventBus.sendToRenderer(STREAM_EVENTS.RESPONSE, SendTarget.ALL_WINDOWS, { + conversationId: sessionId, + eventId: messageId, + messageId, + blocks: [] + }) + } + private applyProcessResultStatus( sessionId: string, result: ProcessResult | null | undefined, diff --git a/src/main/presenter/deepchatAgentPresenter/process.ts b/src/main/presenter/deepchatAgentPresenter/process.ts index 2e3088b9b..6b341fabd 100644 --- a/src/main/presenter/deepchatAgentPresenter/process.ts +++ b/src/main/presenter/deepchatAgentPresenter/process.ts @@ -18,6 +18,10 @@ const CONTEXT_WINDOW_ERROR_PATTERNS = [ const USER_CANCELED_GENERATION_ERROR = 'common.error.userCanceledGeneration' const NO_MODEL_RESPONSE_ERROR = 'common.error.noModelResponse' +function isAbortError(error: unknown): boolean { + return error instanceof Error && (error.name === 'AbortError' || error.name === 'CanceledError') +} + function isContextWindowErrorMessage(message: string): boolean { const normalized = message.toLowerCase() return CONTEXT_WINDOW_ERROR_PATTERNS.some((pattern) => normalized.includes(pattern)) @@ -268,6 +272,15 @@ export async function processStream(params: ProcessParams): Promise void + } + ): Promise { + await this.rateLimitManager.executeWithRateLimit(providerId, options) + } + isGenerating(eventId: string): boolean { return this.activeStreams.has(eventId) } diff --git a/src/main/presenter/llmProviderPresenter/managers/rateLimitManager.ts b/src/main/presenter/llmProviderPresenter/managers/rateLimitManager.ts index 029f02900..e15d24da1 100644 --- a/src/main/presenter/llmProviderPresenter/managers/rateLimitManager.ts +++ b/src/main/presenter/llmProviderPresenter/managers/rateLimitManager.ts @@ -1,7 +1,23 @@ import 
{ RATE_LIMIT_EVENTS } from '@/events'
 import { eventBus, SendTarget } from '@/eventbus'
 import { IConfigPresenter, LLM_PROVIDER } from '@shared/presenter'
-import { ProviderRateLimitState, QueueItem, RateLimitConfig } from '../types'
+import {
+  ExecuteWithRateLimitOptions,
+  ProviderRateLimitState,
+  QueueItem,
+  RateLimitConfig,
+  RateLimitQueueSnapshot
+} from '../types'
+
+const createAbortError = (): Error => {
+  if (typeof DOMException !== 'undefined') {
+    return new DOMException('Aborted', 'AbortError')
+  }
+
+  const error = new Error('Aborted')
+  error.name = 'AbortError'
+  return error
+}
 
 export class RateLimitManager {
   private readonly providerRateLimitStates: Map<string, ProviderRateLimitState> = new Map()
@@ -97,8 +113,14 @@ export class RateLimitManager {
     return status
   }
-  async executeWithRateLimit(providerId: string): Promise<void> {
+  async executeWithRateLimit(
+    providerId: string,
+    options?: ExecuteWithRateLimitOptions
+  ): Promise<void> {
     const state = this.getOrCreateRateLimitState(providerId)
+    if (options?.signal?.aborted) {
+      throw createAbortError()
+    }
     if (!state.config.enabled) {
       this.recordRequest(providerId)
       return Promise.resolve()
     }
@@ -108,14 +130,27 @@ export class RateLimitManager {
       return Promise.resolve()
     }
     return new Promise<void>((resolve, reject) => {
+      let settled = false
+      let abortCleanup: (() => void) | null = null
+      const settle = (callback: () => void) => {
+        if (settled) {
+          return
+        }
+        settled = true
+        abortCleanup?.()
+        abortCleanup = null
+        callback()
+      }
+
       const queueItem: QueueItem = {
         id: `${providerId}-${Date.now()}-${Math.random()}`,
         timestamp: Date.now(),
-        resolve,
-        reject
+        resolve: () => settle(resolve),
+        reject: (error) => settle(() => reject(error))
       }
 
       state.queue.push(queueItem)
+      const snapshot = this.buildQueueSnapshot(providerId, state)
       console.log(
         `[RateLimitManager] Request queued for ${providerId}, queue length: ${state.queue.length}`
       )
@@ -124,6 +159,29 @@
         queueLength: state.queue.length,
         requestId: queueItem.id
       })
+      try {
+        options?.onQueued?.(snapshot)
+      } catch (error) {
+        console.warn(`[RateLimitManager] onQueued callback failed for ${providerId}:`, error)
+      }
+
+      const signal = options?.signal
+      if (signal) {
+        const onAbort = () => {
+          const removed = this.removeQueueItem(providerId, queueItem.id)
+          if (removed) {
+            console.log(`[RateLimitManager] Request aborted while queued for ${providerId}`)
+          }
+          queueItem.reject(createAbortError())
+        }
+        signal.addEventListener('abort', onAbort, { once: true })
+        abortCleanup = () => signal.removeEventListener('abort', onAbort)
+        if (signal.aborted) {
+          onAbort()
+          return
+        }
+      }
+
       this.processRateLimitQueue(providerId)
     })
   }
@@ -281,6 +339,39 @@
     return state?.queue.length || 0
   }
 
+  private removeQueueItem(providerId: string, queueItemId: string): boolean {
+    const state = this.providerRateLimitStates.get(providerId)
+    if (!state) {
+      return false
+    }
+
+    const index = state.queue.findIndex((item) => item.id === queueItemId)
+    if (index === -1) {
+      return false
+    }
+
+    state.queue.splice(index, 1)
+    return true
+  }
+
+  private buildQueueSnapshot(
+    providerId: string,
+    state: ProviderRateLimitState
+  ): RateLimitQueueSnapshot {
+    const intervalMs = (1 / state.config.qpsLimit) * 1000
+    const nextAllowedTime = state.lastRequestTime + intervalMs
+    const baseWaitTime = Math.max(0, nextAllowedTime - Date.now())
+    const additionalQueuedIntervals = Math.max(0, state.queue.length - 1) * intervalMs
+
+    return {
+      providerId,
+      qpsLimit: state.config.qpsLimit,
+
currentQps: this.getCurrentQps(providerId), + queueLength: state.queue.length, + estimatedWaitTime: Math.max(0, baseWaitTime + additionalQueuedIntervals) + } + } + private getLastRequestTime(providerId: string): number { const state = this.providerRateLimitStates.get(providerId) return state?.lastRequestTime || 0 diff --git a/src/main/presenter/llmProviderPresenter/types.ts b/src/main/presenter/llmProviderPresenter/types.ts index 946470fc5..5a51e864d 100644 --- a/src/main/presenter/llmProviderPresenter/types.ts +++ b/src/main/presenter/llmProviderPresenter/types.ts @@ -5,6 +5,19 @@ export interface RateLimitConfig { enabled: boolean } +export interface RateLimitQueueSnapshot { + providerId: string + qpsLimit: number + currentQps: number + queueLength: number + estimatedWaitTime: number +} + +export interface ExecuteWithRateLimitOptions { + signal?: AbortSignal + onQueued?: (snapshot: RateLimitQueueSnapshot) => void +} + export interface QueueItem { id: string timestamp: number diff --git a/src/renderer/src/stores/ui/message.ts b/src/renderer/src/stores/ui/message.ts index f84642798..32b99f29c 100644 --- a/src/renderer/src/stores/ui/message.ts +++ b/src/renderer/src/stores/ui/message.ts @@ -9,6 +9,8 @@ import type { } from '@shared/types/agent-interface' import { useSessionStore } from './session' +const EPHEMERAL_STREAM_MESSAGE_PREFIXES = ['__rate_limit__:'] + // --- Store --- export const useMessageStore = defineStore('message', () => { @@ -124,6 +126,10 @@ export const useMessageStore = defineStore('message', () => { currentStreamMessageId.value = null } + function isEphemeralStreamMessageId(messageId: string): boolean { + return EPHEMERAL_STREAM_MESSAGE_PREFIXES.some((prefix) => messageId.startsWith(prefix)) + } + function applyStreamingBlocksToMessage( messageId: string, conversationId: string, @@ -184,7 +190,7 @@ export const useMessageStore = defineStore('message', () => { currentStreamSessionId.value = msg.conversationId currentStreamMessageId.value = streamMessageId ?? 
null streamingBlocks.value = msg.blocks - if (streamMessageId) { + if (streamMessageId && !isEphemeralStreamMessageId(streamMessageId)) { applyStreamingBlocksToMessage(streamMessageId, msg.conversationId, msg.blocks) } } diff --git a/src/shared/types/agent-interface.d.ts b/src/shared/types/agent-interface.d.ts index e51e521f7..6092aaa64 100644 --- a/src/shared/types/agent-interface.d.ts +++ b/src/shared/types/agent-interface.d.ts @@ -272,7 +272,7 @@ export interface AssistantMessageBlock { } tool_call?: ToolCallBlockData extra?: AssistantMessageExtra - action_type?: 'tool_call_permission' | 'question_request' + action_type?: 'tool_call_permission' | 'question_request' | 'rate_limit' } export interface MessageMetadata { diff --git a/src/shared/types/presenters/index.d.ts b/src/shared/types/presenters/index.d.ts index 052130b27..2792d29ba 100644 --- a/src/shared/types/presenters/index.d.ts +++ b/src/shared/types/presenters/index.d.ts @@ -12,6 +12,7 @@ export type { LLM_PROVIDER, LLM_PROVIDER_BASE, MODEL_META, + RateLimitQueueSnapshot, RENDERER_MODEL_META, LLM_EMBEDDING_ATTRS, KeyStatus, diff --git a/src/shared/types/presenters/legacy.presenters.d.ts b/src/shared/types/presenters/legacy.presenters.d.ts index 7031cfad3..0b6d01c91 100644 --- a/src/shared/types/presenters/legacy.presenters.d.ts +++ b/src/shared/types/presenters/legacy.presenters.d.ts @@ -1157,6 +1157,19 @@ export interface ILlmProviderPresenter { lastRequestTime: number } > + executeWithRateLimit( + providerId: string, + options?: { + signal?: AbortSignal + onQueued?: (snapshot: { + providerId: string + qpsLimit: number + currentQps: number + queueLength: number + estimatedWaitTime: number + }) => void + } + ): Promise syncModelScopeMcpServers( providerId: string, syncOptions?: ModelScopeMcpSyncOptions diff --git a/src/shared/types/presenters/llmprovider.presenter.d.ts b/src/shared/types/presenters/llmprovider.presenter.d.ts index 72412f1bf..3a629dd71 100644 --- a/src/shared/types/presenters/llmprovider.presenter.d.ts +++ b/src/shared/types/presenters/llmprovider.presenter.d.ts @@ -169,6 +169,14 @@ export interface ModelScopeMcpSyncResult { errors: string[] } +export type RateLimitQueueSnapshot = { + providerId: string + qpsLimit: number + currentQps: number + queueLength: number + estimatedWaitTime: number +} + export type AcpConfigOptionValue = { value: string label: string @@ -259,6 +267,13 @@ export interface ILlmProviderPresenter { lastRequestTime: number } > + executeWithRateLimit( + providerId: string, + options?: { + signal?: AbortSignal + onQueued?: (snapshot: RateLimitQueueSnapshot) => void + } + ): Promise syncModelScopeMcpServers( providerId: string, syncOptions?: ModelScopeMcpSyncOptions diff --git a/test/main/presenter/deepchatAgentPresenter/deepchatAgentPresenter.test.ts b/test/main/presenter/deepchatAgentPresenter/deepchatAgentPresenter.test.ts index 51870f992..252cff836 100644 --- a/test/main/presenter/deepchatAgentPresenter/deepchatAgentPresenter.test.ts +++ b/test/main/presenter/deepchatAgentPresenter/deepchatAgentPresenter.test.ts @@ -171,10 +171,13 @@ function createMockCoreStream() { } function createMockLlmProviderPresenter() { + const providerInstance = { + coreStream: vi.fn().mockImplementation(() => createMockCoreStream()()) + } + return { - getProviderInstance: vi.fn().mockReturnValue({ - coreStream: vi.fn().mockReturnValue(createMockCoreStream()()) - }), + getProviderInstance: vi.fn().mockReturnValue(providerInstance), + executeWithRateLimit: vi.fn().mockResolvedValue(undefined), 
generateCompletionStandalone: vi.fn().mockResolvedValue('English screenshot summary'), generateText: vi.fn().mockResolvedValue({ content: ['## Current Goal', '- Continue the session safely'].join('\n') @@ -856,6 +859,197 @@ describe('DeepChatAgentPresenter', () => { expect(callArgs.modelConfig.verbosity).toBe('high') }) + it('passes every provider turn through executeWithRateLimit', async () => { + await agent.initSession('s1', { providerId: 'openai', modelId: 'gpt-4' }) + await agent.processMessage('s1', 'Hello') + + const callArgs = (processStream as ReturnType).mock.calls[0][0] + for await (const _event of callArgs.coreStream( + callArgs.messages, + callArgs.modelId, + callArgs.modelConfig, + callArgs.temperature, + callArgs.maxTokens, + callArgs.tools + )) { + } + for await (const _event of callArgs.coreStream( + callArgs.messages, + callArgs.modelId, + callArgs.modelConfig, + callArgs.temperature, + callArgs.maxTokens, + callArgs.tools + )) { + } + + expect(llmProvider.executeWithRateLimit).toHaveBeenCalledTimes(2) + expect(llmProvider.executeWithRateLimit).toHaveBeenNthCalledWith( + 1, + 'openai', + expect.objectContaining({ + signal: expect.any(AbortSignal), + onQueued: expect.any(Function) + }) + ) + }) + + it('emits and clears an ephemeral rate-limit message while waiting for the provider gate', async () => { + llmProvider.executeWithRateLimit.mockImplementation( + async (_providerId: string, options?: { onQueued?: (snapshot: any) => void }) => { + options?.onQueued?.({ + providerId: 'openai', + qpsLimit: 1, + currentQps: 1, + queueLength: 2, + estimatedWaitTime: 4000 + }) + } + ) + + await agent.initSession('s1', { providerId: 'openai', modelId: 'gpt-4' }) + await agent.processMessage('s1', 'Hello') + + const callArgs = (processStream as ReturnType).mock.calls[0][0] + for await (const _event of callArgs.coreStream( + callArgs.messages, + callArgs.modelId, + callArgs.modelConfig, + callArgs.temperature, + callArgs.maxTokens, + callArgs.tools + )) { + } + + const streamResponseCalls = (eventBus.sendToRenderer as ReturnType).mock.calls + .filter(([eventName]) => eventName === 'stream:response') + .map(([, , payload]) => payload) + .filter((payload) => typeof payload?.messageId === 'string') + + const rateLimitShow = streamResponseCalls.find( + (payload) => + payload.messageId.startsWith('__rate_limit__:') && + Array.isArray(payload.blocks) && + payload.blocks.length === 1 + ) + const rateLimitClear = streamResponseCalls.find( + (payload) => + payload.messageId.startsWith('__rate_limit__:') && + Array.isArray(payload.blocks) && + payload.blocks.length === 0 + ) + + expect(rateLimitShow).toMatchObject({ + conversationId: 's1', + blocks: [ + expect.objectContaining({ + type: 'action', + action_type: 'rate_limit', + status: 'pending', + extra: expect.objectContaining({ + providerId: 'openai', + queueLength: 2, + estimatedWaitTime: 4000 + }) + }) + ] + }) + expect(rateLimitClear).toMatchObject({ + conversationId: 's1', + blocks: [] + }) + }) + + it('does not call provider.coreStream when a queued request is canceled', async () => { + const abortError = new Error('Aborted') + abortError.name = 'AbortError' + const queued = Promise.withResolvers() + llmProvider.executeWithRateLimit.mockImplementation( + ( + _providerId: string, + options?: { signal?: AbortSignal; onQueued?: (snapshot: any) => void } + ) => + new Promise((resolve, reject) => { + options?.onQueued?.({ + providerId: 'openai', + qpsLimit: 1, + currentQps: 1, + queueLength: 1, + estimatedWaitTime: 1000 + }) + queued.resolve() + 
+ if (options?.signal?.aborted) { + reject(abortError) + return + } + + options?.signal?.addEventListener( + 'abort', + () => { + reject(abortError) + }, + { once: true } + ) + + void resolve + }) + ) + ;(processStream as ReturnType).mockImplementation( + async (params: { + coreStream: ( + messages: any[], + modelId: string, + modelConfig: any, + temperature: number, + maxTokens: number, + tools: any[] + ) => AsyncGenerator + messages: any[] + modelId: string + modelConfig: any + temperature: number + maxTokens: number + tools: any[] + }) => { + try { + for await (const _event of params.coreStream( + params.messages, + params.modelId, + params.modelConfig, + params.temperature, + params.maxTokens, + params.tools + )) { + } + + return { status: 'completed' as const } + } catch (error) { + return { + status: + error instanceof Error && error.name === 'AbortError' + ? ('aborted' as const) + : ('error' as const), + stopReason: + error instanceof Error && error.name === 'AbortError' ? 'user_stop' : 'error', + errorMessage: error instanceof Error ? error.message : String(error) + } + } + } + ) + + await agent.initSession('s1', { providerId: 'openai', modelId: 'gpt-4' }) + + const processing = agent.processMessage('s1', 'Hello') + await queued.promise + await agent.cancelGeneration('s1') + await processing + + const providerCoreStream = llmProvider.getProviderInstance.mock.results[0]?.value.coreStream + expect(providerCoreStream).not.toHaveBeenCalled() + expect((await agent.getSessionState('s1'))?.status).toBe('idle') + }) + it('reuses cached system prompt within the same day', async () => { vi.useFakeTimers() vi.setSystemTime(new Date('2026-03-05T08:00:00.000Z')) diff --git a/test/main/presenter/deepchatAgentPresenter/process.test.ts b/test/main/presenter/deepchatAgentPresenter/process.test.ts index 971b79e2c..13b2576b4 100644 --- a/test/main/presenter/deepchatAgentPresenter/process.test.ts +++ b/test/main/presenter/deepchatAgentPresenter/process.test.ts @@ -168,6 +168,32 @@ describe('processStream', () => { ) }) + it('treats AbortError thrown before the first event as aborted without writing an error block', async () => { + const abortError = new Error('Aborted') + abortError.name = 'AbortError' + const coreStream = vi.fn(async function* () { + throw abortError + }) as unknown as ProcessParams['coreStream'] + + const params = createParams({ coreStream }) + const promise = processStream(params) + await vi.runAllTimersAsync() + const result = await promise + + expect(result).toMatchObject({ + status: 'aborted', + stopReason: 'user_stop', + errorMessage: 'common.error.userCanceledGeneration' + }) + expect(messageStore.setMessageError).not.toHaveBeenCalled() + expect(messageStore.finalizeAssistantMessage).not.toHaveBeenCalled() + expect(eventBus.sendToRenderer).not.toHaveBeenCalledWith( + 'stream:error', + 'all', + expect.anything() + ) + }) + it('single tool call → loop once, finalize', async () => { let callCount = 0 const coreStream = vi.fn(function () { diff --git a/test/main/presenter/llmProviderPresenter/rateLimitManager.test.ts b/test/main/presenter/llmProviderPresenter/rateLimitManager.test.ts new file mode 100644 index 000000000..cdbd6164e --- /dev/null +++ b/test/main/presenter/llmProviderPresenter/rateLimitManager.test.ts @@ -0,0 +1,137 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' + +vi.mock('@/eventbus', () => ({ + eventBus: { + send: vi.fn() + }, + SendTarget: { + ALL_WINDOWS: 'all' + } +})) + +vi.mock('@/events', () => ({ + RATE_LIMIT_EVENTS: { + 
CONFIG_UPDATED: 'rate-limit:config-updated', + REQUEST_QUEUED: 'rate-limit:request-queued', + REQUEST_EXECUTED: 'rate-limit:request-executed', + LIMIT_EXCEEDED: 'rate-limit:limit-exceeded' + } +})) + +import { eventBus } from '@/eventbus' +import { RateLimitManager } from '@/presenter/llmProviderPresenter/managers/rateLimitManager' + +function createConfigPresenter(rateLimit?: { enabled: boolean; qpsLimit: number }) { + const provider = { + id: 'openai', + name: 'OpenAI', + rateLimit: rateLimit ?? { enabled: false, qpsLimit: 1 } + } + + return { + provider, + presenter: { + getProviders: vi.fn(() => [provider]), + getProviderById: vi.fn(() => provider), + setProviderById: vi.fn((providerId: string, nextProvider: typeof provider) => { + if (providerId === provider.id) { + Object.assign(provider, nextProvider) + } + }) + } + } +} + +describe('RateLimitManager', () => { + beforeEach(() => { + vi.useFakeTimers() + vi.setSystemTime(new Date('2026-04-01T00:00:00.000Z')) + vi.clearAllMocks() + }) + + afterEach(() => { + vi.useRealTimers() + }) + + it('executes immediately and records the request when the provider is not rate limited', async () => { + const { presenter } = createConfigPresenter({ enabled: false, qpsLimit: 1 }) + const manager = new RateLimitManager(presenter as any) + manager.initializeProviderRateLimitConfigs() + + await manager.executeWithRateLimit('openai') + + expect(eventBus.send).toHaveBeenCalledWith( + 'rate-limit:request-executed', + 'all', + expect.objectContaining({ + providerId: 'openai', + timestamp: Date.now() + }) + ) + }) + + it('queues a request, reports queue info, and executes it after the interval', async () => { + const { presenter } = createConfigPresenter({ enabled: true, qpsLimit: 1 }) + const manager = new RateLimitManager(presenter as any) + manager.initializeProviderRateLimitConfigs() + + await manager.executeWithRateLimit('openai') + + const onQueued = vi.fn() + const queuedPromise = manager.executeWithRateLimit('openai', { onQueued }) + await Promise.resolve() + + expect(onQueued).toHaveBeenCalledWith( + expect.objectContaining({ + providerId: 'openai', + qpsLimit: 1, + currentQps: 1, + queueLength: 1, + estimatedWaitTime: expect.any(Number) + }) + ) + expect(manager.getQueueLength('openai')).toBe(1) + + await vi.advanceTimersByTimeAsync(1000) + await queuedPromise + + expect(manager.getQueueLength('openai')).toBe(0) + expect( + (eventBus.send as ReturnType).mock.calls.filter( + ([eventName]) => eventName === 'rate-limit:request-queued' + ) + ).toHaveLength(1) + expect( + (eventBus.send as ReturnType).mock.calls.filter( + ([eventName]) => eventName === 'rate-limit:request-executed' + ) + ).toHaveLength(2) + }) + + it('removes an aborted queued request and never reaches the provider gate', async () => { + const { presenter } = createConfigPresenter({ enabled: true, qpsLimit: 1 }) + const manager = new RateLimitManager(presenter as any) + manager.initializeProviderRateLimitConfigs() + + await manager.executeWithRateLimit('openai') + + const abortController = new AbortController() + const queuedPromise = manager.executeWithRateLimit('openai', { + signal: abortController.signal + }) + await Promise.resolve() + + abortController.abort() + + await expect(queuedPromise).rejects.toMatchObject({ name: 'AbortError' }) + expect(manager.getQueueLength('openai')).toBe(0) + + await vi.advanceTimersByTimeAsync(1000) + + expect( + (eventBus.send as ReturnType).mock.calls.filter( + ([eventName]) => eventName === 'rate-limit:request-executed' + ) + ).toHaveLength(1) + }) 
+}) diff --git a/test/renderer/stores/messageStore.test.ts b/test/renderer/stores/messageStore.test.ts index fb2b1b248..c6a26ae07 100644 --- a/test/renderer/stores/messageStore.test.ts +++ b/test/renderer/stores/messageStore.test.ts @@ -118,4 +118,55 @@ describe('messageStore', () => { expect(store.messages.value).toHaveLength(1) expect(store.messages.value[0]?.id).toBe('m2') }) + + it('keeps rate-limit stream messages ephemeral and skips message hydration', async () => { + const { store, newAgentPresenter } = await setupStore() + const responseHandler = ( + (window as any).electron.ipcRenderer.on as ReturnType + ).mock.calls.find(([eventName]) => eventName === 'stream:response')?.[1] + + expect(typeof responseHandler).toBe('function') + + responseHandler( + {}, + { + conversationId: 's1', + messageId: '__rate_limit__:s1:1', + blocks: [ + { + type: 'action', + action_type: 'rate_limit', + status: 'pending', + timestamp: 1, + extra: { + providerId: 'openai', + qpsLimit: 1, + currentQps: 1, + queueLength: 2, + estimatedWaitTime: 4000 + } + } + ] + } + ) + + expect(store.isStreaming.value).toBe(true) + expect(store.currentStreamMessageId.value).toBe('__rate_limit__:s1:1') + expect(store.streamingBlocks.value).toHaveLength(1) + expect(store.messages.value).toHaveLength(0) + expect(newAgentPresenter.getMessage).not.toHaveBeenCalled() + + responseHandler( + {}, + { + conversationId: 's1', + messageId: '__rate_limit__:s1:1', + blocks: [] + } + ) + + expect(store.streamingBlocks.value).toEqual([]) + expect(store.messages.value).toHaveLength(0) + expect(newAgentPresenter.getMessage).not.toHaveBeenCalled() + }) }) From c8f5b8b5cb248396ace2604845d57f6574801ce0 Mon Sep 17 00:00:00 2001 From: zerob13 Date: Wed, 1 Apr 2026 21:44:19 +0800 Subject: [PATCH 2/4] fix(agent): compact rate limit feedback --- .../compactionService.ts | 1 + .../presenter/deepchatAgentPresenter/index.ts | 3 + src/main/presenter/index.ts | 2 + .../agentTools/agentToolManager.ts | 1 + .../presenter/toolPresenter/runtimePorts.ts | 5 +- .../src/components/chat/MessageList.vue | 16 +++ .../components/message/MessageBlockAction.vue | 114 +++++------------- src/renderer/src/i18n/da-DK/chat.json | 1 + src/renderer/src/i18n/en-US/chat.json | 1 + src/renderer/src/i18n/fa-IR/chat.json | 1 + src/renderer/src/i18n/fr-FR/chat.json | 1 + src/renderer/src/i18n/he-IL/chat.json | 1 + src/renderer/src/i18n/ja-JP/chat.json | 1 + src/renderer/src/i18n/ko-KR/chat.json | 1 + src/renderer/src/i18n/pt-BR/chat.json | 1 + src/renderer/src/i18n/ru-RU/chat.json | 1 + src/renderer/src/i18n/zh-CN/chat.json | 1 + src/renderer/src/i18n/zh-HK/chat.json | 1 + src/renderer/src/i18n/zh-TW/chat.json | 1 + src/renderer/src/pages/ChatPage.vue | 39 +++++- .../compactionService.test.ts | 1 + .../deepchatAgentPresenter.test.ts | 14 +++ test/main/presenter/mcpClient.test.ts | 2 + .../agentTools/agentToolManagerRead.test.ts | 6 + .../agentToolManagerSettings.test.ts | 1 + .../agentToolManagerSkillAccess.test.ts | 1 + .../subagentOrchestratorTool.test.ts | 1 + .../toolPresenter/toolPresenter.test.ts | 16 ++- test/renderer/components/ChatPage.test.ts | 50 +++++++- test/renderer/components/MessageList.test.ts | 38 +++++- .../message/MessageBlockBasics.test.ts | 17 +-- 31 files changed, 235 insertions(+), 105 deletions(-) diff --git a/src/main/presenter/deepchatAgentPresenter/compactionService.ts b/src/main/presenter/deepchatAgentPresenter/compactionService.ts index aa107c373..8da84e037 100644 --- a/src/main/presenter/deepchatAgentPresenter/compactionService.ts +++ 
b/src/main/presenter/deepchatAgentPresenter/compactionService.ts @@ -704,6 +704,7 @@ export class CompactionService { spanText: string ): Promise { const prompt = this.buildSummaryPrompt(previousSummary, spanText) + await this.llmProviderPresenter.executeWithRateLimit(model.providerId) const response = await this.llmProviderPresenter.generateText( model.providerId, prompt, diff --git a/src/main/presenter/deepchatAgentPresenter/index.ts b/src/main/presenter/deepchatAgentPresenter/index.ts index 21543e065..2ea89cc5e 100644 --- a/src/main/presenter/deepchatAgentPresenter/index.ts +++ b/src/main/presenter/deepchatAgentPresenter/index.ts @@ -3275,6 +3275,9 @@ export class DeepChatAgentPresenter implements IAgentImplementation { visionModel.modelId, visionModel.providerId ) + await this.llmProviderPresenter.executeWithRateLimit(visionModel.providerId, { + signal: abortSignal + }) const response = await this.llmProviderPresenter.generateCompletionStandalone( visionModel.providerId, messages, diff --git a/src/main/presenter/index.ts b/src/main/presenter/index.ts index 8db9dee68..098714433 100644 --- a/src/main/presenter/index.ts +++ b/src/main/presenter/index.ts @@ -337,6 +337,8 @@ export class Presenter implements IPresenter { this.filePresenter.prepareFileCompletely(absPath, typeInfo, contentType) }), getLlmProviderPresenter: () => ({ + executeWithRateLimit: (providerId, options) => + this.llmproviderPresenter.executeWithRateLimit(providerId, options), generateCompletionStandalone: (providerId, messages, modelId, temperature, maxTokens) => this.llmproviderPresenter.generateCompletionStandalone( providerId, diff --git a/src/main/presenter/toolPresenter/agentTools/agentToolManager.ts b/src/main/presenter/toolPresenter/agentTools/agentToolManager.ts index 0fecb7f09..4a3e5f7b3 100644 --- a/src/main/presenter/toolPresenter/agentTools/agentToolManager.ts +++ b/src/main/presenter/toolPresenter/agentTools/agentToolManager.ts @@ -1237,6 +1237,7 @@ export class AgentToolManager { visionTarget.modelId, visionTarget.providerId ) + await this.getLlmProviderPresenter().executeWithRateLimit(visionTarget.providerId) const response = await this.getLlmProviderPresenter().generateCompletionStandalone( visionTarget.providerId, messages, diff --git a/src/main/presenter/toolPresenter/runtimePorts.ts b/src/main/presenter/toolPresenter/runtimePorts.ts index 78646ab38..12f52266e 100644 --- a/src/main/presenter/toolPresenter/runtimePorts.ts +++ b/src/main/presenter/toolPresenter/runtimePorts.ts @@ -61,7 +61,10 @@ export interface AgentToolRuntimePort { getSkillPresenter(): ISkillPresenter getYoBrowserToolHandler(): IYoBrowserPresenter['toolHandler'] getFilePresenter(): Pick - getLlmProviderPresenter(): Pick + getLlmProviderPresenter(): Pick< + ILlmProviderPresenter, + 'executeWithRateLimit' | 'generateCompletionStandalone' + > createSettingsWindow(): ReturnType sendToWindow( windowId: number, diff --git a/src/renderer/src/components/chat/MessageList.vue b/src/renderer/src/components/chat/MessageList.vue index b07b97317..9bf94f334 100644 --- a/src/renderer/src/components/chat/MessageList.vue +++ b/src/renderer/src/components/chat/MessageList.vue @@ -43,6 +43,14 @@ @copy-image="handleCopyImage" /> +
+ +
@@ -51,10 +59,12 @@ import { computed } from 'vue' import { useI18n } from 'vue-i18n' import MessageItemAssistant from '@/components/message/MessageItemAssistant.vue' +import MessageBlockAction from '@/components/message/MessageBlockAction.vue' import MessageItemUser from '@/components/message/MessageItemUser.vue' import { useMessageCapture } from '@/composables/message/useMessageCapture' import { type DisplayAssistantMessage, + type DisplayAssistantMessageBlock, isCompactionMessageItem, type DisplayUserMessage, type DisplayMessage, @@ -64,11 +74,17 @@ import { const props = withDefaults( defineProps<{ messages: MessageListItem[] + conversationId?: string + ephemeralRateLimitBlock?: DisplayAssistantMessageBlock | null + ephemeralRateLimitMessageId?: string | null isGenerating?: boolean traceMessageIds?: string[] isReadOnly?: boolean }>(), { + conversationId: '', + ephemeralRateLimitBlock: null, + ephemeralRateLimitMessageId: null, isGenerating: false, traceMessageIds: () => [], isReadOnly: false diff --git a/src/renderer/src/components/message/MessageBlockAction.vue b/src/renderer/src/components/message/MessageBlockAction.vue index 9bfc5a383..58601a5a1 100644 --- a/src/renderer/src/components/message/MessageBlockAction.vue +++ b/src/renderer/src/components/message/MessageBlockAction.vue @@ -1,7 +1,5 @@