@@ -14,7 +14,7 @@
 import tiktoken
 from botocore.config import Config
 from fastapi import HTTPException
-from langfuse.decorators import observe, langfuse_context
+from langfuse import observe, get_client
 from starlette.concurrency import run_in_threadpool
 
 from api.models.base import BaseChatModel, BaseEmbeddingsModel
@@ -54,6 +54,20 @@
 
 logger = logging.getLogger(__name__)
 
+# Initialize Langfuse client
+_langfuse_client = None
+
+def _get_langfuse_client():
+    """Get or create the Langfuse client singleton."""
+    global _langfuse_client
+    if _langfuse_client is None:
+        try:
+            _langfuse_client = get_client()
+        except Exception as e:
+            logger.warning(f"Failed to initialize Langfuse client: {e}")
+            _langfuse_client = None
+    return _langfuse_client
+
 config = Config(
     connect_timeout=60,  # Connection timeout: 60 seconds
     read_timeout=900,  # Read timeout: 15 minutes (suitable for long streaming responses)
@@ -253,18 +267,23 @@ async def _invoke_bedrock(self, chat_request: ChatRequest, stream=False):
         }
 
         # Update Langfuse generation with input metadata
-        langfuse_context.update_current_observation(
-            input=messages,
-            model=model_id,
-            model_parameters=model_parameters,
-            metadata={
-                'system': args_clone.get('system', []),
-                'toolConfig': args_clone.get('toolConfig', {}),
-                'stream': stream
-            }
-        )
-        if DEBUG:
-            logger.info(f"Langfuse: Updated observation with input - model={model_id}, stream={stream}, messages_count={len(messages)}")
+        langfuse_client = _get_langfuse_client()
+        if langfuse_client:
+            try:
+                langfuse_client.update_current_generation(
+                    input=messages,
+                    model=model_id,
+                    model_parameters=model_parameters,
+                    metadata={
+                        'system': args_clone.get('system', []),
+                        'toolConfig': args_clone.get('toolConfig', {}),
+                        'stream': stream
+                    }
+                )
+                if DEBUG:
+                    logger.info(f"Langfuse: Updated observation with input - model={model_id}, stream={stream}, messages_count={len(messages)}")
+            except Exception as e:
+                logger.warning(f"Failed to update Langfuse: {e}")
 
         try:
             if stream:
@@ -302,41 +321,61 @@ async def _invoke_bedrock(self, chat_request: ChatRequest, stream=False):
                 metadata["reasoning_content"] = reasoning_text
                 metadata["reasoning_tokens_estimate"] = len(reasoning_text) // 4
 
-            langfuse_context.update_current_observation(
-                output=output_message,
-                usage={
-                    "input": usage.get("inputTokens", 0),
-                    "output": usage.get("outputTokens", 0),
-                    "total": usage.get("totalTokens", 0)
-                },
-                metadata=metadata
-            )
-            if DEBUG:
-                logger.info(f"Langfuse: Updated observation with output - "
-                            f"input_tokens={usage.get('inputTokens', 0)}, "
-                            f"output_tokens={usage.get('outputTokens', 0)}, "
-                            f"has_reasoning={has_reasoning}, "
-                            f"stop_reason={response.get('stopReason')}")
+            langfuse_client = _get_langfuse_client()
+            if langfuse_client:
+                try:
+                    langfuse_client.update_current_generation(
+                        output=output_message,
+                        usage_details={
+                            "input": usage.get("inputTokens", 0),
+                            "output": usage.get("outputTokens", 0),
+                            "total": usage.get("totalTokens", 0)
+                        },
+                        metadata=metadata
+                    )
+                    if DEBUG:
+                        logger.info(f"Langfuse: Updated observation with output - "
+                                    f"input_tokens={usage.get('inputTokens', 0)}, "
+                                    f"output_tokens={usage.get('outputTokens', 0)}, "
+                                    f"has_reasoning={has_reasoning}, "
+                                    f"stop_reason={response.get('stopReason')}")
+                except Exception as e:
+                    logger.warning(f"Failed to update Langfuse: {e}")
         except bedrock_runtime.exceptions.ValidationException as e:
             error_message = f"Bedrock validation error for model {chat_request.model}: {str(e)}"
             logger.error(error_message)
-            langfuse_context.update_current_observation(level="ERROR", status_message=error_message)
-            if DEBUG:
-                logger.info(f"Langfuse: Updated observation with ValidationException error")
+            langfuse_client = _get_langfuse_client()
+            if langfuse_client:
+                try:
+                    langfuse_client.update_current_generation(level="ERROR", status_message=error_message)
+                    if DEBUG:
+                        logger.info(f"Langfuse: Updated observation with ValidationException error")
+                except Exception:
+                    pass
             raise HTTPException(status_code=400, detail=str(e))
         except bedrock_runtime.exceptions.ThrottlingException as e:
             error_message = f"Bedrock throttling for model {chat_request.model}: {str(e)}"
             logger.warning(error_message)
-            langfuse_context.update_current_observation(level="WARNING", status_message=error_message)
-            if DEBUG:
-                logger.info(f"Langfuse: Updated observation with ThrottlingException warning")
+            langfuse_client = _get_langfuse_client()
+            if langfuse_client:
+                try:
+                    langfuse_client.update_current_generation(level="WARNING", status_message=error_message)
+                    if DEBUG:
+                        logger.info(f"Langfuse: Updated observation with ThrottlingException warning")
+                except Exception:
+                    pass
             raise HTTPException(status_code=429, detail=str(e))
         except Exception as e:
             error_message = f"Bedrock invocation failed for model {chat_request.model}: {str(e)}"
             logger.error(error_message)
-            langfuse_context.update_current_observation(level="ERROR", status_message=error_message)
-            if DEBUG:
-                logger.info(f"Langfuse: Updated observation with generic Exception error")
+            langfuse_client = _get_langfuse_client()
+            if langfuse_client:
+                try:
+                    langfuse_client.update_current_generation(level="ERROR", status_message=error_message)
+                    if DEBUG:
+                        logger.info(f"Langfuse: Updated observation with generic Exception error")
+                except Exception:
+                    pass
             raise HTTPException(status_code=500, detail=str(e))
         return response
 
@@ -447,17 +486,21 @@ async def chat_stream(self, chat_request: ChatRequest) -> AsyncIterable[bytes]:
             if metadata:
                 update_params["metadata"] = metadata
 
-            langfuse_context.update_current_observation(**update_params)
-
-            if DEBUG:
-                output_length = len(accumulated_output)
-                logger.info(f"Langfuse: Updated observation with streaming output - "
-                            f"chunks_count={output_length}, "
-                            f"output_chars={len(final_output) if accumulated_output else 0}, "
-                            f"input_tokens={final_usage.prompt_tokens if final_usage else 'N/A'}, "
-                            f"output_tokens={final_usage.completion_tokens if final_usage else 'N/A'}, "
-                            f"has_reasoning={has_reasoning}, "
-                            f"finish_reason={finish_reason}")
+            langfuse_client = _get_langfuse_client()
+            if langfuse_client:
+                try:
+                    langfuse_client.update_current_generation(**update_params)
+                    if DEBUG:
+                        output_length = len(accumulated_output)
+                        logger.info(f"Langfuse: Updated observation with streaming output - "
+                                    f"chunks_count={output_length}, "
+                                    f"output_chars={len(final_output) if accumulated_output else 0}, "
+                                    f"input_tokens={final_usage.prompt_tokens if final_usage else 'N/A'}, "
+                                    f"output_tokens={final_usage.completion_tokens if final_usage else 'N/A'}, "
+                                    f"has_reasoning={has_reasoning}, "
+                                    f"finish_reason={finish_reason}")
+                except Exception as e:
+                    logger.warning(f"Failed to update Langfuse: {e}")
 
         # return an [DONE] message at the end.
         yield self.stream_response_to_bytes()
@@ -468,12 +511,17 @@ async def chat_stream(self, chat_request: ChatRequest) -> AsyncIterable[bytes]:
         except Exception as e:
             logger.error("Stream error for model %s: %s", chat_request.model, str(e))
             # Update Langfuse with error
-            langfuse_context.update_current_observation(
-                level="ERROR",
-                status_message=f"Stream error: {str(e)}"
-            )
-            if DEBUG:
-                logger.info(f"Langfuse: Updated observation with streaming error - error={str(e)[:100]}")
+            langfuse_client = _get_langfuse_client()
+            if langfuse_client:
+                try:
+                    langfuse_client.update_current_generation(
+                        level="ERROR",
+                        status_message=f"Stream error: {str(e)}"
+                    )
+                    if DEBUG:
+                        logger.info(f"Langfuse: Updated observation with streaming error - error={str(e)[:100]}")
+                except Exception:
+                    pass
             error_event = Error(error=ErrorMessage(message=str(e)))
             yield self.stream_response_to_bytes(error_event)
 
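For context, here is a minimal sketch of the Langfuse v3 pattern this change adopts: the `@observe(as_type="generation")` decorator opens the observation, and `get_client()` returns the shared client whose `update_current_generation()` can enrich it from anywhere inside the decorated call. The function name, model id, and token counts below are illustrative placeholders, not code from this PR.

```python
from langfuse import observe, get_client


@observe(as_type="generation")
def call_model(messages: list) -> str:
    # Hypothetical handler for illustration; only the Langfuse calls
    # mirror the pattern used in this PR.
    langfuse = get_client()

    # Attach the input side before invoking the model.
    langfuse.update_current_generation(
        input=messages,
        model="example-model-id",  # placeholder, not a real Bedrock model id
    )

    output = "Hello!"  # the actual model invocation would happen here

    # Attach the output and token usage (v3 takes `usage_details`,
    # not the v2 `usage` keyword).
    langfuse.update_current_generation(
        output=output,
        usage_details={"input": 12, "output": 3, "total": 15},
    )
    return output


call_model([{"role": "user", "content": "Hi"}])
get_client().flush()  # deliver buffered events before shutdown
```

Note that v3's `get_client()` is already designed to return a shared client instance, so the `_get_langfuse_client()` helper in this diff is less about caching and more about graceful degradation: if initialization fails it logs a warning and returns `None`, and every update site checks for that (and wraps the call in try/except) so that tracing problems never break a Bedrock request.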