-
Notifications
You must be signed in to change notification settings - Fork 124
feat(agent-core): add API key pool for concurrent subagent key rotation #191
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
e64e2f9
41b988f
09ba310
d07dc18
ef0a7ea
6912d04
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| --- | ||
| "@moonshot-ai/agent-core": minor | ||
| "@moonshot-ai/kimi-code": minor | ||
| --- | ||
|
|
||
| Add API key pool for parallel subagent execution. When multiple `KIMI_API_KEY*` environment variables are configured, subagents rotate through them to avoid rate-limit contention, and failed keys are temporarily cooled down on retryable errors. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,135 @@ | ||
| /** | ||
| * APIKeyPool — round-robin allocator for multiple API keys. | ||
| * | ||
| * Designed for parallel subagent execution so that concurrent agents | ||
| * do not hammer a single key's rate-limit quota. | ||
| * | ||
| * Keys are read from environment variables: | ||
| * KIMI_API_KEY, KIMI_API_KEY_1, KIMI_API_KEY_2, … up to KIMI_API_KEY_99 | ||
| * | ||
| * A pool is only created when ≥2 keys are found; otherwise `fromEnv` | ||
| * returns `null` and callers fall back to the root provider's key. | ||
| */ | ||
|
|
||
| interface KeyState { | ||
| consecutiveFailures: number; | ||
| cooldownUntil: number | null; | ||
| } | ||
|
|
||
| const COOLDOWN_MS = [30_000, 300_000, 1_800_000] as const; | ||
|
|
||
| function cooldownForFailures(failures: number): number { | ||
| if (failures <= 0) return 0; | ||
| if (failures <= COOLDOWN_MS.length) return COOLDOWN_MS.at(failures - 1)!; | ||
| return COOLDOWN_MS.at(-1)!; | ||
| } | ||
|
|
||
| export class ApiKeyPool { | ||
| private readonly keys: readonly string[]; | ||
| private _index = 0; | ||
| private readonly states: Map<string, KeyState>; | ||
|
|
||
| /** | ||
| * Build a pool from environment variables. | ||
| * | ||
| * Collects `PREFIX`, `PREFIX_1`, `PREFIX_2`, … up to `PREFIX_99`. | ||
| * Returns `null` when fewer than 2 keys are found. | ||
| */ | ||
| static fromEnv(prefix = 'KIMI_API_KEY'): ApiKeyPool | null { | ||
| const keys: string[] = []; | ||
| const primary = process.env[prefix]; | ||
| if (primary !== undefined && primary.trim().length > 0) { | ||
| keys.push(primary.trim()); | ||
| } | ||
| for (let i = 1; i < 100; i++) { | ||
| const val = process.env[`${prefix}_${i}`]; | ||
| if (val !== undefined && val.trim().length > 0) { | ||
| keys.push(val.trim()); | ||
| } | ||
| } | ||
| if (keys.length < 2) { | ||
| return null; | ||
| } | ||
| return new ApiKeyPool(keys); | ||
| } | ||
|
|
||
| constructor(keys: readonly string[]) { | ||
| if (keys.length === 0) { | ||
| throw new Error('Key pool cannot be empty'); | ||
| } | ||
| this.keys = keys.slice(); | ||
| this.states = new Map<string, KeyState>(); | ||
| for (const key of this.keys) { | ||
| this.states.set(key, { consecutiveFailures: 0, cooldownUntil: null }); | ||
| } | ||
| } | ||
|
|
||
| /** Number of keys in the pool. */ | ||
| get keyCount(): number { | ||
| return this.keys.length; | ||
| } | ||
|
|
||
| /** | ||
| * Acquire the next key in rotation. | ||
| * | ||
| * Skips keys that are in cooldown. If every key is cooling down, | ||
| * falls back to round-robin across the entire pool. | ||
| */ | ||
| acquire(): string { | ||
| const now = Date.now(); | ||
| for (let i = 0; i < this.keys.length; i++) { | ||
| const key = this.keys[this._index]!; | ||
| this._index = (this._index + 1) % this.keys.length; | ||
| const state = this.states.get(key); | ||
| if (state === undefined) { | ||
| continue; | ||
| } | ||
| if (state.cooldownUntil !== null) { | ||
| if (now < state.cooldownUntil) { | ||
| continue; | ||
| } | ||
| // Cooldown expired — reset the key to healthy. | ||
| this.states.set(key, { consecutiveFailures: 0, cooldownUntil: null }); | ||
| } | ||
| return key; | ||
| } | ||
| // All keys in cooldown — fall back to round-robin. | ||
| const key = this.keys[this._index]!; | ||
| this._index = (this._index + 1) % this.keys.length; | ||
| return key; | ||
| } | ||
|
|
||
| /** | ||
| * Record a failure for the given key. | ||
| * | ||
| * Applies exponential cooldown: | ||
| * 1st failure → 30s | ||
| * 2nd failure → 5min | ||
| * 3rd+ failure → 30min | ||
| */ | ||
| recordFailure(key: string): void { | ||
| const state = this.states.get(key); | ||
| if (state === undefined) { | ||
| return; | ||
| } | ||
| const failures = state.consecutiveFailures + 1; | ||
| this.states.set(key, { | ||
| consecutiveFailures: failures, | ||
| cooldownUntil: Date.now() + cooldownForFailures(failures), | ||
| }); | ||
| } | ||
|
|
||
| /** Clear the failure state for a key (e.g. after a successful call). */ | ||
| resetKey(key: string): void { | ||
| const state = this.states.get(key); | ||
| if (state === undefined) { | ||
| return; | ||
| } | ||
| // Don't clear an active cooldown that may have been set by a concurrent | ||
| // failure while this request was still in flight. | ||
| if (state.cooldownUntil !== null && Date.now() < state.cooldownUntil) { | ||
| return; | ||
| } | ||
| this.states.set(key, { consecutiveFailures: 0, cooldownUntil: null }); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,7 @@ | ||
| import type { Logger } from '#/logging/types'; | ||
| import type { ProviderConfig as KosongProviderConfig, ModelCapability, ProviderRequestAuth } from '@moonshot-ai/kosong'; | ||
| import { APIStatusError, createProvider, UNKNOWN_CAPABILITY } from '@moonshot-ai/kosong'; | ||
| import { ApiKeyPool } from './api-key-pool'; | ||
| import { APIStatusError, createProvider, isRetryableGenerateError, UNKNOWN_CAPABILITY } from '@moonshot-ai/kosong'; | ||
| import type { KimiConfig, ModelAlias, OAuthRef, ProviderConfig } from '../config'; | ||
| import { ErrorCodes, isKimiError, KimiError } from '../errors'; | ||
|
|
||
|
|
@@ -24,6 +25,7 @@ interface ProviderManagerOptions { | |
| readonly kimiRequestHeaders?: Record<string, string>; | ||
| readonly resolveOAuthTokenProvider?: OAuthTokenProviderResolver; | ||
| readonly promptCacheKey?: string; | ||
| readonly apiKeyPool?: ApiKeyPool; | ||
| } | ||
|
|
||
| type AuthorizedRequest = <T>( | ||
|
|
@@ -123,68 +125,99 @@ export class ProviderManager implements ModelProvider { | |
| ): AuthorizedRequest | undefined { | ||
| const { providerName } = this.resolveProviderConfig(model); | ||
| const providerConfig = this.config.providers[providerName]; | ||
| if (providerConfig?.oauth === undefined) return undefined; | ||
|
|
||
| if (providerApiKey(providerConfig) !== undefined) { | ||
| // oauth + apiKey on the same provider makes request auth ambiguous: | ||
| // provider construction would prefer apiKey while runtime auth resolves | ||
| // OAuth. Reject it so misconfiguration surfaces at model resolution. | ||
| throw new KimiError( | ||
| ErrorCodes.CONFIG_INVALID, | ||
| `Provider "${providerName}" has both apiKey and oauth set in config.toml — they are mutually exclusive. Remove one.`, | ||
| ); | ||
| } | ||
| // OAuth path | ||
| if (providerConfig?.oauth !== undefined) { | ||
| if (providerApiKey(providerConfig) !== undefined) { | ||
| // oauth + apiKey on the same provider makes request auth ambiguous: | ||
| // provider construction would prefer apiKey while runtime auth resolves | ||
| // OAuth. Reject it so misconfiguration surfaces at model resolution. | ||
| throw new KimiError( | ||
| ErrorCodes.CONFIG_INVALID, | ||
| `Provider "${providerName}" has both apiKey and oauth set in config.toml — they are mutually exclusive. Remove one.`, | ||
| ); | ||
| } | ||
|
|
||
| const loginRequired = (cause?: unknown): KimiError => | ||
| new KimiError( | ||
| ErrorCodes.AUTH_LOGIN_REQUIRED, | ||
| `OAuth provider "${providerName}" requires login before it can be used.`, | ||
| cause === undefined ? undefined : { cause }, | ||
| ); | ||
| const loginRequired = (cause?: unknown): KimiError => | ||
| new KimiError( | ||
| ErrorCodes.AUTH_LOGIN_REQUIRED, | ||
| `OAuth provider "${providerName}" requires login before it can be used.`, | ||
| cause === undefined ? undefined : { cause }, | ||
| ); | ||
|
|
||
| const tokenProvider = this.options.resolveOAuthTokenProvider?.(providerName, providerConfig.oauth); | ||
| if (tokenProvider === undefined) { | ||
| return async () => { | ||
| throw loginRequired(); | ||
| }; | ||
| } | ||
|
|
||
| const tokenProvider = this.options.resolveOAuthTokenProvider?.(providerName, providerConfig.oauth); | ||
| if (tokenProvider === undefined) { | ||
| return async () => { | ||
| throw loginRequired(); | ||
| const log = options?.log; | ||
| const fetchAuth = async (force: boolean): Promise<ProviderRequestAuth> => { | ||
| let apiKey: string; | ||
| try { | ||
| apiKey = await tokenProvider.getAccessToken(force ? { force: true } : undefined); | ||
| } catch (error) { | ||
| if (!isKimiError(error) || error.code !== ErrorCodes.AUTH_LOGIN_REQUIRED) { | ||
| log?.warn('oauth token fetch failed', { providerName, error }); | ||
| } | ||
| throw loginRequired(error); | ||
| } | ||
| if (apiKey.trim().length === 0) throw loginRequired(); | ||
| return { apiKey }; | ||
| }; | ||
| } | ||
|
|
||
| const log = options?.log; | ||
| const fetchAuth = async (force: boolean): Promise<ProviderRequestAuth> => { | ||
| let apiKey: string; | ||
| try { | ||
| apiKey = await tokenProvider.getAccessToken(force ? { force: true } : undefined); | ||
| } catch (error) { | ||
| if (!isKimiError(error) || error.code !== ErrorCodes.AUTH_LOGIN_REQUIRED) { | ||
| log?.warn('oauth token fetch failed', { providerName, error }); | ||
| return async (request) => { | ||
| let auth = await fetchAuth(false); | ||
| for (let refreshed = false; ; refreshed = true) { | ||
| try { | ||
| return await request(auth); | ||
| } catch (error) { | ||
| if (!(error instanceof APIStatusError) || error.statusCode !== 401) throw error; | ||
| if (refreshed) { | ||
| throw new KimiError( | ||
| ErrorCodes.AUTH_LOGIN_REQUIRED, | ||
| 'OAuth provider credentials were rejected. Send /login to login.', | ||
| { | ||
| cause: error, | ||
| details: { statusCode: error.statusCode, requestId: error.requestId }, | ||
| }, | ||
| ); | ||
| } | ||
| auth = await fetchAuth(true); | ||
| } | ||
| } | ||
| throw loginRequired(error); | ||
| } | ||
| if (apiKey.trim().length === 0) throw loginRequired(); | ||
| return { apiKey }; | ||
| }; | ||
| }; | ||
| } | ||
|
|
||
| return async (request) => { | ||
| let auth = await fetchAuth(false); | ||
| for (let refreshed = false; ; refreshed = true) { | ||
| // Key pool path — only for kimi provider when a pool is configured | ||
| // and the provider does not already have an explicit apiKey. | ||
| if ( | ||
| providerConfig?.type === 'kimi' && | ||
| this.options.apiKeyPool !== undefined && | ||
| providerApiKey(providerConfig) === undefined | ||
| ) { | ||
| const pool = this.options.apiKeyPool; | ||
| return async (request) => { | ||
| const key = pool.acquire(); | ||
| const auth: ProviderRequestAuth = { apiKey: key }; | ||
| try { | ||
| return await request(auth); | ||
| const result = await request(auth); | ||
| pool.resetKey(key); | ||
|
Comment on lines
+205
to
+206
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
With parallel subagents, the same pooled key can have multiple in-flight requests; if one request for a key returns 429 and records a cooldown, a different request for that same key that was already in flight can later succeed and this unconditional Useful? React with 👍 / 👎. |
||
| return result; | ||
| } catch (error) { | ||
| if (!(error instanceof APIStatusError) || error.statusCode !== 401) throw error; | ||
| if (refreshed) { | ||
| throw new KimiError( | ||
| ErrorCodes.AUTH_LOGIN_REQUIRED, | ||
| 'OAuth provider credentials were rejected. Send /login to login.', | ||
| { | ||
| cause: error, | ||
| details: { statusCode: error.statusCode, requestId: error.requestId }, | ||
| }, | ||
| ); | ||
| if ( | ||
| isRetryableGenerateError(error) || | ||
| (error instanceof APIStatusError && error.statusCode === 401) | ||
| ) { | ||
| pool.recordFailure(key); | ||
| } | ||
| auth = await fetchAuth(true); | ||
| throw error; | ||
| } | ||
| } | ||
| }; | ||
| }; | ||
| } | ||
|
|
||
| return undefined; | ||
| } | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because each
withAuthinvocation acquires a fresh key, a video read can upload the file throughcreateVideoUploaderwith one pool key and then the next LLM generation that consumes the returnedms://...file id can run with another key. In sessions with two or more pool keys and video inputs, that crosses API-key/account boundaries for file IDs and can make the follow-up Kimi request unable to access the uploaded video. The pool should pin a key for related requests, such as at agent/turn scope, rather than rotating independently per request.Useful? React with 👍 / 👎.