Skip to content

Commit 4a365d4

Browse files
committed
Initial impl
1 parent d04e760 commit 4a365d4

File tree

4 files changed

+341
-8
lines changed

4 files changed

+341
-8
lines changed

.env.example

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# AI API Keys
22
CLAUDE_CODE_KEY=dummy_claude_code_key
33
OPEN_ROUTER_API_KEY=dummy_openrouter_key
4+
OPENAI_API_KEY=dummy_openai_key
45

56
# Database & Server
67
DATABASE_URL=postgresql://manicode_user_local:secretpassword_local@localhost:5432/manicode_db_local

packages/internal/src/env-schema.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ export const serverEnvSchema = clientEnvSchema.extend({
55
// Backend variables
66
CODEBUFF_API_KEY: z.string().optional(),
77
OPEN_ROUTER_API_KEY: z.string().min(1),
8+
OPENAI_API_KEY: z.string().min(1),
89
RELACE_API_KEY: z.string().min(1),
910
LINKUP_API_KEY: z.string().min(1),
1011
CONTEXT7_API_KEY: z.string().optional(),
@@ -44,6 +45,7 @@ export const serverProcessEnv: ServerInput = {
4445
// Backend variables
4546
CODEBUFF_API_KEY: process.env.CODEBUFF_API_KEY,
4647
OPEN_ROUTER_API_KEY: process.env.OPEN_ROUTER_API_KEY,
48+
OPENAI_API_KEY: process.env.OPENAI_API_KEY,
4749
RELACE_API_KEY: process.env.RELACE_API_KEY,
4850
LINKUP_API_KEY: process.env.LINKUP_API_KEY,
4951
CONTEXT7_API_KEY: process.env.CONTEXT7_API_KEY,

web/src/app/api/v1/chat/completions/_post.ts

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import {
2020
handleOpenRouterNonStream,
2121
handleOpenRouterStream,
2222
} from '@/llm-api/openrouter'
23+
import { handleOpenAIStream } from '@/llm-api/openai'
2324
import { extractApiKeyFromHeader } from '@/util/auth'
2425

2526
export async function postChatCompletions(params: {
@@ -204,14 +205,27 @@ export async function postChatCompletions(params: {
204205
try {
205206
if (bodyStream) {
206207
// Streaming request
207-
const stream = await handleOpenRouterStream({
208-
body,
209-
userId,
210-
agentId,
211-
fetch,
212-
logger,
213-
insertMessageBigquery,
214-
})
208+
const model = (body as any)?.model
209+
const isOpenAIDirectModel =
210+
typeof model === 'string' &&
211+
(!model.includes('/') || model.startsWith('openai/'))
212+
const stream = await (isOpenAIDirectModel
213+
? handleOpenAIStream({
214+
body,
215+
userId,
216+
agentId,
217+
fetch,
218+
logger,
219+
insertMessageBigquery,
220+
})
221+
: handleOpenRouterStream({
222+
body,
223+
userId,
224+
agentId,
225+
fetch,
226+
logger,
227+
insertMessageBigquery,
228+
}))
215229

216230
trackEvent({
217231
event: AnalyticsEvent.CHAT_COMPLETIONS_STREAM_STARTED,

web/src/llm-api/openai.ts

Lines changed: 316 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,316 @@
1+
import { setupBigQuery } from '@codebuff/bigquery'
2+
import { consumeCreditsAndAddAgentStep } from '@codebuff/billing'
3+
import { PROFIT_MARGIN } from '@codebuff/common/old-constants'
4+
import { getErrorObject } from '@codebuff/common/util/error'
5+
import { env } from '@codebuff/internal/env'
6+
7+
import type { InsertMessageBigqueryFn } from '@codebuff/common/types/contracts/bigquery'
8+
import type { Logger } from '@codebuff/common/types/contracts/logger'
9+
10+
// Running accumulation of the assistant's output across stream chunks; used
// for BigQuery logging and billing once the final usage chunk arrives.
type StreamState = { responseText: string; reasoningText: string }
11+
12+
function extractRequestMetadata(params: { body: unknown; logger: Logger }) {
13+
const { body, logger } = params
14+
const rawClientId = (body as any)?.codebuff_metadata?.client_id
15+
const clientId = typeof rawClientId === 'string' ? rawClientId : null
16+
if (!clientId) {
17+
logger.warn({ body }, 'Received request without client_id')
18+
}
19+
const rawRunId = (body as any)?.codebuff_metadata?.run_id
20+
const clientRequestId: string | null = typeof rawRunId === 'string' ? rawRunId : null
21+
if (!clientRequestId) {
22+
logger.warn({ body }, 'Received request without run_id')
23+
}
24+
return { clientId, clientRequestId }
25+
}
26+
27+
function normalizeOpenAIModel(model: unknown): string | undefined {
28+
if (typeof model !== 'string') return undefined
29+
return model.startsWith('openai/') ? model.slice('openai/'.length) : model
30+
}
31+
32+
// Shape of the `usage` object on the final OpenAI streaming chunk, plus the
// cost fields this proxy injects before forwarding the chunk downstream
// (mirroring the OpenRouter response shape — NOTE(review): presumed; confirm
// against the OpenRouter handler).
type OpenAIUsage = {
  prompt_tokens?: number
  // cached_tokens: prompt tokens served from the provider's prompt cache
  prompt_tokens_details?: { cached_tokens?: number } | null
  completion_tokens?: number
  completion_tokens_details?: { reasoning_tokens?: number } | null
  total_tokens?: number
  // We will inject cost fields below
  cost?: number
  cost_details?: { upstream_inference_cost?: number | null } | null
}
42+
43+
function getOpenAIRatesPerMTokens(model: string): { inUsd: number; outUsd: number } {
44+
const m = model.toLowerCase()
45+
if (m.includes('gpt-4o-mini') || m.includes('4o-mini') || m.includes('o4-mini')) {
46+
return { inUsd: 0.15, outUsd: 0.6 }
47+
}
48+
if (m.includes('gpt-4o')) {
49+
return { inUsd: 2.5, outUsd: 10 }
50+
}
51+
if (m.includes('gpt-4.1')) {
52+
return { inUsd: 5, outUsd: 15 }
53+
}
54+
if (m.startsWith('o3-pro')) {
55+
return { inUsd: 5, outUsd: 15 }
56+
}
57+
if (m.startsWith('o3')) {
58+
return { inUsd: 5, outUsd: 15 }
59+
}
60+
if (m.startsWith('gpt-5')) {
61+
return { inUsd: 5, outUsd: 15 }
62+
}
63+
return { inUsd: 2.5, outUsd: 10 }
64+
}
65+
66+
function computeCostDollars(usage: OpenAIUsage, model: string): number {
67+
const { inUsd, outUsd } = getOpenAIRatesPerMTokens(model)
68+
const inTokens = usage.prompt_tokens ?? 0
69+
const outTokens = usage.completion_tokens ?? 0
70+
return (inTokens / 1_000_000) * inUsd + (outTokens / 1_000_000) * outUsd
71+
}
72+
73+
export async function handleOpenAIStream({
74+
body,
75+
userId,
76+
agentId,
77+
fetch,
78+
logger,
79+
insertMessageBigquery,
80+
}: {
81+
body: any
82+
userId: string
83+
agentId: string
84+
fetch: typeof globalThis.fetch
85+
logger: Logger
86+
insertMessageBigquery: InsertMessageBigqueryFn
87+
}) {
88+
const startTime = new Date()
89+
const { clientId, clientRequestId } = extractRequestMetadata({ body, logger })
90+
91+
const model = normalizeOpenAIModel((body as any)?.model)
92+
93+
// Build OpenAI-compatible body
94+
const openaiBody: Record<string, unknown> = { ...body, model, stream: true }
95+
// Ensure usage in final chunk
96+
const streamOptions = (openaiBody.stream_options as any) ?? {}
97+
streamOptions.include_usage = true
98+
openaiBody.stream_options = streamOptions
99+
100+
// Remove fields that OpenAI might not accept
101+
delete (openaiBody as any).usage
102+
delete (openaiBody as any).provider
103+
delete (openaiBody as any).transforms
104+
delete (openaiBody as any).codebuff_metadata
105+
106+
const response = await fetch('https://api.openai.com/v1/chat/completions', {
107+
method: 'POST',
108+
headers: {
109+
Authorization: `Bearer ${env.OPENAI_API_KEY}`,
110+
'Content-Type': 'application/json',
111+
},
112+
body: JSON.stringify(openaiBody),
113+
})
114+
115+
if (!response.ok) {
116+
throw new Error(`OpenAI API error: ${response.status} ${response.statusText}`)
117+
}
118+
119+
const reader = response.body?.getReader()
120+
if (!reader) {
121+
throw new Error('Failed to get response reader')
122+
}
123+
124+
let heartbeatInterval: NodeJS.Timeout
125+
let state: StreamState = { responseText: '', reasoningText: '' }
126+
let clientDisconnected = false
127+
128+
const stream = new ReadableStream({
129+
async start(controller) {
130+
const decoder = new TextDecoder()
131+
let buffer = ''
132+
133+
controller.enqueue(new TextEncoder().encode(`: connected ${new Date().toISOString()}\n`))
134+
135+
heartbeatInterval = setInterval(() => {
136+
if (!clientDisconnected) {
137+
try {
138+
controller.enqueue(new TextEncoder().encode(`: heartbeat ${new Date().toISOString()}\n\n`))
139+
} catch {}
140+
}
141+
}, 30000)
142+
143+
try {
144+
while (true) {
145+
const { done, value } = await reader.read()
146+
if (done) break
147+
148+
buffer += decoder.decode(value, { stream: true })
149+
let lineEnd = buffer.indexOf('\n')
150+
151+
while (lineEnd !== -1) {
152+
let line = buffer.slice(0, lineEnd + 1)
153+
buffer = buffer.slice(lineEnd + 1)
154+
155+
const handled = await handleOpenAILine({
156+
userId,
157+
agentId,
158+
clientId,
159+
clientRequestId,
160+
startTime,
161+
request: openaiBody,
162+
line,
163+
state,
164+
logger,
165+
insertMessage: insertMessageBigquery,
166+
})
167+
state = handled.state
168+
line = handled.outgoingLine
169+
170+
if (!clientDisconnected) {
171+
try {
172+
controller.enqueue(new TextEncoder().encode(line))
173+
} catch (error) {
174+
logger.warn('Client disconnected during stream, continuing for billing')
175+
clientDisconnected = true
176+
}
177+
}
178+
179+
lineEnd = buffer.indexOf('\n')
180+
}
181+
}
182+
183+
if (!clientDisconnected) {
184+
controller.close()
185+
}
186+
} catch (error) {
187+
if (!clientDisconnected) {
188+
controller.error(error)
189+
} else {
190+
logger.warn(getErrorObject(error), 'Error after client disconnect in OpenAI stream')
191+
}
192+
} finally {
193+
clearInterval(heartbeatInterval)
194+
}
195+
},
196+
cancel() {
197+
clearInterval(heartbeatInterval)
198+
clientDisconnected = true
199+
logger.warn({ clientDisconnected, state }, 'Client cancelled stream, continuing OpenAI consumption for billing')
200+
},
201+
})
202+
203+
return stream
204+
}
205+
206+
async function handleOpenAILine({
207+
userId,
208+
agentId,
209+
clientId,
210+
clientRequestId,
211+
startTime,
212+
request,
213+
line,
214+
state,
215+
logger,
216+
insertMessage,
217+
}: {
218+
userId: string
219+
agentId: string
220+
clientId: string | null
221+
clientRequestId: string | null
222+
startTime: Date
223+
request: unknown
224+
line: string
225+
state: StreamState
226+
logger: Logger
227+
insertMessage: InsertMessageBigqueryFn
228+
}): Promise<{ state: StreamState; outgoingLine: string }> {
229+
if (!line.startsWith('data: ')) {
230+
return { state, outgoingLine: line }
231+
}
232+
const raw = line.slice('data: '.length)
233+
if (raw === '[DONE]\n') {
234+
return { state, outgoingLine: line }
235+
}
236+
237+
let obj: any
238+
try {
239+
obj = JSON.parse(raw)
240+
} catch (error) {
241+
logger.warn(`Received non-JSON OpenAI response: ${JSON.stringify(getErrorObject(error), null, 2)}`)
242+
return { state, outgoingLine: line }
243+
}
244+
245+
// Accumulate text
246+
try {
247+
const choice = Array.isArray(obj.choices) && obj.choices.length ? obj.choices[0] : undefined
248+
const delta = choice?.delta
249+
if (delta) {
250+
if (typeof delta.content === 'string') state.responseText += delta.content
251+
// OpenAI may not provide reasoning delta in standard chat completions; keep parity
252+
if (typeof delta.reasoning === 'string') state.reasoningText += delta.reasoning
253+
}
254+
} catch {}
255+
256+
// If usage present, it's the final chunk. Compute cost, log, and consume credits.
257+
if (obj && obj.usage) {
258+
const usage: OpenAIUsage = obj.usage
259+
const model: string = typeof obj.model === 'string' ? obj.model : (typeof (request as any)?.model === 'string' ? (request as any).model : '')
260+
261+
const cost = computeCostDollars(usage, model)
262+
obj.usage.cost = cost
263+
obj.usage.cost_details = { upstream_inference_cost: null }
264+
265+
// BigQuery insert (do not await)
266+
setupBigQuery({ logger }).then(async () => {
267+
const success = await insertMessage({
268+
row: {
269+
id: obj.id,
270+
user_id: userId,
271+
finished_at: new Date(),
272+
created_at: startTime,
273+
request,
274+
reasoning_text: state.reasoningText,
275+
response: state.responseText,
276+
output_tokens: usage.completion_tokens ?? 0,
277+
reasoning_tokens: usage.completion_tokens_details?.reasoning_tokens,
278+
cost: cost,
279+
upstream_inference_cost: null,
280+
input_tokens: usage.prompt_tokens ?? 0,
281+
cache_read_input_tokens: usage.prompt_tokens_details?.cached_tokens,
282+
},
283+
logger,
284+
})
285+
if (!success) {
286+
logger.error({ request }, 'Failed to insert message into BigQuery (OpenAI)')
287+
}
288+
})
289+
290+
await consumeCreditsAndAddAgentStep({
291+
messageId: obj.id,
292+
userId,
293+
agentId,
294+
clientId,
295+
clientRequestId,
296+
startTime,
297+
model: obj.model,
298+
reasoningText: state.reasoningText,
299+
response: state.responseText,
300+
cost,
301+
credits: Math.round(cost * 100 * (1 + PROFIT_MARGIN)),
302+
inputTokens: obj.usage.prompt_tokens ?? 0,
303+
cacheCreationInputTokens: null,
304+
cacheReadInputTokens: obj.usage.prompt_tokens_details?.cached_tokens ?? 0,
305+
reasoningTokens: obj.usage.completion_tokens_details?.reasoning_tokens ?? null,
306+
outputTokens: obj.usage.completion_tokens ?? 0,
307+
logger,
308+
})
309+
310+
// Reconstruct outgoing line with injected cost
311+
const newLine = `data: ${JSON.stringify(obj)}\n`
312+
return { state, outgoingLine: newLine }
313+
}
314+
315+
return { state, outgoingLine: line }
316+
}

0 commit comments

Comments
 (0)