diff --git a/.agents/base2/base2.ts b/.agents/base2/base2.ts
index 98801fa91..d4ca20c07 100644
--- a/.agents/base2/base2.ts
+++ b/.agents/base2/base2.ts
@@ -36,7 +36,7 @@ export function createBase2(
       ? 'z-ai/glm-4.6:nitro'
       : 'anthropic/claude-sonnet-4.5',
     ...(isGpt5 && {
-      reasoningModel: {
+      reasoningOptions: {
         effort: 'high',
       },
     }),
diff --git a/.env.example b/.env.example
index d0e1fcd29..3f9808ce9 100644
--- a/.env.example
+++ b/.env.example
@@ -1,6 +1,7 @@
 # AI API Keys
 CLAUDE_CODE_KEY=dummy_claude_code_key
 OPEN_ROUTER_API_KEY=dummy_openrouter_key
+OPENAI_API_KEY=dummy_openai_key
 
 # Database & Server
 DATABASE_URL=postgresql://manicode_user_local:secretpassword_local@localhost:5432/manicode_db_local
diff --git a/bun.lock b/bun.lock
index 2561f27fc..581bcb492 100644
--- a/bun.lock
+++ b/bun.lock
@@ -798,9 +798,9 @@
 
     "@inquirer/ansi": ["@inquirer/ansi@1.0.2", "", {}, "sha512-S8qNSZiYzFd0wAcyG5AXCvUHC5Sr7xpZ9wZ2py9XR88jUz8wooStVx5M6dRzczbBWjic9NP7+rY0Xi7qqK/aMQ=="],
 
-    "@inquirer/confirm": ["@inquirer/confirm@5.1.20", "", { "dependencies": { "@inquirer/core": "^10.3.1", "@inquirer/type": "^3.0.10" }, "peerDependencies": { "@types/node": ">=18" }, "optionalPeers": ["@types/node"] }, "sha512-HDGiWh2tyRZa0M1ZnEIUCQro25gW/mN8ODByicQrbR1yHx4hT+IOpozCMi5TgBtUdklLwRI2mv14eNpftDluEw=="],
+    "@inquirer/confirm": ["@inquirer/confirm@5.1.21", "", { "dependencies": { "@inquirer/core": "^10.3.2", "@inquirer/type": "^3.0.10" }, "peerDependencies": { "@types/node": ">=18" }, "optionalPeers": ["@types/node"] }, "sha512-KR8edRkIsUayMXV+o3Gv+q4jlhENF9nMYUZs9PA2HzrXeHI8M5uDag70U7RJn9yyiMZSbtF5/UexBtAVtZGSbQ=="],
 
-    "@inquirer/core": ["@inquirer/core@10.3.1", "", { "dependencies": { "@inquirer/ansi": "^1.0.2", "@inquirer/figures": "^1.0.15", "@inquirer/type": "^3.0.10", "cli-width": "^4.1.0", "mute-stream": "^3.0.0", "signal-exit": "^4.1.0", "wrap-ansi": "^6.2.0", "yoctocolors-cjs": "^2.1.3" }, "peerDependencies": { "@types/node": ">=18" }, "optionalPeers": ["@types/node"] }, "sha512-hzGKIkfomGFPgxKmnKEKeA+uCYBqC+TKtRx5LgyHRCrF6S2MliwRIjp3sUaWwVzMp7ZXVs8elB0Tfe682Rpg4w=="],
+    "@inquirer/core": ["@inquirer/core@10.3.2", "", { "dependencies": { "@inquirer/ansi": "^1.0.2", "@inquirer/figures": "^1.0.15", "@inquirer/type": "^3.0.10", "cli-width": "^4.1.0", "mute-stream": "^2.0.0", "signal-exit": "^4.1.0", "wrap-ansi": "^6.2.0", "yoctocolors-cjs": "^2.1.3" }, "peerDependencies": { "@types/node": ">=18" }, "optionalPeers": ["@types/node"] }, "sha512-43RTuEbfP8MbKzedNqBrlhhNKVwoK//vUFNW3Q3vZ88BLcrs4kYpGg+B2mm5p2K/HfygoCxuKwJJiv8PbGmE0A=="],
 
     "@inquirer/figures": ["@inquirer/figures@1.0.15", "", {}, "sha512-t2IEY+unGHOzAaVM5Xx6DEWKeXlDDcNPeDyUpsRc6CUhBfU3VQOEl+Vssh7VNp1dR8MdUJBWhuObjXCsVpjN5g=="],
 
@@ -3140,7 +3140,7 @@
 
     "msw": ["msw@2.12.1", "", { "dependencies": { "@inquirer/confirm": "^5.0.0", "@mswjs/interceptors": "^0.40.0", "@open-draft/deferred-promise": "^2.2.0", "@types/statuses": "^2.0.4", "cookie": "^1.0.2", "graphql": "^16.8.1", "headers-polyfill": "^4.0.2", "is-node-process": "^1.2.0", "outvariant": "^1.4.3", "path-to-regexp": "^6.3.0", "picocolors": "^1.1.1", "rettime": "^0.7.0", "statuses": "^2.0.2", "strict-event-emitter": "^0.5.1", "tough-cookie": "^6.0.0", "type-fest": "^4.26.1", "until-async": "^3.0.2", "yargs": "^17.7.2" }, "peerDependencies": { "typescript": ">= 4.8.x" }, "optionalPeers": ["typescript"], "bin": { "msw": "cli/index.js" } }, "sha512-arzsi9IZjjByiEw21gSUP82qHM8zkV69nNpWV6W4z72KiLvsDWoOp678ORV6cNfU/JGhlX0SsnD4oXo9gI6I2A=="],
 
-    "mute-stream": ["mute-stream@3.0.0", "", {}, "sha512-dkEJPVvun4FryqBmZ5KhDo0K9iDXAwn08tMLDinNdRBNPcYEDiWYysLcc6k3mjTMlbP9KyylvRpd4wFtwrT9rw=="],
+    "mute-stream": ["mute-stream@2.0.0", "", {}, "sha512-WWdIxpyjEn+FhQJQQv9aQAYlHoNVdzIzUySNV1gHUPDSdZJ3yZn7pAAbQcV7B56Mvu881q9FZV+0Vx2xC44VWA=="],
 
     "mylas": ["mylas@2.1.13", "", {}, "sha512-+MrqnJRtxdF+xngFfUUkIMQrUUL0KsxbADUkn23Z/4ibGg192Q+z+CQyiYwvWTsYjJygmMR8+w3ZDa98Zh6ESg=="],
diff --git a/packages/internal/src/env-schema.ts b/packages/internal/src/env-schema.ts
index cdddabbaa..c90d6885c 100644
--- a/packages/internal/src/env-schema.ts
+++ b/packages/internal/src/env-schema.ts
@@ -5,6 +5,7 @@ export const serverEnvSchema = clientEnvSchema.extend({
   // Backend variables
   CODEBUFF_API_KEY: z.string().optional(),
   OPEN_ROUTER_API_KEY: z.string().min(1),
+  OPENAI_API_KEY: z.string().min(1),
   RELACE_API_KEY: z.string().min(1),
   LINKUP_API_KEY: z.string().min(1),
   CONTEXT7_API_KEY: z.string().optional(),
@@ -44,6 +45,7 @@ export const serverProcessEnv: ServerInput = {
   // Backend variables
   CODEBUFF_API_KEY: process.env.CODEBUFF_API_KEY,
   OPEN_ROUTER_API_KEY: process.env.OPEN_ROUTER_API_KEY,
+  OPENAI_API_KEY: process.env.OPENAI_API_KEY,
   RELACE_API_KEY: process.env.RELACE_API_KEY,
   LINKUP_API_KEY: process.env.LINKUP_API_KEY,
   CONTEXT7_API_KEY: process.env.CONTEXT7_API_KEY,
diff --git a/web/src/app/api/v1/chat/completions/_post.ts b/web/src/app/api/v1/chat/completions/_post.ts
index 0316cb866..3cce9a1b4 100644
--- a/web/src/app/api/v1/chat/completions/_post.ts
+++ b/web/src/app/api/v1/chat/completions/_post.ts
@@ -21,6 +21,7 @@ import {
   handleOpenRouterNonStream,
   handleOpenRouterStream,
 } from '@/llm-api/openrouter'
+import { handleOpenAIStream, OPENAI_SUPPORTED_MODELS } from '@/llm-api/openai'
 import { extractApiKeyFromHeader } from '@/util/auth'
 
 export async function postChatCompletions(params: {
@@ -207,15 +208,31 @@ export async function postChatCompletions(params: {
   try {
     if (bodyStream) {
       // Streaming request
-      const stream = await handleOpenRouterStream({
-        body,
-        userId,
-        agentId,
-        openrouterApiKey,
-        fetch,
-        logger,
-        insertMessageBigquery,
-      })
+      const model = (body as any)?.model
+      const shortModelName =
+        typeof model === 'string' ? model.split('/')[1] : undefined
+      const isOpenAIDirectModel =
+        typeof model === 'string' &&
+        model.startsWith('openai/') &&
+        OPENAI_SUPPORTED_MODELS.includes(shortModelName as any)
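+      // Route natively supported 'openai/<model>' requests straight to the
+      // OpenAI API; every other model continues through OpenRouter.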
"sha512-dkEJPVvun4FryqBmZ5KhDo0K9iDXAwn08tMLDinNdRBNPcYEDiWYysLcc6k3mjTMlbP9KyylvRpd4wFtwrT9rw=="], + "mute-stream": ["mute-stream@2.0.0", "", {}, "sha512-WWdIxpyjEn+FhQJQQv9aQAYlHoNVdzIzUySNV1gHUPDSdZJ3yZn7pAAbQcV7B56Mvu881q9FZV+0Vx2xC44VWA=="], "mylas": ["mylas@2.1.13", "", {}, "sha512-+MrqnJRtxdF+xngFfUUkIMQrUUL0KsxbADUkn23Z/4ibGg192Q+z+CQyiYwvWTsYjJygmMR8+w3ZDa98Zh6ESg=="], diff --git a/packages/internal/src/env-schema.ts b/packages/internal/src/env-schema.ts index cdddabbaa..c90d6885c 100644 --- a/packages/internal/src/env-schema.ts +++ b/packages/internal/src/env-schema.ts @@ -5,6 +5,7 @@ export const serverEnvSchema = clientEnvSchema.extend({ // Backend variables CODEBUFF_API_KEY: z.string().optional(), OPEN_ROUTER_API_KEY: z.string().min(1), + OPENAI_API_KEY: z.string().min(1), RELACE_API_KEY: z.string().min(1), LINKUP_API_KEY: z.string().min(1), CONTEXT7_API_KEY: z.string().optional(), @@ -44,6 +45,7 @@ export const serverProcessEnv: ServerInput = { // Backend variables CODEBUFF_API_KEY: process.env.CODEBUFF_API_KEY, OPEN_ROUTER_API_KEY: process.env.OPEN_ROUTER_API_KEY, + OPENAI_API_KEY: process.env.OPENAI_API_KEY, RELACE_API_KEY: process.env.RELACE_API_KEY, LINKUP_API_KEY: process.env.LINKUP_API_KEY, CONTEXT7_API_KEY: process.env.CONTEXT7_API_KEY, diff --git a/web/src/app/api/v1/chat/completions/_post.ts b/web/src/app/api/v1/chat/completions/_post.ts index 0316cb866..3cce9a1b4 100644 --- a/web/src/app/api/v1/chat/completions/_post.ts +++ b/web/src/app/api/v1/chat/completions/_post.ts @@ -21,6 +21,7 @@ import { handleOpenRouterNonStream, handleOpenRouterStream, } from '@/llm-api/openrouter' +import { handleOpenAIStream, OPENAI_SUPPORTED_MODELS } from '@/llm-api/openai' import { extractApiKeyFromHeader } from '@/util/auth' export async function postChatCompletions(params: { @@ -207,15 +208,31 @@ export async function postChatCompletions(params: { try { if (bodyStream) { // Streaming request - const stream = await handleOpenRouterStream({ - body, - userId, - agentId, - openrouterApiKey, - fetch, - logger, - insertMessageBigquery, - }) + const model = (body as any)?.model + const shortModelName = + typeof model === 'string' ? model.split('/')[1] : undefined + const isOpenAIDirectModel = + typeof model === 'string' && + model.startsWith('openai/') && + OPENAI_SUPPORTED_MODELS.includes(shortModelName as any) + const stream = await (isOpenAIDirectModel + ? 
+const INPUT_TOKEN_COSTS: Record<OpenAIModel, number> = {
+  'gpt-5': 1.25,
+} as const
+const CACHED_INPUT_TOKEN_COSTS: Record<OpenAIModel, number> = {
+  'gpt-5': 0.125,
+} as const
+const OUTPUT_TOKEN_COSTS: Record<OpenAIModel, number> = {
+  'gpt-5': 10,
+} as const
+
+type StreamState = { responseText: string; reasoningText: string }
+
+function extractRequestMetadata(params: { body: unknown; logger: Logger }) {
+  const { body, logger } = params
+  const rawClientId = (body as any)?.codebuff_metadata?.client_id
+  const clientId = typeof rawClientId === 'string' ? rawClientId : null
+  if (!clientId) {
+    logger.warn({ body }, 'Received request without client_id')
+  }
+  const rawRunId = (body as any)?.codebuff_metadata?.run_id
+  const clientRequestId: string | null =
+    typeof rawRunId === 'string' ? rawRunId : null
+  if (!clientRequestId) {
+    logger.warn({ body }, 'Received request without run_id')
+  }
+  return { clientId, clientRequestId }
+}
+
+type OpenAIUsage = {
+  prompt_tokens?: number
+  prompt_tokens_details?: { cached_tokens?: number } | null
+  completion_tokens?: number
+  completion_tokens_details?: { reasoning_tokens?: number } | null
+  total_tokens?: number
+  // We will inject cost fields below
+  cost?: number
+  cost_details?: { upstream_inference_cost?: number | null } | null
+}
+
+function computeCostDollars(usage: OpenAIUsage, model: OpenAIModel): number {
+  const inputTokenCost = INPUT_TOKEN_COSTS[model]
+  const cachedInputTokenCost = CACHED_INPUT_TOKEN_COSTS[model]
+  const outputTokenCost = OUTPUT_TOKEN_COSTS[model]
+
+  const inTokens = usage.prompt_tokens ?? 0
+  const cachedInTokens = usage.prompt_tokens_details?.cached_tokens ?? 0
+  const outTokens = usage.completion_tokens ?? 0
+  // cached_tokens is a subset of prompt_tokens, so bill the cached portion at
+  // the cached rate and only the uncached remainder at the full input rate.
+  return (
+    ((inTokens - cachedInTokens) / 1_000_000) * inputTokenCost +
+    (cachedInTokens / 1_000_000) * cachedInputTokenCost +
+    (outTokens / 1_000_000) * outputTokenCost
+  )
+}
+
+export async function handleOpenAIStream({
+  body,
+  userId,
+  agentId,
+  fetch,
+  logger,
+  insertMessageBigquery,
+}: {
+  body: any
+  userId: string
+  agentId: string
+  fetch: typeof globalThis.fetch
+  logger: Logger
+  insertMessageBigquery: InsertMessageBigqueryFn
+}) {
+  const startTime = new Date()
+  const { clientId, clientRequestId } = extractRequestMetadata({ body, logger })
+
+  const { model } = body
+  const modelShortName =
+    typeof model === 'string' ? model.split('/')[1] : undefined
+  if (
+    !modelShortName ||
+    !OPENAI_SUPPORTED_MODELS.includes(modelShortName as OpenAIModel)
+  ) {
+    throw new Error(
+      `Unsupported OpenAI model: ${model} (supported models include only: ${OPENAI_SUPPORTED_MODELS.map((m) => `'${m}'`).join(', ')})`,
+    )
+  }
+
+  // Build OpenAI-compatible body
+  const openaiBody: Record<string, unknown> = {
+    ...body,
+    model: modelShortName,
+    stream: true,
+  }
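+  // The incoming body uses OpenRouter's request shape; the rewrites below
+  // adapt it to what the OpenAI chat completions endpoint accepts.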
+  // Ensure usage in final chunk
+  const streamOptions = (openaiBody.stream_options as any) ?? {}
+  streamOptions.include_usage = true
+  openaiBody.stream_options = streamOptions
+
+  // Transform max_tokens to max_completion_tokens
+  openaiBody.max_completion_tokens =
+    openaiBody.max_completion_tokens ?? openaiBody.max_tokens
+  delete (openaiBody as any).max_tokens
+
+  // Transform reasoning to reasoning_effort
+  if (openaiBody.reasoning && typeof openaiBody.reasoning === 'object') {
+    const reasoning = openaiBody.reasoning as {
+      enabled?: boolean
+      effort?: 'high' | 'medium' | 'low'
+    }
+    const enabled = reasoning.enabled ?? true
+
+    if (enabled) {
+      openaiBody.reasoning_effort = reasoning.effort ?? 'medium'
+    }
+  }
+  delete (openaiBody as any).reasoning
+
+  // Remove fields that OpenAI doesn't support
+  delete (openaiBody as any).stop
+  delete (openaiBody as any).usage
+  delete (openaiBody as any).provider
+  delete (openaiBody as any).transforms
+  delete (openaiBody as any).codebuff_metadata
+
+  const response = await fetch('https://api.openai.com/v1/chat/completions', {
+    method: 'POST',
+    headers: {
+      Authorization: `Bearer ${env.OPENAI_API_KEY}`,
+      'Content-Type': 'application/json',
+    },
+    body: JSON.stringify(openaiBody),
+  })
+
+  if (!response.ok) {
+    throw new Error(
+      `OpenAI API error: ${response.status} ${response.statusText} ${await response.text()}`,
+    )
+  }
+
+  const reader = response.body?.getReader?.()
+  if (!reader) {
+    throw new Error('Failed to get response reader')
+  }
+
+  let heartbeatInterval: NodeJS.Timeout
+  let state: StreamState = { responseText: '', reasoningText: '' }
+  let clientDisconnected = false
+
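+  // If the client disconnects mid-stream we keep reading from OpenAI so the
+  // final usage chunk still arrives and credits are consumed for billing.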
+  const stream = new ReadableStream({
+    async start(controller) {
+      const decoder = new TextDecoder()
+      let buffer = ''
+
+      controller.enqueue(
+        new TextEncoder().encode(`: connected ${new Date().toISOString()}\n`),
+      )
+
+      heartbeatInterval = setInterval(() => {
+        if (!clientDisconnected) {
+          try {
+            controller.enqueue(
+              new TextEncoder().encode(
+                `: heartbeat ${new Date().toISOString()}\n\n`,
+              ),
+            )
+          } catch {}
+        }
+      }, 30000)
+
+      try {
+        while (true) {
+          const { done, value } = await reader.read()
+          if (done) break
+
+          buffer += decoder.decode(value, { stream: true })
+          let lineEnd = buffer.indexOf('\n')
+
+          while (lineEnd !== -1) {
+            let line = buffer.slice(0, lineEnd + 1)
+            buffer = buffer.slice(lineEnd + 1)
+
+            const handled = await handleOpenAILine({
+              userId,
+              agentId,
+              clientId,
+              clientRequestId,
+              startTime,
+              request: openaiBody,
+              line,
+              modelShortName: modelShortName as OpenAIModel,
+              state,
+              logger,
+              insertMessage: insertMessageBigquery,
+            })
+            state = handled.state
+            line = handled.outgoingLine
+
+            if (!clientDisconnected) {
+              try {
+                controller.enqueue(new TextEncoder().encode(line))
+              } catch (error) {
+                logger.warn(
+                  'Client disconnected during stream, continuing for billing',
+                )
+                clientDisconnected = true
+              }
+            }
+
+            lineEnd = buffer.indexOf('\n')
+          }
+        }
+
+        if (!clientDisconnected) {
+          controller.close()
+        }
+      } catch (error) {
+        if (!clientDisconnected) {
+          controller.error(error)
+        } else {
+          logger.warn(
+            getErrorObject(error),
+            'Error after client disconnect in OpenAI stream',
+          )
+        }
+      } finally {
+        clearInterval(heartbeatInterval)
+      }
+    },
+    cancel() {
+      clearInterval(heartbeatInterval)
+      clientDisconnected = true
+      logger.warn(
+        { clientDisconnected, state },
+        'Client cancelled stream, continuing OpenAI consumption for billing',
+      )
+    },
+  })
+
+  return stream
+}
+
+async function handleOpenAILine({
+  userId,
+  agentId,
+  clientId,
+  clientRequestId,
+  startTime,
+  modelShortName,
+  request,
+  line,
+  state,
+  logger,
+  insertMessage,
+}: {
+  userId: string
+  agentId: string
+  clientId: string | null
+  clientRequestId: string | null
+  startTime: Date
+  modelShortName: OpenAIModel
+  request: unknown
+  line: string
+  state: StreamState
+  logger: Logger
+  insertMessage: InsertMessageBigqueryFn
+}): Promise<{ state: StreamState; outgoingLine: string }> {
+  if (!line.startsWith('data: ')) {
+    return { state, outgoingLine: line }
+  }
+  const raw = line.slice('data: '.length)
+  if (raw === '[DONE]\n') {
+    return { state, outgoingLine: line }
+  }
+
+  let obj: any
+  try {
+    obj = JSON.parse(raw)
+  } catch (error) {
+    logger.warn(
+      `Received non-JSON OpenAI response: ${JSON.stringify(getErrorObject(error), null, 2)}`,
+    )
+    return { state, outgoingLine: line }
+  }
+
+  // Accumulate text
+  try {
+    const choice =
+      Array.isArray(obj.choices) && obj.choices.length
+        ? obj.choices[0]
+        : undefined
+    const delta = choice?.delta
+    if (delta) {
+      if (typeof delta.content === 'string') state.responseText += delta.content
+      // OpenAI may not provide reasoning delta in standard chat completions; keep parity
+      if (typeof delta.reasoning === 'string')
+        state.reasoningText += delta.reasoning
+    }
+  } catch {}
+
+  // If usage present, it's the final chunk. Compute cost, log, and consume credits.
+  if (obj && obj.usage) {
+    const usage: OpenAIUsage = obj.usage
+    const cost = computeCostDollars(usage, modelShortName)
+    obj.usage.cost = cost
+    obj.usage.cost_details = { upstream_inference_cost: null }
+
+    // BigQuery insert (do not await)
+    setupBigQuery({ logger }).then(async () => {
+      const success = await insertMessage({
+        row: {
+          id: obj.id,
+          user_id: userId,
+          finished_at: new Date(),
+          created_at: startTime,
+          request,
+          reasoning_text: state.reasoningText,
+          response: state.responseText,
+          output_tokens: usage.completion_tokens ?? 0,
+          reasoning_tokens: usage.completion_tokens_details?.reasoning_tokens,
+          cost: cost,
+          upstream_inference_cost: null,
+          input_tokens: usage.prompt_tokens ?? 0,
+          cache_read_input_tokens: usage.prompt_tokens_details?.cached_tokens,
+        },
+        logger,
+      })
+      if (!success) {
+        logger.error(
+          { request },
+          'Failed to insert message into BigQuery (OpenAI)',
+        )
+      }
+    })
+
+    await consumeCreditsAndAddAgentStep({
+      messageId: obj.id,
+      userId,
+      agentId,
+      clientId,
+      clientRequestId,
+      startTime,
+      model: obj.model,
+      reasoningText: state.reasoningText,
+      response: state.responseText,
+      cost,
+      credits: Math.round(cost * 100 * (1 + PROFIT_MARGIN)),
+      inputTokens: obj.usage.prompt_tokens ?? 0,
+      cacheCreationInputTokens: null,
+      cacheReadInputTokens: obj.usage.prompt_tokens_details?.cached_tokens ?? 0,
+      reasoningTokens:
+        obj.usage.completion_tokens_details?.reasoning_tokens ?? null,
+      outputTokens: obj.usage.completion_tokens ?? 0,
+      logger,
+    })
+
+    // Reconstruct outgoing line with injected cost
+    const newLine = `data: ${JSON.stringify(obj)}\n`
+    return { state, outgoingLine: newLine }
+  }
+
+  return { state, outgoingLine: line }
+}