feat: Langfuse Observability Integration – CLI, LangfuseSink Adapter & Docker Hosting #1271
praisonai-triage-agent[bot] wants to merge 3 commits into main
Conversation
- Implement LangfuseSink adapter following TraceSinkProtocol
- Add complete praisonai langfuse CLI command group
- Support local (Docker) and remote Langfuse instances
- Add langfuse>=3.0.0 as optional dependency
- Maps ActionEvents to Langfuse trace/span/event API
- Thread-safe with lazy imports following AGENTS.md guidelines
- Zero changes to core SDK (praisonaiagents)

Closes #1270

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-authored-by: Mervin Praison <MervinPraison@users.noreply.github.com>
@coderabbitai review

/review

@gemini review this PR

✅ Actions performed: Review triggered.
Code Review by Qodo
1. Env host ignored
📝 Walkthrough

Introduces Langfuse observability integration to PraisonAI, including a new CLI command group (…)

Changes
Sequence Diagram

```mermaid
sequenceDiagram
    participant Agent
    participant TraceEmitter
    participant LangfuseSink
    participant LangfuseClient as Langfuse Client
    participant LangfuseServer as Langfuse Server
    Agent->>TraceEmitter: emit(AGENT_START)
    TraceEmitter->>LangfuseSink: emit(ActionEvent)
    LangfuseSink->>LangfuseSink: _handle_event()
    LangfuseSink->>LangfuseClient: trace.create(agent-run)
    LangfuseClient->>LangfuseServer: POST /traces
    LangfuseClient->>LangfuseSink: return trace_id
    LangfuseSink->>LangfuseClient: span.create(root_span)
    LangfuseClient->>LangfuseServer: POST /spans
    Agent->>TraceEmitter: emit(TOOL_START)
    TraceEmitter->>LangfuseSink: emit(ActionEvent)
    LangfuseSink->>LangfuseClient: span.create(tool_span)
    LangfuseClient->>LangfuseServer: POST /spans
    Agent->>TraceEmitter: emit(TOOL_END)
    TraceEmitter->>LangfuseSink: emit(ActionEvent)
    LangfuseSink->>LangfuseClient: span.update(output, status)
    LangfuseClient->>LangfuseServer: PUT /spans
    Agent->>TraceEmitter: emit(AGENT_END)
    TraceEmitter->>LangfuseSink: emit(ActionEvent)
    LangfuseSink->>LangfuseClient: span.update(output, status)
    LangfuseClient->>LangfuseServer: PUT /spans
    LangfuseSink->>LangfuseClient: flush()
    LangfuseClient->>LangfuseServer: batch commit
```
Estimated Code Review Effort: 🎯 4 (Complex) | ⏱️ ~50 minutes

Pre-merge checks: ✅ Passed checks (5 passed)
Code Review
This pull request introduces Langfuse observability integration, adding a new langfuse CLI command for managing server instances and a LangfuseSink for tracing agent workflows. The feedback identifies several technical improvements: the port configuration logic in the CLI is fragile and should use regular expressions, environment variable parsing needs whitespace stripping for robustness, and print statements in the observability adapter should be replaced with standard logging. Additionally, the efficiency of the span lookup mechanism in the LangfuseSink was flagged for potential performance issues during concurrent tool execution.
```python
compose_file = repo_path / "docker-compose.yml"
if compose_file.exists():
    content = compose_file.read_text()
    content = content.replace("3000:3000", f"{port}:3000")
```
The port replacement logic using content.replace("3000:3000", ...) is fragile. It will fail on subsequent runs if the port has already been changed from the default 3000, as the string "3000:3000" will no longer exist in the file. Consider using a regular expression to match any port mapped to 3000.
```diff
-content = content.replace("3000:3000", f"{port}:3000")
+import re
+content = re.sub(r"\d+:3000", f"{port}:3000", content)
```
```python
key, value = line.split('=', 1)
config_data[key] = value
```
When parsing the configuration file, keys and values are not stripped of whitespace. This can lead to duplicate entries or incorrect values if the .env file contains spaces around the = sign (e.g., KEY = VALUE).
```diff
-key, value = line.split('=', 1)
-config_data[key] = value
+key, value = line.split('=', 1)
+config_data[key.strip()] = value.strip()
```
```python
    self._handle_event(event)
except Exception as e:
    # Don't let observability errors break agent execution
    print(f"LangfuseSink error: {e}")
```
Using print for error reporting in a library component or adapter is generally discouraged as it pollutes the standard output of the consuming application. It is better to use the standard logging module, which allows users to configure how and where logs are handled.
```diff
-print(f"LangfuseSink error: {e}")
+import logging
+logging.getLogger(__name__).error(f"LangfuseSink error: {e}")
```
```python
for key in self._spans:
    if key.startswith(f"{agent_name}:{tool_name}:"):
        tool_key = key
```
Iterating over all active spans to find a matching tool name is inefficient (O(N)) and potentially ambiguous if multiple concurrent tools with the same name are running. Consider using a more structured storage for spans, such as a dictionary of lists or stacks keyed by agent_name and tool_name to improve lookup performance and correctness.
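A minimal sketch of the reviewer's suggestion, assuming a stack of open spans per (agent, tool) pair; the class and method names are illustrative, not from the PR. Lookup becomes O(1) and nested invocations of the same tool resolve LIFO instead of scanning every key:

```python
from collections import defaultdict

class SpanRegistry:
    """Hypothetical structured span storage: one stack per (agent, tool) key."""

    def __init__(self):
        self._open = defaultdict(list)  # (agent_name, tool_name) -> stack of spans

    def push(self, agent_name, tool_name, span):
        self._open[(agent_name, tool_name)].append(span)

    def pop(self, agent_name, tool_name):
        # Most-recently-started span for this tool closes first (LIFO).
        stack = self._open.get((agent_name, tool_name))
        return stack.pop() if stack else None

registry = SpanRegistry()
registry.push("researcher", "web_search", "span-1")
registry.push("researcher", "web_search", "span-2")
print(registry.pop("researcher", "web_search"))  # span-2
```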
```python
try:
    self._client.flush()
except Exception as e:
    print(f"LangfuseSink flush error: {e}")
```
```python
public_key: str = ""
secret_key: str = ""
host: str = "https://cloud.langfuse.com"
flush_at: int = 20
flush_interval: float = 10.0
enabled: bool = True

def __post_init__(self):
    """Validate configuration."""
    if self.enabled and not (self.public_key and self.secret_key):
        # Try environment variables if not explicitly set
        import os
        self.public_key = self.public_key or os.getenv("LANGFUSE_PUBLIC_KEY", "")
        self.secret_key = self.secret_key or os.getenv("LANGFUSE_SECRET_KEY", "")
        self.host = self.host or os.getenv("LANGFUSE_HOST", os.getenv("LANGFUSE_BASE_URL", self.host))
```
1. Env host ignored (Bug, Correctness)
LangfuseSinkConfig.__post_init__ cannot apply LANGFUSE_HOST/LANGFUSE_BASE_URL because self.host is initialized to a non-empty default and the code short-circuits before reading env vars. This silently routes traces to the wrong Langfuse instance for self-hosted deployments.
Agent Prompt
### Issue description
`LangfuseSinkConfig` currently never honors `LANGFUSE_HOST` / `LANGFUSE_BASE_URL` because `host` has a truthy default and the code uses `self.host or ...`.
### Issue Context
This breaks self-hosted Langfuse setups that expect env vars to direct the SDK to a custom base URL.
### Fix Focus Areas
- src/praisonai/praisonai/observability/langfuse.py[29-44]
### Suggested fix
- Read env vars first and only fall back to the default if env is not set, e.g.:
- `env_host = os.getenv("LANGFUSE_HOST") or os.getenv("LANGFUSE_BASE_URL")`
- `if env_host: self.host = env_host`
- Alternatively, make `host` default to `""`/`None` and then set `self.host = env_host or "https://cloud.langfuse.com"` in `__post_init__`.
Copy this prompt and use it to remediate the issue with your preferred AI generation tools.
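A minimal sketch of the env-first lookup the prompt describes. The field names mirror the snippet above, but this is a standalone toy, not the PR's actual class:

```python
import os
from dataclasses import dataclass

@dataclass
class LangfuseSinkConfig:
    public_key: str = ""
    secret_key: str = ""
    host: str = ""  # empty default so env vars can take effect

    def __post_init__(self):
        self.public_key = self.public_key or os.getenv("LANGFUSE_PUBLIC_KEY", "")
        self.secret_key = self.secret_key or os.getenv("LANGFUSE_SECRET_KEY", "")
        # Read env vars first; fall back to the cloud default only at the end.
        env_host = os.getenv("LANGFUSE_HOST") or os.getenv("LANGFUSE_BASE_URL")
        self.host = self.host or env_host or "https://cloud.langfuse.com"

os.environ["LANGFUSE_HOST"] = "http://localhost:3000"
print(LangfuseSinkConfig().host)  # http://localhost:3000
```

An explicitly passed `host` still wins over the environment, which preserves the current constructor behavior.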
```python
enabled: bool = True

def __post_init__(self):
    """Validate configuration."""
    if self.enabled and not (self.public_key and self.secret_key):
        # Try environment variables if not explicitly set
        import os
        self.public_key = self.public_key or os.getenv("LANGFUSE_PUBLIC_KEY", "")
        self.secret_key = self.secret_key or os.getenv("LANGFUSE_SECRET_KEY", "")
        self.host = self.host or os.getenv("LANGFUSE_HOST", os.getenv("LANGFUSE_BASE_URL", self.host))
```
2. Enabled ignores missing keys (Bug, Correctness)
LangfuseSinkConfig leaves enabled=True even when no public/secret key is configured, so callers (including praisonai langfuse test) cannot reliably detect that Langfuse tracing is effectively unusable. This causes the test command to proceed and fail later during sink initialization instead of cleanly prompting for credentials.
Agent Prompt
### Issue description
`LangfuseSinkConfig.enabled` stays `True` even when keys are missing. Callers that use `enabled` as a readiness check (e.g., CLI `langfuse test`) will proceed and then fail later.
### Issue Context
The CLI test expects `enabled` to reflect whether credentials are present.
### Fix Focus Areas
- src/praisonai/praisonai/observability/langfuse.py[34-44]
- src/praisonai/praisonai/cli/commands/langfuse.py[389-396]
### Suggested fix
- After env lookup in `__post_init__`, if either key is still empty:
- set `self.enabled = False`, or
- raise a clear config/validation error.
- In `langfuse_test`, also guard explicitly on `config.public_key` and `config.secret_key` (defensive), so the user sees an immediate actionable message.
Copy this prompt and use it to remediate the issue with your preferred AI generation tools.
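A sketch of the first suggested option: after the env lookup, flip `enabled` off when either key is still empty so callers can use it as a readiness check. Field names mirror the snippet above; this is an illustrative toy, not the PR's code:

```python
import os
from dataclasses import dataclass

@dataclass
class LangfuseSinkConfig:
    public_key: str = ""
    secret_key: str = ""
    enabled: bool = True

    def __post_init__(self):
        self.public_key = self.public_key or os.getenv("LANGFUSE_PUBLIC_KEY", "")
        self.secret_key = self.secret_key or os.getenv("LANGFUSE_SECRET_KEY", "")
        if not (self.public_key and self.secret_key):
            self.enabled = False  # tracing is unusable without both keys

# Ensure a clean environment for the demonstration.
os.environ.pop("LANGFUSE_PUBLIC_KEY", None)
os.environ.pop("LANGFUSE_SECRET_KEY", None)
print(LangfuseSinkConfig().enabled)  # False
print(LangfuseSinkConfig(public_key="pk-x", secret_key="sk-y").enabled)  # True
```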
```python
def _initialize_client(self) -> None:
    """Lazy import and initialize Langfuse client."""
    try:
        import langfuse

        # Validate config
        if not (self._config.public_key and self._config.secret_key):
            raise ImportError(
                "Langfuse credentials missing. Set LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY "
                "environment variables or pass them to LangfuseSinkConfig."
            )

        self._client = langfuse.Langfuse(
            public_key=self._config.public_key,
            secret_key=self._config.secret_key,
            host=self._config.host,
            flush_at=self._config.flush_at,
            flush_interval=self._config.flush_interval,
        )

    except ImportError:
        raise ImportError(
            "Langfuse is not installed. Install with: pip install praisonai[langfuse]"
        )
```
3. Credential error masked (Bug, Reliability)
LangfuseSink._initialize_client raises ImportError for missing credentials, but then catches ImportError broadly and re-raises as βLangfuse is not installed,β hiding the real configuration error. This misleads users and makes troubleshooting much harder.
Agent Prompt
### Issue description
Missing credentials currently get reported as a missing dependency because `_initialize_client` catches `ImportError` broadly.
### Issue Context
This affects both programmatic usage and `praisonai langfuse test` because initialization failures become misleading.
### Fix Focus Areas
- src/praisonai/praisonai/observability/langfuse.py[74-97]
### Suggested fix
- Use two stages:
1) `try: import langfuse` / `except ImportError as e: raise ImportError("Langfuse is not installed...") from e`
2) Validate keys and raise `ValueError` (or a custom exception) for missing credentials.
- Keep the credential error message specific (public/secret key missing + which env vars to set).
Copy this prompt and use it to remediate the issue with your preferred AI generation tools.
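A sketch of the two-stage initialization the prompt suggests: stage 1 reports a genuinely missing dependency, stage 2 raises ValueError for missing credentials, so the two failures stay distinguishable. The function name and config shape are illustrative:

```python
def initialize_client(config):
    """Hypothetical two-stage init: dependency check, then credential check."""
    try:
        import langfuse  # stage 1: dependency check only
    except ImportError as e:
        raise ImportError(
            "Langfuse is not installed. Install with: pip install praisonai[langfuse]"
        ) from e

    # stage 2: credential validation, deliberately outside the import try/except
    if not (config.public_key and config.secret_key):
        raise ValueError(
            "Langfuse credentials missing: set LANGFUSE_PUBLIC_KEY and "
            "LANGFUSE_SECRET_KEY, or pass keys to LangfuseSinkConfig."
        )

    return langfuse.Langfuse(
        public_key=config.public_key,
        secret_key=config.secret_key,
        host=config.host,
    )
```

Chaining with `from e` also preserves the original traceback, which helps when the import fails for a reason other than absence (e.g. a broken install).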
```python
ports = web_container.get("Publishers", "")
if ":" in ports:
    port = ports.split(":")[0] if "->" in ports else "3000"
else:
    port = "3000"

try:
    import requests
    response = requests.get(f"http://127.0.0.1:{port}", timeout=5)
    if response.status_code < 500:
```
4. Status port extracted wrong (Bug, Correctness)
langfuse_status computes the web UI port as ports.split(':')[0] when a mapping contains ->, which returns the host segment rather than the published port. This produces invalid health-check URLs and incorrect status output.
Agent Prompt
### Issue description
`langfuse_status` extracts the published port incorrectly from the `Publishers` string, causing broken/invalid URLs for the web health check.
### Issue Context
The code already assumes a `...:...->...` structure by checking for `"->"`, but then extracts the wrong segment.
### Fix Focus Areas
- src/praisonai/praisonai/cli/commands/langfuse.py[243-263]
### Suggested fix
- Parse the published port with a small, robust parser (regex recommended), e.g.:
- Find the substring between the last `:` before `->` and the `->`.
- Example approach: `m = re.search(r":(?P<port>\d+)->", ports)`.
- If parsing fails, fall back to `3000` and print a warning that port detection failed.
Copy this prompt and use it to remediate the issue with your preferred AI generation tools.
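The regex approach from the suggested fix, sketched as a small helper. The function name is hypothetical; it assumes a docker-style mapping string such as `"0.0.0.0:8080->3000/tcp"` and falls back to the default when detection fails:

```python
import re

def published_port(ports: str, default: str = "3000") -> str:
    """Extract the published host port: the digits between the last ':' and '->'."""
    m = re.search(r":(?P<port>\d+)->", ports)
    if m:
        return m.group("port")
    return default  # port detection failed; caller may want to warn here

print(published_port("0.0.0.0:8080->3000/tcp"))  # 8080
print(published_port("3000/tcp"))                # 3000 (fallback)
```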
@copilot Do a thorough review of this PR. Read ALL existing reviewer comments above first. Review areas:
Agent-Logs-Url: https://github.com/MervinPraison/PraisonAI/sessions/178e99c1-305d-436d-8385-6e9f0642fec2 Co-authored-by: MervinPraison <454862+MervinPraison@users.noreply.github.com>
Thorough review done. Here's a summary of findings and fixes applied in e4afa3b:

- Bugs fixed
- Tests added
- No backward-compat concerns: all changes are to newly added code. No existing public APIs modified.
Actionable comments posted: 6

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/praisonai/praisonai/cli/commands/langfuse.py`:
- Around line 281-362: The config command writes ~/.praisonai/langfuse.env but
the test path (langfuse_test() / LangfuseSinkConfig()) reads os.environ only, so
running "praisonai langfuse config ..." then "praisonai langfuse test" fails;
after writing config_file in the config flow, load the saved keys into the
current process environment (e.g., set os.environ["LANGFUSE_PUBLIC_KEY"],
os.environ["LANGFUSE_SECRET_KEY"], os.environ["LANGFUSE_HOST"] from config_data)
so LangfuseSinkConfig() sees them immediately, and keep the existing file
write/sourcing guidance for new shells.
In `@src/praisonai/praisonai/observability/langfuse.py`:
- Around line 66-67: The current trace/span maps (_traces and _spans) use
agent_name and tool display names which can collide across overlapping runs;
change the keying to a stable per-run + per-invocation key (e.g., combine
agent_id (fallback to agent_name if missing) with a generated invocation_id/uuid
for each tool call) and store that composite key when creating traces/spans;
update all places that create/lookup/close traces/spans (references to _traces,
_spans and the code that currently searches for keys like
"{agent_name}:{tool_name}:*") to use the composite key (pass/return
invocation_id from the tool-start function so tool-completion can use it) and
remove any heuristics that pick the "last" matching key. Ensure tool completion
and agent-run lifecycle handlers accept or propagate the invocation_id so spans
are closed against the exact trace/span entry.
- Around line 255-270: The shutdown in close() sets self._closed before calling
flush(), which prevents flush() from sending the final batch and then ends spans
in _spans afterwards; fix by changing the order so spans are ended and flushed
while the sink is still open (e.g., call self.flush() and end/clear
spans/_traces first, then set self._closed = True), or alternatively ensure
flush() ignores the _closed flag during this shutdown path; update the close()
method references (close(), _closed, flush(), _spans, span.end(), _traces)
accordingly.
- Around line 29-43: The host env vars are only read inside the "missing
credentials" branch so custom LANGFUSE_HOST/LANGFUSE_BASE_URL never override the
default; move or add the host environment lookup out of the credentials check in
__post_init__ (or always apply it before the credential check) so self.host =
os.getenv("LANGFUSE_HOST", os.getenv("LANGFUSE_BASE_URL", self.host)) runs
regardless of whether public_key/secret_key are set, while preserving any
explicitly provided self.host if desired.
- Around line 135-156: The code currently calls removed Langfuse v2 APIs
(self._client.trace(), trace.span(), trace.event(), parent_span.span()) so
update the adapter to use the v3+ observation API: replace
self._client.trace(...) with creating an observation via
self._client.start_observation(...) or using
self._client.start_as_current_observation(as_type="generation") as a context
manager for the trace-level object; replace trace.span(...) and
parent_span.span(...) with start_as_current_observation(as_type="span") (or
start_observation()) to create span observations, call observation.update(...)
to change metadata, and call observation.end() (or let the context manager exit)
to finish them; similarly replace trace.event(...) usages with
start_observation/start_as_current_observation for events and use
.update()/.end() for lifecycle and metadata updates. Ensure you preserve
metadata keys (agent_id, agent_name, schema_version, event.metadata) when
building the observation, and update references to self._traces[agent_name] to
store the observation object rather than a v2 trace.
Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 9ec545e4-05b3-47bb-bf9e-4def65c82c37

Files selected for processing (5):
- src/praisonai/praisonai/cli/app.py
- src/praisonai/praisonai/cli/commands/langfuse.py
- src/praisonai/praisonai/observability/__init__.py
- src/praisonai/praisonai/observability/langfuse.py
- src/praisonai/pyproject.toml
```python
result = subprocess.run([
    "docker", "compose", "ps", "--format", "json"
], cwd=repo_path, capture_output=True, text=True, check=True)

if not result.stdout.strip():
    console.print("[yellow]No Langfuse containers running[/yellow]")
    return

# Parse JSON output
import json
containers = []
for line in result.stdout.strip().split('\n'):
    if line:
        containers.append(json.loads(line))
```
Analysis chain

Web query: docker compose ps --format json output structure documentation

Result:
The docker compose ps --format json command outputs a newline-delimited JSON (NDJSON) stream, where each line is a complete JSON object representing one container. It is not a single JSON array. This behavior aligns with docker ps --format json and has been intentional since Docker Compose v2.21+ (via alignment with the docker/cli formatter).

Example output (two containers):

{"ID":"abc123...","Name":"project-service-1","Command":"...","CreatedAt":"2024-10-01T12:00:00Z","ExitCode":0,"Health":"","Image":"nginx","Labels":"...","LocalVolumes":"0","Mounts":"","Names":["project-service-1"],"Networks":"project_default","Ports":"80/tcp","Project":"project","Publishers":[{"URL":"0.0.0.0","TargetPort":80,"PublishedPort":8080,"Protocol":"tcp"}],"RunningFor":"5 minutes ago","Service":"service","Size":"0B","State":"running","Status":"Up 5 minutes"}
{"ID":"def456...","Name":"project-db-1",...}

Key fields in each JSON object (synthesized from official examples and real outputs; the exact set may vary by version and container state):
- ID: container ID (string); Name: container name (string); Names: array of names (usually one element)
- Command: quoted command (string); CreatedAt: creation timestamp (string)
- Project: Compose project name (string, restored post-v2.21); Service: Compose service name (string)
- State: e.g. "running", "exited" (string); Status: detailed status like "Up 5 minutes (healthy)" (string); Health: health status (string, empty if none); ExitCode: exit code if exited (int)
- Image: image name (string); Ports: exposed ports summary (string)
- Publishers: array of port bindings [{URL: string (e.g. "0.0.0.0"), TargetPort: int, PublishedPort: int, Protocol: string}]
- Size, RunningFor, Networks, Mounts, LocalVolumes, Labels: summary strings

Additional fields from docker ps alignment: Labels (serialized map), LocalVolumes, Mounts, Networks, sometimes SizeRw/SizeRootFs. To parse as an array: docker compose ps --format json | jq -s .

Official documentation still shows outdated array examples but confirms the JSON format. The source code uses docker/cli's ContainerWrite with NewContainerFormat("json"), producing NDJSON of container formatter fields. No formal JSON schema exists in the docs; the structure derives from the Go template fields for container ps (see the docker ps --format docs for base fields plus Compose extras like Project/Service).
Citations:
1. https://docs.docker.com/reference/cli/docker/compose/ps/
2. https://docs.docker.com/reference/cli/docker/compose/ps
3. https://docs.docker.com/reference/cli/docker/container/ps/
4. https://manpages.debian.org/testing/docker-compose/docker-compose-ps.1.en.html
5. https://manpages.debian.org/unstable/docker-compose/docker-compose-ps.1.en.html
6. [BUG] docker-compose ps --format json (docker/compose#11784)
7. https://stackoverflow.com/questions/77142618/incorrect-json-format-from-docker-compose-ps
8. https://github.com/docker/compose/blob/v2/cmd/compose/ps.go
9. https://pkg.go.dev/github.com/docker/docker/api/types/container
The Publishers field is an array of objects, not a string.
Docker Compose ps --format json outputs newline-delimited JSON where each Publishers field is an array of objects (e.g., [{"URL":"0.0.0.0","TargetPort":80,"PublishedPort":8080,"Protocol":"tcp"}]), not a colon-delimited string. The code at line 260 will fail with a TypeError when attempting if ":" in ports: on a list, and line 261 will fail attempting .split() on a list. Additionally, line 241 displays the raw Publishers array in the table without formatting.
Also applies to: 249-257
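Given the documented shape above, handling Publishers as a list of objects might look like this sketch; the helper name is hypothetical, and the keys (`TargetPort`, `PublishedPort`) follow the Docker docs quoted above:

```python
def web_port(container: dict, target: int = 3000, default: str = "3000") -> str:
    """Find the published host port for the given container target port."""
    publishers = container.get("Publishers") or []  # list of dicts, not a string
    for pub in publishers:
        if pub.get("TargetPort") == target and pub.get("PublishedPort"):
            return str(pub["PublishedPort"])
    return default  # nothing published for that target port

container = {
    "Service": "web",
    "Publishers": [
        {"URL": "0.0.0.0", "TargetPort": 3000, "PublishedPort": 8080, "Protocol": "tcp"}
    ],
}
print(web_port(container))  # 8080
```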
Tools: Ruff (0.15.7): [error] 202-204: Starting a process with a partial executable path (S607)
```python
config_dir = Path.home() / ".praisonai"
config_file = config_dir / "langfuse.env"

if show:
    # Show current configuration
    if config_file.exists():
        console.print(f"[blue]Configuration file: {config_file}[/blue]")
        content = config_file.read_text()

        table = Table(title="Langfuse Configuration")
        table.add_column("Setting", style="cyan")
        table.add_column("Value", style="green")

        for line in content.strip().split('\n'):
            if '=' in line and not line.startswith('#'):
                key, value = line.split('=', 1)
                # Mask secret key for security
                if "SECRET" in key:
                    value = value[:8] + "..." if len(value) > 8 else "***"
                table.add_row(key, value)

        console.print(table)

        # Also show env vars
        env_vars = []
        for var in ["LANGFUSE_PUBLIC_KEY", "LANGFUSE_SECRET_KEY", "LANGFUSE_HOST", "LANGFUSE_BASE_URL"]:
            value = os.environ.get(var)
            if value:
                if "SECRET" in var:
                    value = value[:8] + "..." if len(value) > 8 else "***"
                env_vars.append((var, value))

        if env_vars:
            console.print("\n[blue]Environment Variables:[/blue]")
            for var, value in env_vars:
                console.print(f"  {var}={value}")
    else:
        console.print("[yellow]⚠️ No configuration file found[/yellow]")

    return

# Set configuration
if not any([public_key, secret_key, host]):
    console.print("[red]At least one option must be provided[/red]")
    console.print("Use --show to view current configuration")
    raise typer.Abort()

# Read existing config
config_data = {}
if config_file.exists():
    content = config_file.read_text()
    for line in content.strip().split('\n'):
        if '=' in line and not line.startswith('#'):
            key, value = line.split('=', 1)
            config_data[key] = value

# Update with new values
if public_key:
    config_data["LANGFUSE_PUBLIC_KEY"] = public_key
if secret_key:
    config_data["LANGFUSE_SECRET_KEY"] = secret_key
if host:
    config_data["LANGFUSE_HOST"] = host

# Write config
config_dir.mkdir(exist_ok=True)

content_lines = [
    "# Langfuse Configuration",
    "# Generated by PraisonAI CLI",
    ""
]
for key, value in config_data.items():
    content_lines.append(f"{key}={value}")

config_file.write_text('\n'.join(content_lines) + '\n')

console.print(f"[green]✅ Configuration saved to {config_file}[/green]")
console.print("\n[blue]💡 To use this configuration, either:[/blue]")
console.print(f"  1. Source it: source {config_file}")
console.print("  2. Export variables manually")
console.print("  3. Set them in your shell profile")
```
config does not actually feed test in a fresh shell.
This writes ~/.praisonai/langfuse.env, but langfuse_test() just instantiates LangfuseSinkConfig(), and that config only reads os.environ. So the documented flow of praisonai langfuse config ... followed by praisonai langfuse test still fails unless the user manually sources the file first.
Also applies to: 389-390
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/praisonai/praisonai/cli/commands/langfuse.py` around lines 281 - 362, The
config command writes ~/.praisonai/langfuse.env but the test path
(langfuse_test() / LangfuseSinkConfig()) reads os.environ only, so running
"praisonai langfuse config ..." then "praisonai langfuse test" fails; after
writing config_file in the config flow, load the saved keys into the current
process environment (e.g., set os.environ["LANGFUSE_PUBLIC_KEY"],
os.environ["LANGFUSE_SECRET_KEY"], os.environ["LANGFUSE_HOST"] from config_data)
so LangfuseSinkConfig() sees them immediately, and keep the existing file
write/sourcing guidance for new shells.
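A sketch of the suggested fix: after writing the config file, also export the saved values into the current process so `LangfuseSinkConfig()` sees them immediately. The helper name is hypothetical; `config_data` mirrors the dict built by the config command:

```python
import os

def export_config(config_data: dict) -> None:
    """Load saved Langfuse settings into the current process environment."""
    for key in ("LANGFUSE_PUBLIC_KEY", "LANGFUSE_SECRET_KEY", "LANGFUSE_HOST"):
        if config_data.get(key):
            os.environ[key] = config_data[key]

# Clean slate for the demonstration.
os.environ.pop("LANGFUSE_SECRET_KEY", None)
export_config({"LANGFUSE_PUBLIC_KEY": "pk-demo", "LANGFUSE_HOST": "http://localhost:3000"})
print(os.environ["LANGFUSE_PUBLIC_KEY"])  # pk-demo
```

The file write and the "source it in new shells" guidance stay as-is; this only makes the same-process `config` then `test` flow work.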
```python
public_key: str = ""
secret_key: str = ""
host: str = "https://cloud.langfuse.com"
flush_at: int = 20
flush_interval: float = 10.0
enabled: bool = True

def __post_init__(self):
    """Validate configuration."""
    if self.enabled and not (self.public_key and self.secret_key):
        # Try environment variables if not explicitly set
        import os
        self.public_key = self.public_key or os.getenv("LANGFUSE_PUBLIC_KEY", "")
        self.secret_key = self.secret_key or os.getenv("LANGFUSE_SECRET_KEY", "")
        self.host = self.host or os.getenv("LANGFUSE_HOST", os.getenv("LANGFUSE_BASE_URL", self.host))
```
Environment-based host overrides never take effect here.
host starts with a non-empty default, and the env lookup only runs inside the "missing credentials" branch, so non-default hosts are ignored in normal configurations. This silently forces the default cloud host even when LANGFUSE_HOST / LANGFUSE_BASE_URL are set.
One way to fix it:

```diff
 @dataclass
 class LangfuseSinkConfig:
@@
-    host: str = "https://cloud.langfuse.com"
+    host: Optional[str] = None
@@
     def __post_init__(self):
         """Validate configuration."""
-        if self.enabled and not (self.public_key and self.secret_key):
-            # Try environment variables if not explicitly set
-            import os
-            self.public_key = self.public_key or os.getenv("LANGFUSE_PUBLIC_KEY", "")
-            self.secret_key = self.secret_key or os.getenv("LANGFUSE_SECRET_KEY", "")
-            self.host = self.host or os.getenv("LANGFUSE_HOST", os.getenv("LANGFUSE_BASE_URL", self.host))
+        if not self.enabled:
+            return
+        import os
+        self.public_key = self.public_key or os.getenv("LANGFUSE_PUBLIC_KEY", "")
+        self.secret_key = self.secret_key or os.getenv("LANGFUSE_SECRET_KEY", "")
+        self.host = (
+            self.host
+            or os.getenv("LANGFUSE_HOST")
+            or os.getenv("LANGFUSE_BASE_URL")
+            or "https://cloud.langfuse.com"
+        )
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/praisonai/praisonai/observability/langfuse.py` around lines 29 - 43, The
host env vars are only read inside the "missing credentials" branch so custom
LANGFUSE_HOST/LANGFUSE_BASE_URL never override the default; move or add the host
environment lookup out of the credentials check in __post_init__ (or always
apply it before the credential check) so self.host = os.getenv("LANGFUSE_HOST",
os.getenv("LANGFUSE_BASE_URL", self.host)) runs regardless of whether
public_key/secret_key are set, while preserving any explicitly provided
self.host if desired.
```python
self._traces: Dict[str, Any] = {}  # agent_name -> langfuse trace
self._spans: Dict[str, Any] = {}   # agent_name -> langfuse span
```
The correlation keys are not unique enough for overlapping runs.
_traces / _spans are keyed by agent_name, and tool completion picks the last "{agent_name}:{tool_name}:*" key it finds. Two same-name agent runs, or two same-name tool invocations finishing out of order, will overwrite or mis-close each other. This needs a stable per-run key (agent_id if present) plus a per-tool invocation id instead of display names.
Also applies to: 114-127, 190-203
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/praisonai/praisonai/observability/langfuse.py` around lines 66 - 67, The
current trace/span maps (_traces and _spans) use agent_name and tool display
names which can collide across overlapping runs; change the keying to a stable
per-run + per-invocation key (e.g., combine agent_id (fallback to agent_name if
missing) with a generated invocation_id/uuid for each tool call) and store that
composite key when creating traces/spans; update all places that
create/lookup/close traces/spans (references to _traces, _spans and the code
that currently searches for keys like "{agent_name}:{tool_name}:*") to use the
composite key (pass/return invocation_id from the tool-start function so
tool-completion can use it) and remove any heuristics that pick the "last"
matching key. Ensure tool completion and agent-run lifecycle handlers accept or
propagate the invocation_id so spans are closed against the exact trace/span
entry.
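A minimal sketch of the composite-key scheme the prompt describes, under the assumption that tool-start returns an invocation id and tool-end passes it back; the class and method names are illustrative:

```python
import uuid

class SpanIndex:
    """Hypothetical span storage keyed by (run key, invocation id), not display names."""

    def __init__(self):
        self._spans = {}

    def start_tool(self, run_key, tool_name, span) -> str:
        invocation_id = uuid.uuid4().hex  # unique per tool call
        self._spans[(run_key, invocation_id)] = (tool_name, span)
        return invocation_id  # tool-start hands this back; tool-end supplies it

    def end_tool(self, run_key, invocation_id):
        # Exact lookup: no scanning, no "last matching key" heuristic.
        return self._spans.pop((run_key, invocation_id), (None, None))[1]

index = SpanIndex()
first = index.start_tool("agent-1", "web_search", "span-A")
second = index.start_tool("agent-1", "web_search", "span-B")
# Out-of-order completion still closes exactly the right span:
print(index.end_tool("agent-1", first))   # span-A
print(index.end_tool("agent-1", second))  # span-B
```

The `run_key` would be `agent_id` with a fallback to `agent_name`, as the prompt outlines.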
```python
trace = self._client.trace(
    name=trace_name,
    input=trace_input,
    timestamp=event.timestamp,
    metadata={
        "agent_id": event.agent_id,
        "agent_name": agent_name,
        "schema_version": event.schema_version,
        **(event.metadata if event.metadata else {}),
    }
)
self._traces[agent_name] = trace

# Create root span for the agent
span = trace.span(
    name=agent_name,
    start_time=event.timestamp,
    metadata={
        "agent_id": event.agent_id,
        **(event.metadata if event.metadata else {}),
    }
)
```
π§© Analysis chain
π Script executed:
fd -t f "langfuse.py" src/praisonai/Repository: MervinPraison/PraisonAI
Length of output: 165
🏁 Script executed:

```shell
wc -l src/praisonai/praisonai/observability/langfuse.py
```

Repository: MervinPraison/PraisonAI

Length of output: 120
🏁 Script executed:

```shell
sed -n '135,156p;180,188p;224,244p' src/praisonai/praisonai/observability/langfuse.py
```

Repository: MervinPraison/PraisonAI

Length of output: 1987
🏁 Script executed:

```shell
cat -n src/praisonai/praisonai/observability/langfuse.py
```

Repository: MervinPraison/PraisonAI

Length of output: 12505
🌐 Web query:

> Langfuse 3.0.0 Python SDK API trace span observation documentation

💡 Result:
The Langfuse Python SDK v3 (generally available since June 2025, latest releases v3.14.x as of 2026) is built on OpenTelemetry for tracing LLM applications. Traces are implicitly created by the first root span or generation. Observations include spans (generic operations) and generations (LLM calls), plus specialized types like agent, tool, chain, etc.

## Setup

```
pip install langfuse
```

Set env vars: `LANGFUSE_PUBLIC_KEY`, `LANGFUSE_SECRET_KEY`, optionally `LANGFUSE_BASE_URL`.

```python
from langfuse import get_client
langfuse = get_client()  # singleton client
langfuse.flush()  # for short-lived apps
```

## Core API for Traces, Spans, Observations

### 1. Context Manager (recommended, auto end/flush)

```python
with langfuse.start_as_current_observation(name="my-span", as_type="span") as span:
    span.update(input={"query": "hi"}, output="result")  # optional updates

with langfuse.start_as_current_observation(
    name="llm-call",
    as_type="generation",
    model="gpt-4o",
    input={"messages": [...]},
    model_parameters={"temperature": 0.7},
) as generation:
    generation.update(output="response", usage={"input": 10, "output": 50})
```

Nested spans auto-link via OTEL context propagation. `name` is required.

### 2. Decorator

```python
from langfuse import observe

@observe(name="my-function", as_type="span")
def my_func(): ...
```

### 3. Manual (requires explicit `.end()`)

```python
span = langfuse.start_span(name="manual-span")  # or start_generation
span.update(input=..., output=...)
span.end()
```

### Trace-level

- Use `propagate_attributes(user_id="123", session_id="abc", trace_name="my-trace")` context manager for trace attributes.
- `langfuse.get_current_trace_id()`, `langfuse.get_current_observation_id()`
- `langfuse.create_trace_id(seed="deterministic")` for W3C-compatible IDs.

### Updates on active observations

- `langfuse.update_current_span(input=...)`
- `langfuse.update_current_generation(output=...)`

### Scores

```python
span.score(name="quality", value=0.95)  # or generation.score(...)
```

## Official Sources

- Primary docs: https://langfuse.com/docs/observability/sdk (overview, instrumentation)
- Full API reference: https://python.reference.langfuse.com (Langfuse class, LangfuseSpan, etc.)
- v3 announcement/key changes: https://langfuse.com/changelog/2025-06-05-python-sdk-v3-generally-available
- GitHub releases: https://github.com/langfuse/langfuse-python/releases (v3.14.5 latest)
- Observation types: https://langfuse.com/docs/observability/features/observation-types
Citations:
- 1: https://langfuse.com/changelog/2025-06-05-python-sdk-v3-generally-available
- 2: https://langfuse.com/docs/observability/sdk/overview
- 3: https://langfuse.com/docs/sdk/python/sdk-v3
- 4: https://python.reference.langfuse.com/langfuse
- 5: https://python.reference.langfuse.com/langfuse.html
- 6: https://langfuse.com/docs/observability/sdk/python/example
- 7: https://langfuse.com/docs/observability/features/observation-types
- 8: http://langfuse.com/docs/observability/sdk/advanced-features
- 9: https://langfuse.com/docs/observability/sdk/python/setup
- 10: https://langfuse.com/docs/sdk/python/example
🌐 Web query:

> Langfuse Python SDK v2 API trace span event methods

💡 Result:
In Langfuse Python SDK v2 (released December 17, 2023), the low-level API for creating traces, spans, and events uses the `Langfuse` client class. Initialize with `from langfuse import Langfuse; langfuse = Langfuse(public_key="...", secret_key="...")`.

Key methods (from v2 snapshot docs and PyPI readme):

- Traces: `trace = langfuse.trace(name="my-trace", user_id="user_123", input={"query": "Hello"})`; then `trace.span(...)`, `trace.generation(...)`, `trace.event(...)`; finally `trace.end(output="result")`.
- Spans: `span = langfuse.span(name="my-span", input="data")`; `span.end(output="result")`.
- Events: `event = langfuse.event(name="my-event")`; events are point-in-time and auto-end.

Full low-level SDK docs: https://langfuse.com/docs/sdk/python/low-level-sdk (archived for v2 at python-sdk-v2.docs-snapshot.langfuse.com). v2 uses direct methods like `trace`, `span`, `generation`, `event` on the client, unlike later OpenTelemetry-based versions (v3+) which use `start_as_current_observation(as_type="span")`, etc. See migration guide: https://langfuse.com/docs/observability/sdk/upgrade-path/python-v2-to-v3. GitHub: https://github.com/langfuse/langfuse-python (check v2 tags/releases).
Citations:
- 1: https://python-sdk-v2.docs-snapshot.langfuse.com/
- 2: https://python-sdk-v2.docs-snapshot.langfuse.com/docs/
- 3: https://github.com/langfuse/langfuse-python
- 4: https://langfuse.com/docs/observability/sdk/upgrade-path/python-v2-to-v3
- 5: https://pypi.org/project/langfuse/2.26.2/
- 6: https://pypi.org/project/langfuse/2.22.0/
- 7: https://pypi.org/project/langfuse/2.15.0/
- 8: https://pypi.org/project/langfuse/2.2.2a0/
- 9: https://langfuse.com/docs/sdk/python/low-level-sdk
🏁 Script executed:

```shell
find . -type f \( -name "requirements.txt" -o -name "setup.py" -o -name "setup.cfg" -o -name "pyproject.toml" \) | head -20
```

Repository: MervinPraison/PraisonAI

Length of output: 335
🏁 Script executed:

```shell
grep -r "langfuse" ./src/praisonai/pyproject.toml ./src/praisonai/requirements.txt
```

Repository: MervinPraison/PraisonAI

Length of output: 236
🌐 Web query:

> Langfuse Python SDK v3 start_span start_generation API reference

💡 Result:
In Langfuse Python SDK v3 (legacy version, docs at https://langfuse.com/docs/sdk/python/sdk-v3), the primary methods for creating spans and generations use context managers for automatic lifecycle management and OpenTelemetry context propagation:

- `langfuse.start_as_current_observation(name="span-name", as_type="span")` or `langfuse.start_as_current_span(name="span-name")` (legacy)
- `langfuse.start_as_current_observation(name="gen-name", as_type="generation", model="gpt-4")` or `langfuse.start_as_current_generation(name="gen-name", model="gpt-4")` (legacy)

For manual control (no automatic context setting or end):

- `langfuse.start_observation(name="span-name")`
- `span.start_observation(name="child-span-name")` on a `LangfuseSpan` object

Example:

```python
from langfuse import get_client
langfuse = get_client()

with langfuse.start_as_current_observation(as_type="span", name="process-request") as span:
    span.update(output="Processing complete")
    with langfuse.start_as_current_observation(as_type="generation", name="llm-response", model="gpt-3.5-turbo") as generation:
        generation.update(output="LLM response")

langfuse.flush()
```

Note: SDK v3 is legacy; current version is v4+ with unified `start_observation` API. Official API reference: https://python.reference.langfuse.com (covers latest, shows `LangfuseSpan` and `start_as_current_observation` patterns). The v3 upgrade guide notes migration from `start_span`/`start_generation` to `start_observation(as_type=...)`. For self-hosted, requires Langfuse >=3.125.0.
Citations:
- 1: https://python.reference.langfuse.com/langfuse
- 2: https://python.reference.langfuse.com/langfuse.html
- 3: https://langfuse.com/docs/sdk/python/sdk-v3
- 4: https://langfuse.com/docs/observability/sdk/instrumentation
- 5: https://langfuse.com/changelog/2025-06-05-python-sdk-v3-generally-available
- 6: https://github.com/langfuse/langfuse-python/blob/72aa1565/langfuse/_client/span.py
- 7: https://www.apiautomations.com
- 8: https://get.langfuse.com/docs/observability/sdk/upgrade-path/python-v3-to-v4
🌐 Web query:

> Langfuse Python SDK v3 client trace span event methods backward compatibility v2

💡 Result:
The Langfuse Python SDK v3 is not backward compatible with v2 client methods for traces, spans, and events; official migration guides explicitly state this. In v2, the low-level SDK used methods like `langfuse.trace`, `langfuse.span`, `langfuse.generation`, and `langfuse.event` to create observations, with updates via methods on the returned objects. In v3 (and later v4+), these are replaced by OpenTelemetry-based methods:

- `langfuse.start_as_current_observation(as_type="span" | "generation" | "event", name=...)` with a context manager for auto-end, or `langfuse.start_observation()` for manual `.end()`
- Updates use `.update()` on `LangfuseSpan`, `LangfuseGeneration`, or `LangfuseEvent` objects (e.g., `span.update(input=..., output=...)`)
- Events via `langfuse.create_event(name=...)` returning `LangfuseEvent`
- Trace management via `propagate_attributes` or observation-specific trace updates (some deprecated in v4)

Key breaking changes:

- Required `name` parameter for observations
- Explicit `.end()` or context managers (no auto-end like v2)
- No direct `langfuse.trace`/`langfuse.span`; traces are implicit from the root span
- Context propagation automatic via OTEL
- IDs in W3C format

v2 receives only critical fixes; upgrade requires code changes per integration type (`@observe`, low-level, integrations). See the official v2-to-v3 guide: https://langfuse.com/docs/observability/sdk/upgrade-path/python-v2-to-v3
Citations:
- 1: https://langfuse.com/changelog/2025-06-05-python-sdk-v3-generally-available
- 2: https://langfuse.com/docs/observability/sdk/upgrade-path/python-v2-to-v3
- 3: https://python.reference.langfuse.com/langfuse
- 4: https://python.reference.langfuse.com/langfuse.html
- 5: https://github.com/langfuse/langfuse-python/blob/72aa1565/langfuse/_client/client.py
- 6: https://github.com/langfuse/langfuse-python/blob/72aa1565/langfuse/_client/span.py
Langfuse SDK v3 API incompatibility: code uses removed v2-style methods.
The code uses trace(), trace.span(), and trace.event() methods which were removed in Langfuse v3. While langfuse>=3.0.0 is declared as a dependency, v3 replaced these with start_as_current_observation(as_type="span"|"generation") (context manager) or start_observation() (manual), with automatic OTEL context propagation. This adapter will fail at runtime with AttributeError when instantiated with any Langfuse 3.x or 4.x client.
Affected locations: lines 142 (self._client.trace()), 156 (trace.span()), 187 (parent_span.span()), 231 and 247 (trace.event()).
Migrate to v3+ API using start_as_current_observation() context managers or start_observation() for manual lifecycle control, ensuring updates use .update() method on observation objects and explicit .end() calls or context manager exit.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/praisonai/praisonai/observability/langfuse.py` around lines 135 - 156,
The code currently calls removed Langfuse v2 APIs (self._client.trace(),
trace.span(), trace.event(), parent_span.span()) so update the adapter to use
the v3+ observation API: replace self._client.trace(...) with creating an
observation via self._client.start_observation(...) or using
self._client.start_as_current_observation(as_type="generation") as a context
manager for the trace-level object; replace trace.span(...) and
parent_span.span(...) with start_as_current_observation(as_type="span") (or
start_observation()) to create span observations, call observation.update(...)
to change metadata, and call observation.end() (or let the context manager exit)
to finish them; similarly replace trace.event(...) usages with
start_observation/start_as_current_observation for events and use
.update()/.end() for lifecycle and metadata updates. Ensure you preserve
metadata keys (agent_id, agent_name, schema_version, event.metadata) when
building the observation, and update references to self._traces[agent_name] to
store the observation object rather than a v2 trace.
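The v2-to-v3 call mapping can be exercised against a stand-in client that mirrors the v3 observation surface quoted above. `FakeObservation`/`FakeClient` are illustrative test doubles, not Langfuse classes; the real adapter would call the same methods on the actual client.

```python
class FakeObservation:
    """Test double mirroring the v3 observation surface the review cites."""
    def __init__(self, name, **kwargs):
        self.name, self.attrs, self.children, self.ended = name, dict(kwargs), [], False

    def start_observation(self, name, **kwargs):
        # v3: children are created on the parent observation.
        child = FakeObservation(name, **kwargs)
        self.children.append(child)
        return child

    def update(self, **kwargs):
        self.attrs.update(kwargs)

    def end(self):
        self.ended = True


class FakeClient:
    def start_observation(self, name, **kwargs):
        return FakeObservation(name, **kwargs)


# v2 -> v3 call mapping the adapter must follow:
#   client.trace(...)  -> client.start_observation(...)  (root; trace is implicit)
#   trace.span(...)    -> root.start_observation(..., as_type="span")
#   trace.event(...)   -> observation updates / create_event
client = FakeClient()
root = client.start_observation("agent-run", metadata={"agent_id": "a1"})
tool = root.start_observation("tool:search", as_type="span")
tool.update(output="3 results")
tool.end()
root.end()
```

Storing these observation objects in `_traces`/`_spans` (instead of v2 trace handles) keeps the rest of the sink's bookkeeping unchanged.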
```python
def close(self) -> None:
    """Close the sink and release resources."""
    if not self._closed:
        self._closed = True
        self.flush()
        if self._client:
            try:
                # Close any remaining spans
                with self._lock:
                    for span in self._spans.values():
                        try:
                            span.end()
                        except:
                            pass
                    self._spans.clear()
                    self._traces.clear()
```
close() never flushes the spans it ends.
_closed is set before flush(), so the first flush is skipped, and the remaining spans are ended only after that attempted flush. Short-lived processes that rely on close() can therefore drop the final batch.
♻️ Safer shutdown order
```diff
 def close(self) -> None:
     """Close the sink and release resources."""
-    if not self._closed:
-        self._closed = True
-        self.flush()
-        if self._client:
-            try:
-                # Close any remaining spans
-                with self._lock:
-                    for span in self._spans.values():
-                        try:
-                            span.end()
-                        except:
-                            pass
-                    self._spans.clear()
-                    self._traces.clear()
-            except Exception:
-                pass
+    if self._closed:
+        return
+    if self._client:
+        with self._lock:
+            for span in list(self._spans.values()):
+                try:
+                    span.end()
+                except Exception:
+                    pass
+            self._spans.clear()
+            self._traces.clear()
+    self.flush()
+    self._closed = True
```

🧰 Tools

🪛 Ruff (0.15.7)
[error] 267-267: Do not use bare except
(E722)
[error] 267-268: try-except-pass detected, consider logging the exception
(S110)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/praisonai/praisonai/observability/langfuse.py` around lines 255 - 270,
The shutdown in close() sets self._closed before calling flush(), which prevents
flush() from sending the final batch and then ends spans in _spans afterwards;
fix by changing the order so spans are ended and flushed while the sink is still
open (e.g., call self.flush() and end/clear spans/_traces first, then set
self._closed = True), or alternatively ensure flush() ignores the _closed flag
during this shutdown path; update the close() method references (close(),
_closed, flush(), _spans, span.end(), _traces) accordingly.
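The ordering fix above can be modeled in isolation. `SinkShutdown` below is a minimal illustrative model of the corrected `close()` (names simplified from the adapter): end spans first, flush while the sink is still open, and only then mark it closed, so the final batch is never dropped.

```python
import threading


class SinkShutdown:
    """Minimal model of the corrected close() ordering (illustrative)."""

    def __init__(self, client, spans):
        self._client, self._spans = client, spans
        self._traces = {}
        self._lock = threading.Lock()
        self._closed = False
        self.flushed = 0  # counts successful flushes, for demonstration

    def flush(self):
        if self._closed:  # a closed sink must not flush
            return
        self.flushed += 1

    def close(self):
        if self._closed:
            return
        if self._client:
            with self._lock:
                for span in list(self._spans.values()):
                    try:
                        span.end()
                    except Exception:
                        pass
                self._spans.clear()
                self._traces.clear()
        self.flush()         # final batch goes out...
        self._closed = True  # ...before the sink refuses further work
```

With the original ordering (`_closed = True` before `flush()`), `flushed` would stay at 0; with the corrected order it is exactly 1, and a second `close()` is a no-op.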
|
@claude You are the FINAL architecture reviewer. Read ALL comments above from Gemini, Qodo, CodeRabbit, and Copilot carefully before responding.

Phase 1: Review per AGENTS.md
1. Protocol-driven: check heavy implementations vs core SDK
2. Backward compatible: ensure zero feature regressions
3. Performance: no hot-path regressions

Phase 2: FIX Valid Issues
4. For any VALID bugs or architectural flaws found by Gemini, CodeRabbit, Qodo, Copilot, or any other reviewer: implement the fix
5. Push all code fixes directly to THIS branch (do NOT create a new PR)
6. Comment a summary of exact files modified and what you skipped

Phase 3: Final Verdict
7. If all issues are resolved, approve the PR / close the Issue
8. If blocking issues remain, request changes / leave clear action items
|
Claude finished @MervinPraison's task - View job

Final Architecture Review ✅

Phase 1: Architecture Review per AGENTS.md ✅
Phase 2: Fix Valid Issues ✅
Phase 3: Final Verdict ✅

🎯 FINAL VERDICT: APPROVED FOR MERGE ✅

✅ What Was Fixed (Critical Issues)
✅ Architecture Compliance
🗑️ What Was Skipped (Non-Critical)

All critical blockers have been resolved. Architecture follows AGENTS.md principles. No backward compatibility concerns. Ready for production use.

Recommendation: ✅ MERGE AND CLOSE ISSUE

Commit: 6986f78 | Branch: claude/issue-1270-20260402-2212 | Job Run:
- Update LangfuseSink to use Langfuse v3+ API (start_observation vs trace/span/event)
- Fix CLI port parsing for Docker Compose JSON format (Publishers is array not string)
- Fix config->test workflow: load ~/.praisonai/langfuse.env in test command
- Improve agent/tool span collision safety with unique keys
- Add proper UUID-based tool invocation tracking

Co-authored-by: MervinPraison <454862+MervinPraison@users.noreply.github.com>
- `except:` → `except Exception:` in CLI and `close()` method (2 locations)
- `import requests` and `import json` outside loops/repeated calls → `requests_lib` alias at top of function, `json` as module-level import
- `_requests` → `requests_lib` (avoids misleading private-variable convention)
- `ImportError` for missing credentials → `ValueError` (semantically correct)
- `LangfuseSinkConfig` host env-var bug: host default was `"https://cloud.langfuse.com"`, causing `or`-based env-var reading to short-circuit; changed default to `""` and always resolve env vars in `__post_init__`
- `close()` ordering bug: was setting `_closed = True` before calling `flush()`, so `flush()` always no-op'd inside `close()`; fixed to flush first, then mark closed
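The host env-var fix in the last summary can be illustrated with a small dataclass. This is a hedged sketch, not the real `LangfuseSinkConfig`: the class name, field set, and `LANGFUSE_HOST` precedence are assumptions for demonstration.

```python
import os
from dataclasses import dataclass

DEFAULT_HOST = "https://cloud.langfuse.com"


@dataclass
class LangfuseSinkConfigSketch:
    """Illustrative config: host defaults to "" so the env var is always
    consulted, instead of being short-circuited by an `or` against a
    non-empty default."""
    host: str = ""

    def __post_init__(self):
        # Precedence: explicit argument > environment variable > default.
        self.host = self.host or os.environ.get("LANGFUSE_HOST", "") or DEFAULT_HOST
```

With the old default of `"https://cloud.langfuse.com"`, `self.host or env` always took the default and a user's `LANGFUSE_HOST` was silently ignored; with `""`, the env var wins whenever no explicit host is passed.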