Commit dd34e4c

fix: add trace-level metadata for all chat messages
1 parent c78cee3

3 files changed: +98 −6

src/api/models/bedrock.py

Lines changed: 19 additions & 3 deletions
@@ -438,7 +438,7 @@ async def chat_stream(self, chat_request: ChatRequest) -> AsyncIterable[bytes]:
                 # All other chunks will also include a usage field, but with a null value.
                 yield self.stream_response_to_bytes(stream_response)

-            # Update Langfuse with final streaming metadata
+            # Update Langfuse with final streaming metadata (both observation and trace)
             if final_usage or accumulated_output:
                 update_params = {}
                 if accumulated_output:
@@ -463,10 +463,22 @@ async def chat_stream(self, chat_request: ChatRequest) -> AsyncIterable[bytes]:
                 if metadata:
                     update_params["metadata"] = metadata

+                # Update the child observation (Bedrock Converse)
                 langfuse_context.update_current_observation(**update_params)
+
+                # Also update the parent trace (chat_completion) with final output
+                trace_output = {
+                    "message": {
+                        "role": "assistant",
+                        "content": final_output if accumulated_output else None,
+                    },
+                    "finish_reason": finish_reason,
+                }
+                langfuse_context.update_current_trace(output=trace_output)
+
                 if DEBUG:
                     output_length = len(accumulated_output)
-                    logger.info(f"Langfuse: Updated observation with streaming output - "
+                    logger.info(f"Langfuse: Updated observation and trace with streaming output - "
                                 f"chunks_count={output_length}, "
                                 f"output_chars={len(final_output) if accumulated_output else 0}, "
                                 f"input_tokens={final_usage.prompt_tokens if final_usage else 'N/A'}, "
@@ -482,11 +494,15 @@ async def chat_stream(self, chat_request: ChatRequest) -> AsyncIterable[bytes]:
             raise
         except Exception as e:
             logger.error("Stream error for model %s: %s", chat_request.model, str(e))
-            # Update Langfuse with error
+            # Update Langfuse with error (both observation and trace)
             langfuse_context.update_current_observation(
                 level="ERROR",
                 status_message=f"Stream error: {str(e)}"
            )
+            langfuse_context.update_current_trace(
+                output={"error": str(e)},
+                metadata={"error": True}
+            )
             if DEBUG:
                 logger.info(f"Langfuse: Updated observation with streaming error - error={str(e)[:100]}")
             error_event = Error(error=ErrorMessage(message=str(e)))
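
The streaming path has to write to two different Langfuse objects: update_current_observation targets the innermost span (the Bedrock Converse generation), while update_current_trace targets the root trace opened by the @observe decorator in chat.py. A minimal sketch of that split, using the same Langfuse v2 decorator API the diff imports; the handler function and its payloads are illustrative, not code from this repository:

from langfuse.decorators import langfuse_context, observe


@observe(as_type="generation", name="chat_completion")
def handler() -> None:
    # Writes to the innermost observation: the span/generation currently
    # active inside the decorated call stack.
    langfuse_context.update_current_observation(
        metadata={"provider": "bedrock"},  # illustrative payload
    )
    # Writes to the root trace, so the final assistant message shows up at
    # the top level of the Langfuse UI, not only on the child span.
    langfuse_context.update_current_trace(
        output={"message": {"role": "assistant", "content": "..."}},
    )

Because the streamed output and final usage are only known inside the generator, these updates happen there rather than in the route handler.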

src/api/routers/chat.py

Lines changed: 78 additions & 2 deletions
@@ -1,7 +1,8 @@
 from typing import Annotated

-from fastapi import APIRouter, Body, Depends
+from fastapi import APIRouter, Body, Depends, Header, Request
 from fastapi.responses import StreamingResponse
+from langfuse.decorators import langfuse_context, observe

 from api.auth import api_key_auth
 from api.models.bedrock import BedrockModel
@@ -15,10 +16,52 @@
 )


+def extract_langfuse_metadata(chat_request: ChatRequest, headers: dict) -> dict:
+    """Extract Langfuse tracing metadata from request body and headers.
+
+    Metadata can be provided via:
+    1. extra_body.langfuse_metadata dict in the request
+    2. HTTP headers: X-Chat-Id, X-User-Id, X-Session-Id, X-Message-Id
+    3. user field in the request (for user_id)
+
+    Returns a dict with: user_id, session_id, chat_id, message_id, and any custom metadata
+    """
+    metadata = {}
+
+    # Extract from extra_body if present
+    if chat_request.extra_body and isinstance(chat_request.extra_body, dict):
+        langfuse_meta = chat_request.extra_body.get("langfuse_metadata", {})
+        if isinstance(langfuse_meta, dict):
+            metadata.update(langfuse_meta)
+
+    # Extract from headers
+    headers_lower = {k.lower(): v for k, v in headers.items()}
+
+    # Map headers to metadata fields
+    header_mapping = {
+        "x-chat-id": "chat_id",
+        "x-user-id": "user_id",
+        "x-session-id": "session_id",
+        "x-message-id": "message_id",
+    }
+
+    for header_key, meta_key in header_mapping.items():
+        if header_key in headers_lower and headers_lower[header_key]:
+            metadata[meta_key] = headers_lower[header_key]
+
+    # Use the 'user' field from request as user_id if not already set
+    if "user_id" not in metadata and chat_request.user:
+        metadata["user_id"] = chat_request.user
+
+    return metadata
+
+
 @router.post(
     "/completions", response_model=ChatResponse | ChatStreamResponse | Error, response_model_exclude_unset=True
 )
+@observe(as_type="generation", name="chat_completion")
 async def chat_completions(
+    request: Request,
     chat_request: Annotated[
         ChatRequest,
         Body(
@@ -34,12 +77,45 @@ async def chat_completions(
         ),
     ],
 ):
+    # Extract metadata for Langfuse tracing
+    metadata = extract_langfuse_metadata(chat_request, dict(request.headers))
+
+    # Create trace name using chat_id if available
+    trace_name = f"chat:{metadata.get('chat_id', 'unknown')}"
+
+    # Update trace with metadata, user_id, and session_id
+    langfuse_context.update_current_trace(
+        name=trace_name,
+        user_id=metadata.get("user_id"),
+        session_id=metadata.get("session_id"),
+        metadata=metadata,
+        input={
+            "model": chat_request.model,
+            "messages": [msg.model_dump() for msg in chat_request.messages],
+            "temperature": chat_request.temperature,
+            "max_tokens": chat_request.max_tokens,
+            "tools": [tool.model_dump() for tool in chat_request.tools] if chat_request.tools else None,
+        }
+    )
+
     if chat_request.model.lower().startswith("gpt-"):
         chat_request.model = DEFAULT_MODEL

     # Exception will be raised if model not supported.
     model = BedrockModel()
     model.validate(chat_request)
+
     if chat_request.stream:
         return StreamingResponse(content=model.chat_stream(chat_request), media_type="text/event-stream")
-    return await model.chat(chat_request)
+
+    response = await model.chat(chat_request)
+
+    # Update trace with output for non-streaming
+    langfuse_context.update_current_trace(
+        output={
+            "message": response.choices[0].message.model_dump() if response.choices else None,
+            "finish_reason": response.choices[0].finish_reason if response.choices else None,
+        }
+    )
+
+    return response
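
On the wire, the metadata consumed by extract_langfuse_metadata can arrive via headers or the request body. A hypothetical client call is sketched below; the base_url, API key, and metadata values are placeholders, extra_headers and extra_body are standard openai-python passthrough parameters, and the inner "extra_body" key corresponds to the ChatRequest field the router reads:

from openai import OpenAI

# Placeholder endpoint and key for illustration only.
client = OpenAI(base_url="http://localhost:8000/api/v1", api_key="dummy-key")

response = client.chat.completions.create(
    model="anthropic.claude-3-sonnet-20240229-v1:0",  # any Bedrock model id the gateway validates
    messages=[{"role": "user", "content": "Hello"}],
    user="user-123",  # becomes user_id when no X-User-Id header is present
    # Headers are matched case-insensitively by extract_langfuse_metadata().
    extra_headers={"X-Chat-Id": "chat-42", "X-Session-Id": "sess-7"},
    # The outer "extra_body" is openai-python's JSON passthrough; the inner
    # "extra_body" is the ChatRequest field that carries langfuse_metadata.
    extra_body={"extra_body": {"langfuse_metadata": {"tenant": "acme"}}},
)

Values from extra_body.langfuse_metadata are applied first and then overwritten by any matching X-* header, since the header loop runs after the metadata.update call.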

src/api/schema.py

Lines changed: 1 addition & 1 deletion
@@ -99,7 +99,7 @@ class ChatRequest(BaseModel):
     stream_options: StreamOptions | None = None
     temperature: float | None = Field(default=1.0, le=2.0, ge=0.0)
     top_p: float | None = Field(default=1.0, le=1.0, ge=0.0)
-    user: str | None = None  # Not used
+    user: str | None = None
     max_tokens: int | None = 2048
     max_completion_tokens: int | None = None
     reasoning_effort: Literal["low", "medium", "high"] | None = None
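
With the "# Not used" comment removed, the OpenAI-style user field now feeds Langfuse's user_id, but only as a fallback: the router's header mapping runs first. A standalone sketch of that precedence (resolve_user_id is a hypothetical helper, not part of the codebase):

def resolve_user_id(headers: dict, user_field: str | None) -> str | None:
    # Mirrors extract_langfuse_metadata(): an explicit X-User-Id header wins,
    # and the request's "user" field is the fallback.
    headers_lower = {k.lower(): v for k, v in headers.items()}
    return headers_lower.get("x-user-id") or user_field


assert resolve_user_id({"X-User-Id": "header-user"}, "body-user") == "header-user"
assert resolve_user_id({}, "body-user") == "body-user"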
