diff --git a/.changeset/subagent-api-key-pool.md b/.changeset/subagent-api-key-pool.md new file mode 100644 index 00000000..eab091c3 --- /dev/null +++ b/.changeset/subagent-api-key-pool.md @@ -0,0 +1,6 @@ +--- +"@moonshot-ai/agent-core": minor +"@moonshot-ai/kimi-code": minor +--- + +Add an opt-in API key pool for parallel subagent execution. When `KIMI_API_KEY_POOL` is set and at least two of `KIMI_API_KEY`, `KIMI_API_KEY_1` … `KIMI_API_KEY_99` are configured, requests to `kimi` providers that have neither an explicit `apiKey` nor OAuth rotate through those keys to avoid rate-limit contention, and a key is temporarily cooled down after a retryable error or a 401 response. diff --git a/packages/agent-core/src/rpc/core-impl.ts b/packages/agent-core/src/rpc/core-impl.ts index c0aa30dd..c313384a 100644 --- a/packages/agent-core/src/rpc/core-impl.ts +++ b/packages/agent-core/src/rpc/core-impl.ts @@ -29,6 +29,7 @@ import { ProviderManager, type BearerTokenProvider, type OAuthTokenProviderResolver } from '../session/provider-manager'; +import { ApiKeyPool } from '../session/api-key-pool'; import { SessionAPIImpl } from '../session/rpc'; import { normalizeWorkDir, SessionStore } from '../session/store'; import { noopTelemetryClient, withTelemetryContext, type TelemetryClient } from '../telemetry'; @@ -118,6 +119,7 @@ export class KimiCore implements PromisableMethods { private readonly resolveOAuthTokenProvider: OAuthTokenProviderResolver | undefined; private readonly skillDirs: readonly string[]; private readonly sessionStore: SessionStore; + private readonly apiKeyPool: ApiKeyPool | undefined; readonly plugins: PluginManager; private pluginsReady: Promise; private pluginsLoadError: Error | undefined; @@ -143,6 +145,7 @@ export class KimiCore implements PromisableMethods { this.resolveOAuthTokenProvider = options.resolveOAuthTokenProvider; this.skillDirs = options.skillDirs ?? []; this.telemetry = options.telemetry ?? noopTelemetryClient; + this.apiKeyPool = process.env['KIMI_API_KEY_POOL'] !== undefined ? (ApiKeyPool.fromEnv() ??
/**
 * ApiKeyPool — round-robin allocator for multiple API keys.
 *
 * Designed for parallel subagent execution so that concurrent agents
 * do not hammer a single key's rate-limit quota.
 *
 * Keys are read from environment variables:
 *   KIMI_API_KEY, KIMI_API_KEY_1, KIMI_API_KEY_2, … up to KIMI_API_KEY_99
 *
 * `fromEnv` only creates a pool when ≥2 distinct keys are found; otherwise it
 * returns `null` and callers fall back to the root provider's key.
 */

/** Health bookkeeping kept for every key in the pool. */
interface KeyState {
  /** Failures recorded since the failure streak was last cleared by `resetKey`. */
  consecutiveFailures: number;
  /** Epoch-millisecond timestamp until which the key is skipped, or `null` when it is not cooling down. */
  cooldownUntil: number | null;
}

/** Cooldown ladder in milliseconds: 30s, 5min, 30min. */
const COOLDOWN_MS = [30_000, 300_000, 1_800_000] as const;

/** Highest numeric suffix probed by `fromEnv` (`PREFIX_1` … `PREFIX_99`). */
const MAX_NUMBERED_KEYS = 99;

/**
 * Map a failure count onto the cooldown ladder.
 *
 * @param failures Number of consecutive failures recorded for a key.
 * @returns `0` for a non-positive count, otherwise the ladder step for that
 *   count, clamped to the last (longest) step.
 */
function cooldownForFailures(failures: number): number {
  if (failures <= 0) return 0;
  const step = Math.min(failures, COOLDOWN_MS.length) - 1;
  return COOLDOWN_MS[step] ?? 0;
}

export class ApiKeyPool {
  private readonly keys: readonly string[];
  private nextIndex = 0;
  private readonly states: Map<string, KeyState>;

  /**
   * Build a pool from environment variables.
   *
   * Collects `PREFIX`, `PREFIX_1`, `PREFIX_2`, … up to `PREFIX_99`, trimming
   * each value and ignoring unset, empty, or whitespace-only entries. Gaps in
   * the numbering are allowed.
   *
   * @param prefix Environment variable name of the primary key.
   * @returns A pool, or `null` when fewer than 2 distinct keys are found.
   */
  static fromEnv(prefix = 'KIMI_API_KEY'): ApiKeyPool | null {
    const names = [prefix];
    for (let suffix = 1; suffix <= MAX_NUMBERED_KEYS; suffix++) {
      names.push(`${prefix}_${suffix}`);
    }

    // A Set keeps first-seen order and drops repeats: the same key exported
    // under two variable names is still a single quota, not a pool.
    const distinct = new Set<string>();
    for (const name of names) {
      const value = process.env[name]?.trim();
      if (value !== undefined && value.length > 0) {
        distinct.add(value);
      }
    }

    return distinct.size < 2 ? null : new ApiKeyPool([...distinct]);
  }

  /**
   * @param keys Keys to rotate through, in rotation order. Repeated values are
   *   collapsed to their first occurrence because state is tracked per key value.
   * @throws Error when `keys` is empty.
   */
  constructor(keys: readonly string[]) {
    const distinct = [...new Set(keys)];
    if (distinct.length === 0) {
      throw new Error('Key pool cannot be empty');
    }
    this.keys = distinct;
    this.states = new Map<string, KeyState>();
    for (const key of distinct) {
      this.states.set(key, { consecutiveFailures: 0, cooldownUntil: null });
    }
  }

  /** Number of distinct keys in the pool. */
  get keyCount(): number {
    return this.keys.length;
  }

  /**
   * Acquire the next key in rotation.
   *
   * Skips keys that are in cooldown. If every key is cooling down,
   * falls back to round-robin across the entire pool.
   *
   * @returns The key to use for the next request.
   */
  acquire(): string {
    const now = Date.now();
    for (let attempt = 0; attempt < this.keys.length; attempt++) {
      const key = this.advance();
      const state = this.states.get(key);
      if (state === undefined) {
        continue;
      }
      if (state.cooldownUntil !== null) {
        if (now < state.cooldownUntil) {
          continue;
        }
        // Cooldown elapsed: the key becomes eligible again, but its failure
        // streak is kept so that failing again escalates to the next ladder
        // step. Only `resetKey` (a successful call) clears the streak.
        state.cooldownUntil = null;
      }
      return key;
    }
    // All keys in cooldown — fall back to round-robin.
    return this.advance();
  }

  /**
   * Record a failure for the given key.
   *
   * Applies an escalating cooldown based on the key's failure streak:
   *   1st failure  → 30s
   *   2nd failure  → 5min
   *   3rd+ failure → 30min
   *
   * Unknown keys are ignored.
   */
  recordFailure(key: string): void {
    const state = this.states.get(key);
    if (state === undefined) {
      return;
    }
    state.consecutiveFailures += 1;
    state.cooldownUntil = Date.now() + cooldownForFailures(state.consecutiveFailures);
  }

  /**
   * Clear the failure state for a key (e.g. after a successful call).
   *
   * Unknown keys are ignored.
   */
  resetKey(key: string): void {
    const state = this.states.get(key);
    if (state === undefined) {
      return;
    }
    // Don't clear an active cooldown that may have been set by a concurrent
    // failure while this request was still in flight.
    if (state.cooldownUntil !== null && Date.now() < state.cooldownUntil) {
      return;
    }
    state.consecutiveFailures = 0;
    state.cooldownUntil = null;
  }

  /** Return the key at the rotation cursor and move the cursor forward, wrapping at the end. */
  private advance(): string {
    const key = this.keys[this.nextIndex];
    if (key === undefined) {
      // Unreachable: the constructor rejects empty pools and the cursor is
      // always reduced modulo the pool size.
      throw new Error('Key pool cannot be empty');
    }
    this.nextIndex = (this.nextIndex + 1) % this.keys.length;
    return key;
  }
}
= ( @@ -123,68 +125,99 @@ export class ProviderManager implements ModelProvider { ): AuthorizedRequest | undefined { const { providerName } = this.resolveProviderConfig(model); const providerConfig = this.config.providers[providerName]; - if (providerConfig?.oauth === undefined) return undefined; - if (providerApiKey(providerConfig) !== undefined) { - // oauth + apiKey on the same provider makes request auth ambiguous: - // provider construction would prefer apiKey while runtime auth resolves - // OAuth. Reject it so misconfiguration surfaces at model resolution. - throw new KimiError( - ErrorCodes.CONFIG_INVALID, - `Provider "${providerName}" has both apiKey and oauth set in config.toml — they are mutually exclusive. Remove one.`, - ); - } + // OAuth path + if (providerConfig?.oauth !== undefined) { + if (providerApiKey(providerConfig) !== undefined) { + // oauth + apiKey on the same provider makes request auth ambiguous: + // provider construction would prefer apiKey while runtime auth resolves + // OAuth. Reject it so misconfiguration surfaces at model resolution. + throw new KimiError( + ErrorCodes.CONFIG_INVALID, + `Provider "${providerName}" has both apiKey and oauth set in config.toml — they are mutually exclusive. Remove one.`, + ); + } - const loginRequired = (cause?: unknown): KimiError => - new KimiError( - ErrorCodes.AUTH_LOGIN_REQUIRED, - `OAuth provider "${providerName}" requires login before it can be used.`, - cause === undefined ? undefined : { cause }, - ); + const loginRequired = (cause?: unknown): KimiError => + new KimiError( + ErrorCodes.AUTH_LOGIN_REQUIRED, + `OAuth provider "${providerName}" requires login before it can be used.`, + cause === undefined ? 
undefined : { cause }, + ); + + const tokenProvider = this.options.resolveOAuthTokenProvider?.(providerName, providerConfig.oauth); + if (tokenProvider === undefined) { + return async () => { + throw loginRequired(); + }; + } - const tokenProvider = this.options.resolveOAuthTokenProvider?.(providerName, providerConfig.oauth); - if (tokenProvider === undefined) { - return async () => { - throw loginRequired(); + const log = options?.log; + const fetchAuth = async (force: boolean): Promise => { + let apiKey: string; + try { + apiKey = await tokenProvider.getAccessToken(force ? { force: true } : undefined); + } catch (error) { + if (!isKimiError(error) || error.code !== ErrorCodes.AUTH_LOGIN_REQUIRED) { + log?.warn('oauth token fetch failed', { providerName, error }); + } + throw loginRequired(error); + } + if (apiKey.trim().length === 0) throw loginRequired(); + return { apiKey }; }; - } - const log = options?.log; - const fetchAuth = async (force: boolean): Promise => { - let apiKey: string; - try { - apiKey = await tokenProvider.getAccessToken(force ? { force: true } : undefined); - } catch (error) { - if (!isKimiError(error) || error.code !== ErrorCodes.AUTH_LOGIN_REQUIRED) { - log?.warn('oauth token fetch failed', { providerName, error }); + return async (request) => { + let auth = await fetchAuth(false); + for (let refreshed = false; ; refreshed = true) { + try { + return await request(auth); + } catch (error) { + if (!(error instanceof APIStatusError) || error.statusCode !== 401) throw error; + if (refreshed) { + throw new KimiError( + ErrorCodes.AUTH_LOGIN_REQUIRED, + 'OAuth provider credentials were rejected. 
Send /login to login.', + { + cause: error, + details: { statusCode: error.statusCode, requestId: error.requestId }, + }, + ); + } + auth = await fetchAuth(true); + } } - throw loginRequired(error); - } - if (apiKey.trim().length === 0) throw loginRequired(); - return { apiKey }; - }; + }; + } - return async (request) => { - let auth = await fetchAuth(false); - for (let refreshed = false; ; refreshed = true) { + // Key pool path — only for kimi provider when a pool is configured + // and the provider does not already have an explicit apiKey. + if ( + providerConfig?.type === 'kimi' && + this.options.apiKeyPool !== undefined && + providerApiKey(providerConfig) === undefined + ) { + const pool = this.options.apiKeyPool; + return async (request) => { + const key = pool.acquire(); + const auth: ProviderRequestAuth = { apiKey: key }; try { - return await request(auth); + const result = await request(auth); + pool.resetKey(key); + return result; } catch (error) { - if (!(error instanceof APIStatusError) || error.statusCode !== 401) throw error; - if (refreshed) { - throw new KimiError( - ErrorCodes.AUTH_LOGIN_REQUIRED, - 'OAuth provider credentials were rejected. 
Send /login to login.', - { - cause: error, - details: { statusCode: error.statusCode, requestId: error.requestId }, - }, - ); + if ( + isRetryableGenerateError(error) || + (error instanceof APIStatusError && error.statusCode === 401) + ) { + pool.recordFailure(key); } - auth = await fetchAuth(true); + throw error; } - } - }; + }; + } + + return undefined; } } diff --git a/packages/agent-core/test/harness/runtime-provider.test.ts b/packages/agent-core/test/harness/runtime-provider.test.ts index a0b3e8c7..da94934b 100644 --- a/packages/agent-core/test/harness/runtime-provider.test.ts +++ b/packages/agent-core/test/harness/runtime-provider.test.ts @@ -1,8 +1,11 @@ -import { describe, expect, it } from 'vitest'; +import { describe, expect, it, vi } from 'vitest'; +import { APIConnectionError, APIStatusError } from '@moonshot-ai/kosong'; import type { KimiConfig } from '../../src/config'; import { ErrorCodes, KimiError } from '../../src/errors'; import { ProviderManager } from '../../src/session/provider-manager'; +import { ApiKeyPool } from '../../src/session/api-key-pool'; +import type { ProviderRequestAuth } from '@moonshot-ai/kosong'; import { resolveThinkingLevel } from '../../src/agent/config/thinking'; // Thin wrapper that adapts the legacy `resolveRuntimeProvider(input)` shape to @@ -48,6 +51,24 @@ const BASE_CONFIG: KimiConfig = { }, }; +const POOL_BASE_CONFIG: KimiConfig = { + defaultModel: 'kimi-code/kimi-for-coding', + providers: { + 'managed:kimi-code': { + type: 'kimi', + baseUrl: 'https://api.example/v1', + }, + }, + models: { + 'kimi-code/kimi-for-coding': { + provider: 'managed:kimi-code', + model: 'kimi-for-coding', + maxContextSize: 1_000_000, + capabilities: ['thinking', 'image_in', 'video_in', 'tool_use'], + }, + }, +}; + const TEST_KIMI_HEADERS = { 'User-Agent': 'kimi-code-cli/0.0.0-test', 'X-Msh-Platform': 'kimi_code_cli', @@ -661,3 +682,270 @@ describe('resolveThinkingLevel', () => { expect(resolveThinkingLevel(undefined, {})).toBe('high'); }); 
}); + +describe('ProviderManager key pool', () => { + it('returns undefined for resolveAuth when no pool is configured', () => { + const manager = new ProviderManager({ config: BASE_CONFIG }); + const auth = manager.resolveAuth('kimi-code/kimi-for-coding'); + expect(auth).toBeUndefined(); + }); + + it('returns a key-pool wrapper for kimi provider when pool is present', async () => { + const pool = new ApiKeyPool(['pk-1', 'pk-2']); + const manager = new ProviderManager({ config: POOL_BASE_CONFIG, apiKeyPool: pool }); + const withAuth = manager.resolveAuth('kimi-code/kimi-for-coding'); + expect(withAuth).toBeDefined(); + + const request = vi.fn(async (auth: ProviderRequestAuth) => { + return { ok: true, key: auth.apiKey! }; + }); + const result = await withAuth!(request); + expect(result.ok).toBe(true); + expect(result.key).toBe('pk-1'); + }); + + it('rotates to the next key on each resolveAuth call', async () => { + const pool = new ApiKeyPool(['pk-a', 'pk-b', 'pk-c']); + const manager = new ProviderManager({ config: POOL_BASE_CONFIG, apiKeyPool: pool }); + const withAuth = manager.resolveAuth('kimi-code/kimi-for-coding')!; + + const keys: string[] = []; + const request = vi.fn(async (auth: ProviderRequestAuth) => { + keys.push(auth.apiKey!); + return { ok: true }; + }); + + await withAuth(request); + await withAuth(request); + await withAuth(request); + expect(keys).toEqual(['pk-a', 'pk-b', 'pk-c']); + }); + + it('records failure on retryable errors and re-throws', async () => { + const pool = new ApiKeyPool(['pk-1']); + const recordFailureSpy = vi.spyOn(pool, 'recordFailure'); + const manager = new ProviderManager({ config: POOL_BASE_CONFIG, apiKeyPool: pool }); + const withAuth = manager.resolveAuth('kimi-code/kimi-for-coding')!; + + const request = vi.fn(async () => { + throw new APIStatusError(429, 'Too Many Requests'); + }); + + await expect(withAuth(request)).rejects.toThrow(APIStatusError); + expect(recordFailureSpy).toHaveBeenCalledWith('pk-1'); + 
expect(recordFailureSpy).toHaveBeenCalledTimes(1); + }); + + it('records failure on connection errors', async () => { + const pool = new ApiKeyPool(['pk-1']); + const recordFailureSpy = vi.spyOn(pool, 'recordFailure'); + const manager = new ProviderManager({ config: POOL_BASE_CONFIG, apiKeyPool: pool }); + const withAuth = manager.resolveAuth('kimi-code/kimi-for-coding')!; + + const request = vi.fn(async () => { + throw new APIConnectionError('network error'); + }); + + await expect(withAuth(request)).rejects.toThrow(APIConnectionError); + expect(recordFailureSpy).toHaveBeenCalledWith('pk-1'); + }); + + it('records failure on 401 unauthorized errors', async () => { + const pool = new ApiKeyPool(['pk-1']); + const recordFailureSpy = vi.spyOn(pool, 'recordFailure'); + const manager = new ProviderManager({ config: POOL_BASE_CONFIG, apiKeyPool: pool }); + const withAuth = manager.resolveAuth('kimi-code/kimi-for-coding')!; + + const request = vi.fn(async () => { + throw new APIStatusError(401, 'Unauthorized'); + }); + + await expect(withAuth(request)).rejects.toThrow(APIStatusError); + expect(recordFailureSpy).toHaveBeenCalledWith('pk-1'); + expect(recordFailureSpy).toHaveBeenCalledTimes(1); + }); + + it('does not record failure on non-retryable errors', async () => { + const pool = new ApiKeyPool(['pk-1']); + const recordFailureSpy = vi.spyOn(pool, 'recordFailure'); + const manager = new ProviderManager({ config: POOL_BASE_CONFIG, apiKeyPool: pool }); + const withAuth = manager.resolveAuth('kimi-code/kimi-for-coding')!; + + const request = vi.fn(async () => { + throw new APIStatusError(400, 'Bad Request'); + }); + + await expect(withAuth(request)).rejects.toThrow(APIStatusError); + expect(recordFailureSpy).not.toHaveBeenCalled(); + }); + + it('resets key after a successful request', async () => { + const pool = new ApiKeyPool(['pk-1']); + const resetKeySpy = vi.spyOn(pool, 'resetKey'); + const manager = new ProviderManager({ config: POOL_BASE_CONFIG, apiKeyPool: pool 
}); + const withAuth = manager.resolveAuth('kimi-code/kimi-for-coding')!; + + const request = vi.fn(async () => 'success'); + await withAuth(request); + expect(resetKeySpy).toHaveBeenCalledWith('pk-1'); + }); + + it('does not use key pool for non-kimi providers', () => { + const pool = new ApiKeyPool(['pk-1']); + const config: KimiConfig = { + defaultModel: 'gpt-alias', + providers: { + openai: { + type: 'openai', + apiKey: 'sk-openai', + }, + }, + models: { + 'gpt-alias': { + provider: 'openai', + model: 'gpt-runtime', + maxContextSize: 200000, + }, + }, + }; + const manager = new ProviderManager({ config, apiKeyPool: pool }); + const auth = manager.resolveAuth('gpt-alias'); + expect(auth).toBeUndefined(); + }); + + it('does not use key pool when provider already has an explicit apiKey', () => { + const pool = new ApiKeyPool(['pk-1']); + const config: KimiConfig = { + defaultModel: 'kimi-code/kimi-for-coding', + providers: { + 'managed:kimi-code': { + type: 'kimi', + apiKey: 'sk-explicit', + baseUrl: 'https://api.example/v1', + }, + }, + models: { + 'kimi-code/kimi-for-coding': { + provider: 'managed:kimi-code', + model: 'kimi-for-coding', + maxContextSize: 1_000_000, + }, + }, + }; + const manager = new ProviderManager({ config, apiKeyPool: pool }); + const auth = manager.resolveAuth('kimi-code/kimi-for-coding'); + // Explicit apiKey on provider means pool should not override it. + expect(auth).toBeUndefined(); + }); + + it('rotates keys correctly under concurrent resolveAuth calls', async () => { + const pool = new ApiKeyPool(['pk-a', 'pk-b', 'pk-c']); + const manager = new ProviderManager({ config: POOL_BASE_CONFIG, apiKeyPool: pool }); + const withAuth = manager.resolveAuth('kimi-code/kimi-for-coding')!; + + const keys: string[] = []; + const requests = Array.from({ length: 5 }, async () => { + await withAuth(async (auth: ProviderRequestAuth) => { + keys.push(auth.apiKey!); + // Small async gap so the event loop can interleave other requests. 
+ await new Promise((r) => { setTimeout(r, 1); }); + return 'ok'; + }); + }); + await Promise.all(requests); + + expect(keys).toEqual(['pk-a', 'pk-b', 'pk-c', 'pk-a', 'pk-b']); + }); + + it('distributes keys evenly under 60 concurrent withAuth requests', async () => { + const pool = new ApiKeyPool(['pk-0', 'pk-1', 'pk-2']); + const manager = new ProviderManager({ config: POOL_BASE_CONFIG, apiKeyPool: pool }); + const withAuth = manager.resolveAuth('kimi-code/kimi-for-coding')!; + + const keys: string[] = []; + await Promise.all( + Array.from({ length: 60 }, () => + withAuth(async (auth: ProviderRequestAuth) => { + keys.push(auth.apiKey!); + return 'ok'; + }), + ), + ); + + const counts = new Map(); + for (const k of keys) { + counts.set(k, (counts.get(k) ?? 0) + 1); + } + expect(counts.get('pk-0')).toBe(20); + expect(counts.get('pk-1')).toBe(20); + expect(counts.get('pk-2')).toBe(20); + }); + + it('cools down failed keys and reroutes under concurrent load', async () => { + const pool = new ApiKeyPool(['pk-good', 'pk-bad']); + const manager = new ProviderManager({ config: POOL_BASE_CONFIG, apiKeyPool: pool }); + const withAuth = manager.resolveAuth('kimi-code/kimi-for-coding')!; + + let callIndex = 0; + const keys: string[] = []; + const results = await Promise.allSettled( + Array.from({ length: 20 }, () => + withAuth(async (auth: ProviderRequestAuth) => { + keys.push(auth.apiKey!); + const idx = callIndex++; + // First 10 calls: even-indexed requests (0,2,4,6,8) get pk-good and succeed, + // odd-indexed get pk-bad and throw 429. Because acquire() is round-robin, + // the exact key depends on starting state, so we simulate by key value. + if (auth.apiKey === 'pk-bad') { + throw new APIStatusError(429, 'Too Many Requests'); + } + return 'success'; + }), + ), + ); + + // All pk-bad requests should have failed; pk-good should succeed. 
+ const badCount = keys.filter((k) => k === 'pk-bad').length; + const goodCount = keys.filter((k) => k === 'pk-good').length; + expect(badCount).toBe(10); + expect(goodCount).toBe(10); + + const failures = results.filter((r) => r.status === 'rejected').length; + const successes = results.filter((r) => r.status === 'fulfilled').length; + expect(failures).toBe(10); + expect(successes).toBe(10); + + // After failures, pk-bad should be in cooldown. + const nextKey = pool.acquire(); + // Round-robin: after 20 calls index is at 0 again. pk-bad had 10 failures -> cooldown. + // It should be skipped, so next acquire returns pk-good. + expect(nextKey).toBe('pk-good'); + }); + + it('prefers OAuth over key pool when both are configured', async () => { + const pool = new ApiKeyPool(['pk-1']); + const config: KimiConfig = { + defaultModel: 'kimi-code/kimi-for-coding', + providers: { + 'managed:kimi-code': { + type: 'kimi', + apiKey: '', + baseUrl: 'https://api.example/v1', + oauth: { storage: 'file', key: 'oauth/kimi-code' }, + }, + }, + models: { + 'kimi-code/kimi-for-coding': { + provider: 'managed:kimi-code', + model: 'kimi-for-coding', + maxContextSize: 1_000_000, + }, + }, + }; + const manager = new ProviderManager({ config, apiKeyPool: pool }); + const auth = manager.resolveAuth('kimi-code/kimi-for-coding'); + // OAuth path returns a function that throws login-required when no token provider is set + expect(auth).toBeDefined(); + await expect(auth!(async () => 'ok')).rejects.toThrow(/requires login/); + }); +}); diff --git a/packages/agent-core/test/harness/runtime.test.ts b/packages/agent-core/test/harness/runtime.test.ts index a79600c3..c26022ab 100644 --- a/packages/agent-core/test/harness/runtime.test.ts +++ b/packages/agent-core/test/harness/runtime.test.ts @@ -21,6 +21,11 @@ describe('KimiCore runtime config', () => { await rm(tmp, { recursive: true, force: true }); } vi.unstubAllGlobals(); + for (const key of Object.keys(process.env)) { + if 
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';

import { ApiKeyPool } from '../../src/session/api-key-pool';

/** Put `process.env` back to `snapshot`: drop variables added since, restore the rest. */
function restoreEnv(snapshot: NodeJS.ProcessEnv): void {
  for (const name of Object.keys(process.env)) {
    if (!(name in snapshot)) {
      delete process.env[name];
    }
  }
  Object.assign(process.env, snapshot);
}

/** Remove every environment variable whose name starts with `prefix`. */
function clearEnvPrefix(prefix: string): void {
  for (const name of Object.keys(process.env)) {
    if (name.startsWith(prefix)) {
      delete process.env[name];
    }
  }
}

/** Call `acquire()` `count` times, returning the keys in the order they were handed out. */
function acquireMany(pool: ApiKeyPool, count: number): string[] {
  return Array.from({ length: count }, () => pool.acquire());
}

/** Count how many times each key appears in `keys`. */
function tally(keys: readonly string[]): Map<string, number> {
  const counts = new Map<string, number>();
  for (const key of keys) {
    counts.set(key, (counts.get(key) ?? 0) + 1);
  }
  return counts;
}

describe('ApiKeyPool', () => {
  let originalEnv: NodeJS.ProcessEnv;

  beforeEach(() => {
    originalEnv = { ...process.env };
  });

  afterEach(() => {
    // Full restore, so a test may set any variable without leaking it.
    restoreEnv(originalEnv);
    vi.useRealTimers();
  });

  describe('fromEnv', () => {
    it('returns null when no keys are present', () => {
      clearEnvPrefix('TEST_API_KEY');
      expect(ApiKeyPool.fromEnv('TEST_API_KEY')).toBeNull();
    });

    it('returns null when only one key is present', () => {
      process.env['TEST_API_KEY'] = 'sk-one';
      expect(ApiKeyPool.fromEnv('TEST_API_KEY')).toBeNull();
    });

    it('collects primary + numbered keys', () => {
      process.env['TEST_API_KEY'] = 'sk-primary';
      process.env['TEST_API_KEY_1'] = 'sk-1';
      process.env['TEST_API_KEY_2'] = 'sk-2';
      const pool = ApiKeyPool.fromEnv('TEST_API_KEY');
      expect(pool).not.toBeNull();
      expect(pool!.keyCount).toBe(3);
    });

    it('ignores gaps in numbering', () => {
      process.env['TEST_API_KEY'] = 'sk-primary';
      process.env['TEST_API_KEY_5'] = 'sk-5';
      const pool = ApiKeyPool.fromEnv('TEST_API_KEY');
      expect(pool).not.toBeNull();
      expect(pool!.keyCount).toBe(2);
    });

    it('ignores empty keys', () => {
      process.env['TEST_API_KEY'] = 'sk-primary';
      process.env['TEST_API_KEY_1'] = '';
      process.env['TEST_API_KEY_2'] = 'sk-2';
      const pool = ApiKeyPool.fromEnv('TEST_API_KEY');
      expect(pool).not.toBeNull();
      expect(pool!.keyCount).toBe(2);
    });

    it('ignores whitespace-only keys', () => {
      process.env['TEST_API_KEY'] = 'sk-primary';
      process.env['TEST_API_KEY_1'] = ' ';
      process.env['TEST_API_KEY_2'] = '\t\n';
      process.env['TEST_API_KEY_3'] = 'sk-3';
      const pool = ApiKeyPool.fromEnv('TEST_API_KEY');
      expect(pool).not.toBeNull();
      expect(pool!.keyCount).toBe(2);
    });

    it('accepts a custom prefix', () => {
      // Use a unique prefix to avoid colliding with real environment keys.
      const prefix = `TEST_DEFAULT_${Date.now()}`;
      process.env[prefix] = 'sk-a';
      process.env[`${prefix}_1`] = 'sk-b';
      const pool = ApiKeyPool.fromEnv(prefix);
      expect(pool).not.toBeNull();
      expect(pool!.keyCount).toBe(2);
    });

    it('defaults to KIMI_API_KEY prefix', () => {
      // Start from a clean slate so real keys on the machine cannot change the
      // count; afterEach puts the original environment back.
      clearEnvPrefix('KIMI_API_KEY');
      process.env['KIMI_API_KEY'] = 'sk-a';
      process.env['KIMI_API_KEY_1'] = 'sk-b';
      const pool = ApiKeyPool.fromEnv();
      expect(pool).not.toBeNull();
      expect(pool!.keyCount).toBe(2);
    });
  });

  describe('constructor', () => {
    it('throws when keys array is empty', () => {
      expect(() => new ApiKeyPool([])).toThrow('Key pool cannot be empty');
    });

    it('accepts a non-empty keys array', () => {
      const pool = new ApiKeyPool(['k1', 'k2']);
      expect(pool.keyCount).toBe(2);
    });
  });

  describe('acquire', () => {
    it('rotates keys in round-robin order', () => {
      const pool = new ApiKeyPool(['a', 'b', 'c']);
      expect(acquireMany(pool, 4)).toEqual(['a', 'b', 'c', 'a']);
    });

    it('skips keys in cooldown', () => {
      vi.useFakeTimers();
      const pool = new ApiKeyPool(['a', 'b', 'c']);
      pool.recordFailure('a');
      vi.advanceTimersByTime(10_000);
      // a is still cooling down (30s), so it should be skipped
      expect(acquireMany(pool, 3)).toEqual(['b', 'c', 'b']);
    });

    it('resets expired cooldown and returns the key', () => {
      vi.useFakeTimers();
      const pool = new ApiKeyPool(['a', 'b']);
      pool.recordFailure('a');
      vi.advanceTimersByTime(30_001);
      expect(pool.acquire()).toBe('a'); // cooldown expired, key is eligible again
    });

    it('falls back to round-robin when all keys are in cooldown', () => {
      vi.useFakeTimers();
      const pool = new ApiKeyPool(['a', 'b']);
      pool.recordFailure('a');
      pool.recordFailure('b');
      // Both are cooling down, fallback should still return something
      expect(acquireMany(pool, 2)).toEqual(['a', 'b']);
    });
  });

  describe('recordFailure', () => {
    it('applies 30s cooldown on first failure', () => {
      vi.useFakeTimers();
      const pool = new ApiKeyPool(['a', 'b']);
      pool.recordFailure('a');
      expect(pool.acquire()).toBe('b'); // a skipped
      vi.advanceTimersByTime(30_001);
      expect(pool.acquire()).toBe('a'); // a recovered
    });

    it('applies 5min cooldown on second failure', () => {
      vi.useFakeTimers();
      const pool = new ApiKeyPool(['a', 'b']);
      pool.recordFailure('a');
      pool.recordFailure('a');
      vi.advanceTimersByTime(30_001);
      expect(pool.acquire()).toBe('b'); // a still cooling (5min)
      vi.advanceTimersByTime(300_001);
      expect(pool.acquire()).toBe('a'); // a recovered
    });

    it('applies 30min cooldown on third failure', () => {
      vi.useFakeTimers();
      const pool = new ApiKeyPool(['a', 'b']);
      pool.recordFailure('a');
      pool.recordFailure('a');
      pool.recordFailure('a');
      vi.advanceTimersByTime(300_001);
      expect(pool.acquire()).toBe('b'); // a still cooling (30min)
      vi.advanceTimersByTime(1_800_001);
      expect(pool.acquire()).toBe('a'); // a recovered
    });

    it('is a no-op for unknown keys', () => {
      const pool = new ApiKeyPool(['a']);
      expect(() => {
        pool.recordFailure('unknown');
      }).not.toThrow();
      expect(pool.acquire()).toBe('a');
    });
  });

  describe('resetKey', () => {
    it('clears failure state when cooldown has expired', () => {
      vi.useFakeTimers();
      const pool = new ApiKeyPool(['a', 'b']);
      pool.recordFailure('a'); // 30s cooldown
      vi.advanceTimersByTime(30_001);
      pool.resetKey('a');
      expect(pool.acquire()).toBe('a');
    });

    it('does not clear an active cooldown', () => {
      vi.useFakeTimers();
      const pool = new ApiKeyPool(['a', 'b']);
      pool.recordFailure('a'); // 30s cooldown
      pool.resetKey('a'); // concurrent success should not wipe it
      expect(pool.acquire()).toBe('b'); // a still cooling
      vi.advanceTimersByTime(30_001);
      expect(pool.acquire()).toBe('a'); // a recovered after cooldown
    });

    it('is a no-op for unknown keys', () => {
      const pool = new ApiKeyPool(['a']);
      expect(() => {
        pool.resetKey('unknown');
      }).not.toThrow();
    });
  });

  // acquire() is synchronous, so these exercise back-to-back acquisition
  // directly instead of wrapping each call in an already-resolved promise.
  describe('bulk acquisition', () => {
    it('distributes 100 acquires evenly across 3 keys', () => {
      const pool = new ApiKeyPool(['k0', 'k1', 'k2']);
      const keys = acquireMany(pool, 100);

      expect(keys).toHaveLength(100);
      const counts = tally(keys);
      // Round-robin over 3 keys starting at k0 -> 34, 33, 33
      expect(counts.get('k0')).toBe(34);
      expect(counts.get('k1')).toBe(33);
      expect(counts.get('k2')).toBe(33);
    });

    it('skips cooling keys across 50 acquires', () => {
      vi.useFakeTimers();
      const pool = new ApiKeyPool(['k0', 'k1', 'k2']);
      pool.recordFailure('k1'); // 30s cooldown

      const counts = tally(acquireMany(pool, 50));
      expect(counts.get('k0')).toBe(25);
      expect(counts.get('k1') ?? 0).toBe(0); // skipped entirely
      expect(counts.get('k2')).toBe(25);
    });

    it('falls back to all keys when every key is cooling', () => {
      vi.useFakeTimers();
      const pool = new ApiKeyPool(['k0', 'k1']);
      pool.recordFailure('k0');
      pool.recordFailure('k1');

      const counts = tally(acquireMany(pool, 20));
      // Fallback to round-robin across entire pool despite cooldown
      expect(counts.get('k0')).toBe(10);
      expect(counts.get('k1')).toBe(10);
    });

    it('recovers a key mid-stream after cooldown expires', () => {
      vi.useFakeTimers();
      const pool = new ApiKeyPool(['k0', 'k1']);
      pool.recordFailure('k0'); // 30s cooldown

      const firstBatch = acquireMany(pool, 10);
      expect(firstBatch.every((k) => k === 'k1')).toBe(true);

      vi.advanceTimersByTime(30_001);

      const secondBatch = tally(acquireMany(pool, 10));
      // After cooldown, k0 is back in rotation
      expect(secondBatch.get('k0')).toBe(5);
      expect(secondBatch.get('k1')).toBe(5);
    });
  });
});