@@ -39,7 +39,11 @@ def create_agent(self, config: Any, tools: Optional[ToolRegistry] = None) -> Lan
)
return LangChainAgentRunner(agent)

def create_agent_graph(self, graph_def: Any, tools: ToolRegistry) -> Any:
def create_agent_graph(
self,
graph_def: Any,
tools: ToolRegistry,
) -> Any:
"""
CAUTION:
This feature is experimental and should NOT be considered ready for production use.
@@ -1,7 +1,9 @@
"""LangGraph agent graph runner for LaunchDarkly AI SDK."""

import asyncio
import time
from typing import Annotated, Any, Dict, List, Optional, Set, Tuple
from contextvars import ContextVar
from typing import Annotated, Any, Dict, List, Set, Tuple

from ldai import log
from ldai.agent_graph import AgentGraphDefinition, AgentGraphNode
@@ -16,6 +18,9 @@
)
from ldai_langchain.langgraph_callback_handler import LDMetricsCallbackHandler

# Per-run eval task accumulator, isolated per concurrent run() call via ContextVar.
_run_eval_tasks: ContextVar[Dict[str, List[asyncio.Task]]] = ContextVar('_run_eval_tasks')


def _make_handoff_tool(child_key: str, description: str) -> Any:
"""
@@ -67,7 +72,11 @@ class LangGraphAgentGraphRunner(AgentGraphRunner):
Requires ``langgraph`` to be installed.
"""

def __init__(self, graph: AgentGraphDefinition, tools: ToolRegistry):
def __init__(
self,
graph: AgentGraphDefinition,
tools: ToolRegistry,
):
"""
Initialize the runner.

@@ -172,6 +181,26 @@ async def invoke(state: WorkflowState) -> dict:
if node_instructions:
msgs = [SystemMessage(content=node_instructions)] + msgs
response = await bound_model.ainvoke(msgs)

node_obj = self._graph.get_node(nk)
if node_obj is not None:
input_text = '\r\n'.join(
m.content if isinstance(m.content, str) else str(m.content)
for m in msgs
) if msgs else ''
output_text = (
response.content if hasattr(response, 'content') else str(response)
)
task = node_obj.get_config().evaluator.evaluate(input_text, output_text)
run_tasks = _run_eval_tasks.get(None)
if run_tasks is not None:
run_tasks.setdefault(nk, []).append(task)
else:
log.warning(
f"LangGraphAgentGraphRunner: eval task for node '{nk}' "
"has no run context; judge results will not be tracked"
)

return {'messages': [response]}

invoke.__name__ = nk
@@ -280,7 +309,9 @@ async def run(self, input: Any) -> AgentGraphResult:
:param input: The string prompt to send to the agent graph
:return: AgentGraphResult with the final output and metrics
"""
tracker = self._graph.create_tracker() if self._graph.create_tracker is not None else None
pending_eval_tasks: Dict[str, List[asyncio.Task]] = {}
token = _run_eval_tasks.set(pending_eval_tasks)
tracker = self._graph.create_tracker()
start_ns = time.perf_counter_ns()

try:
@@ -299,19 +330,18 @@ async def run(self, input: Any) -> AgentGraphResult:
output = extract_last_message_content(messages)

# Flush per-node metrics to LD trackers
handler.flush(self._graph)
all_eval_results = await handler.flush(self._graph, pending_eval_tasks)

# Graph-level metrics
if tracker:
tracker.track_path(handler.path)
tracker.track_duration(duration)
tracker.track_invocation_success()
tracker.track_total_tokens(sum_token_usage_from_messages(messages))
tracker.track_path(handler.path)
tracker.track_duration(duration)
tracker.track_invocation_success()
tracker.track_total_tokens(sum_token_usage_from_messages(messages))

return AgentGraphResult(
output=output,
raw=result,
metrics=LDAIMetrics(success=True),
evaluations=all_eval_results,
)

except Exception as exc:
@@ -323,11 +353,12 @@ async def run(self, input: Any) -> AgentGraphResult:
else:
log.warning(f'LangGraphAgentGraphRunner run failed: {exc}')
duration = (time.perf_counter_ns() - start_ns) // 1_000_000
if tracker:
tracker.track_duration(duration)
tracker.track_invocation_failure()
tracker.track_duration(duration)
tracker.track_invocation_failure()
return AgentGraphResult(
output='',
raw=None,
metrics=LDAIMetrics(success=False),
)
finally:
_run_eval_tasks.reset(token)
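
The hunks above thread a per-run accumulator through the graph: `run()` installs a fresh dict in the `_run_eval_tasks` ContextVar, each node's `invoke` wrapper appends its evaluator task to that dict, and the `finally` block resets the token so the state never outlives the run. A minimal, self-contained sketch of that ContextVar pattern follows (illustrative only, not part of this diff; `_pending`, `record`, and `run_once` are made-up names, not SDK APIs):

```python
# Illustrative sketch, not part of this diff: the per-run ContextVar
# isolation pattern used by run() above. Helper names are made up.
import asyncio
from contextvars import ContextVar
from typing import Dict, List

_pending: ContextVar[Dict[str, List[asyncio.Task]]] = ContextVar('_pending')


async def record(node_key: str) -> None:
    """A node registers a background task against the current run's accumulator."""
    tasks = _pending.get(None)
    if tasks is None:
        return  # no run context installed; nothing to track
    tasks.setdefault(node_key, []).append(
        asyncio.create_task(asyncio.sleep(0, result=node_key))
    )


async def run_once(name: str) -> List[str]:
    token = _pending.set({})      # fresh accumulator for this run only
    try:
        await record(name)        # nodes append tasks while the graph executes
        return [await t for group in _pending.get().values() for t in group]
    finally:
        _pending.reset(token)     # never leak tasks into a later run


async def main() -> None:
    # Each concurrent run sees only its own accumulator.
    print(await asyncio.gather(run_once('planner'), run_once('writer')))

asyncio.run(main())
```

Because every asyncio task gets its own copy of the context, two overlapping `run()` calls each see, and later flush, only the eval tasks their own nodes produced.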
@@ -5,6 +5,7 @@
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.outputs import ChatGeneration, LLMResult
from ldai.agent_graph import AgentGraphDefinition
from ldai.providers.types import JudgeResult
from ldai.tracker import TokenUsage

from ldai_langchain.langchain_helper import get_ai_usage_from_response
@@ -188,15 +189,22 @@ def on_tool_end(
# Flush
# ------------------------------------------------------------------

def flush(self, graph: AgentGraphDefinition) -> None:
async def flush(
self, graph: AgentGraphDefinition, eval_tasks=None
) -> List[JudgeResult]:
"""
Emit all collected per-node metrics to the LaunchDarkly trackers.

Call this once after the graph run completes.

:param graph: The AgentGraphDefinition whose nodes hold the LD config trackers.
:param eval_tasks: Optional dict mapping node key to a list of awaitables that
return judge evaluation results. Multiple tasks arise when a node is visited
more than once (e.g. in a graph with cycles).
:return: All judge results collected across all nodes.
"""
node_trackers: Dict[str, Any] = {}
all_eval_results: List[JudgeResult] = []
for node_key in self._path:
if node_key in node_trackers:
continue
@@ -220,3 +228,15 @@ def flush(self, graph: AgentGraphDefinition) -> None:

for tool_key in self._node_tool_calls.get(node_key, []):
config_tracker.track_tool_call(tool_key)

if not eval_tasks:
continue

for eval_task in eval_tasks.get(node_key, []):
results = await eval_task
all_eval_results.extend(results)
for r in results:
if r.success:
config_tracker.track_judge_result(r)

return all_eval_results
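
The async `flush()` above drains that accumulator after the per-node token, duration, and tool-call metrics are emitted: for each node on the executed path it awaits every pending evaluator task (one per visit, so cycles contribute several), returns all results, and forwards only the successful ones to the node's tracker. A rough stand-alone sketch of that drain step (illustrative only; `EvalResult`, `track()`, `drain()`, and `judge()` are placeholders, not the SDK's `JudgeResult` or `track_judge_result`):

```python
# Illustrative sketch, not part of this diff: awaiting per-node eval tasks at
# flush time. EvalResult and track() stand in for JudgeResult and
# config_tracker.track_judge_result().
import asyncio
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class EvalResult:
    node: str
    success: bool


def track(result: EvalResult) -> None:
    print(f'tracked {result}')        # the real code reports to the node tracker


async def judge(node: str) -> List[EvalResult]:
    await asyncio.sleep(0)            # stand-in for a real judge/LLM call
    return [EvalResult(node=node, success=True)]


async def drain(eval_tasks: Dict[str, List[asyncio.Task]]) -> List[EvalResult]:
    collected: List[EvalResult] = []
    for node_key, tasks in eval_tasks.items():
        for task in tasks:            # a node visited N times contributes N tasks
            results = await task
            collected.extend(results)
            for r in results:
                if r.success:         # failed judge calls are returned but not tracked
                    track(r)
    return collected


async def main() -> None:
    tasks = {'root-agent': [asyncio.create_task(judge('root-agent'))]}
    print(await drain(tasks))

asyncio.run(main())
```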
@@ -6,6 +6,7 @@
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage

from ldai import LDMessage
from ldai.evaluator import Evaluator

from ldai_langchain import (
LangChainModelRunner,
@@ -530,6 +531,7 @@ def sync_tool(x: str = '') -> str:
cfg = AIAgentConfig(
key='n',
enabled=True,
evaluator=Evaluator.noop(),
create_tracker=MagicMock(),
model=ModelConfig(
name='gpt-4',
@@ -553,6 +555,7 @@ async def async_tool(x: str = '') -> str:
cfg = AIAgentConfig(
key='n',
enabled=True,
evaluator=Evaluator.noop(),
create_tracker=MagicMock(),
model=ModelConfig(
name='gpt-4',
@@ -4,6 +4,7 @@
from unittest.mock import AsyncMock, MagicMock, patch

from ldai.agent_graph import AgentGraphDefinition
from ldai.evaluator import Evaluator
from ldai.models import AIAgentGraphConfig, AIAgentConfig, ModelConfig, ProviderConfig
from ldai.providers import AgentGraphResult, ToolRegistry
from ldai_langchain.langgraph_agent_graph_runner import LangGraphAgentGraphRunner
@@ -20,6 +21,7 @@ def _make_graph(enabled: bool = True) -> AgentGraphDefinition:
model=ModelConfig(name='gpt-4'),
provider=ProviderConfig(name='openai'),
instructions='You are a helpful assistant.',
evaluator=Evaluator.noop(),
)
graph_config = AIAgentGraphConfig(
key='test-graph',
@@ -17,6 +17,7 @@
from ldai.agent_graph import AgentGraphDefinition
from ldai.models import AIAgentConfig, AIAgentGraphConfig, ModelConfig, ProviderConfig
from ldai.tracker import AIGraphTracker, LDAIConfigTracker, TokenUsage
from ldai.evaluator import Evaluator
from ldai_langchain.langgraph_callback_handler import LDMetricsCallbackHandler


@@ -48,6 +49,7 @@ def _make_graph(mock_ld_client: MagicMock, node_key: str = 'root-agent', graph_k
node_config = AIAgentConfig(
key=node_key,
enabled=True,
evaluator=Evaluator.noop(),
model=ModelConfig(name='gpt-4', parameters={}),
provider=ProviderConfig(name='openai'),
instructions='Be helpful.',
@@ -317,7 +319,8 @@ def test_on_tool_end_none_name_ignored():
# flush() tests
# ---------------------------------------------------------------------------

def test_flush_emits_token_events_to_ld_tracker():
@pytest.mark.asyncio
async def test_flush_emits_token_events_to_ld_tracker():
"""flush() calls track_tokens on the node's config tracker."""
mock_ld_client = MagicMock()
graph = _make_graph(mock_ld_client, node_key='root-agent', graph_key='g1')
@@ -327,7 +330,7 @@ def test_flush_emits_token_events_to_ld_tracker():
node_run_id = uuid4()
handler.on_chain_start({}, {}, run_id=node_run_id, name='root-agent')
handler.on_llm_end(_llm_result(15, 10, 5), run_id=uuid4(), parent_run_id=node_run_id)
handler.flush(graph)
await handler.flush(graph)

ev = _events(mock_ld_client)
assert ev['$ld:ai:tokens:total'][0][1] == 15
@@ -336,7 +339,8 @@ def test_flush_emits_duration():
assert ev['$ld:ai:generation:success'][0][1] == 1


def test_flush_emits_duration():
@pytest.mark.asyncio
async def test_flush_emits_duration():
"""flush() calls track_duration when duration was recorded."""
mock_ld_client = MagicMock()
graph = _make_graph(mock_ld_client)
@@ -346,13 +350,14 @@ def test_flush_emits_duration():
run_id = uuid4()
handler.on_chain_start({}, {}, run_id=run_id, name='root-agent')
handler.on_chain_end({}, run_id=run_id)
handler.flush(graph)
await handler.flush(graph)

ev = _events(mock_ld_client)
assert '$ld:ai:duration:total' in ev


def test_flush_emits_tool_calls():
@pytest.mark.asyncio
async def test_flush_emits_tool_calls():
"""flush() calls track_tool_call for each recorded tool invocation."""
mock_ld_client = MagicMock()
graph = _make_graph(mock_ld_client)
@@ -366,15 +371,16 @@ def test_flush_emits_tool_calls():
tools_run_id = uuid4()
handler.on_chain_start({}, {}, run_id=tools_run_id, name='root-agent__tools')
handler.on_tool_end('r', run_id=uuid4(), parent_run_id=tools_run_id, name='fn_search')
handler.flush(graph)
await handler.flush(graph)

ev = _events(mock_ld_client)
tool_events = ev.get('$ld:ai:tool_call', [])
assert len(tool_events) == 1
assert tool_events[0][0]['toolKey'] == 'search'


def test_flush_includes_graph_key_in_node_events():
@pytest.mark.asyncio
async def test_flush_includes_graph_key_in_node_events():
"""flush() passes graph_key to the node tracker so graphKey appears in events."""
mock_ld_client = MagicMock()
graph = _make_graph(mock_ld_client, graph_key='my-graph')
@@ -384,14 +390,15 @@ def test_flush_includes_graph_key_in_node_events():
node_run_id = uuid4()
handler.on_chain_start({}, {}, run_id=node_run_id, name='root-agent')
handler.on_llm_end(_llm_result(5, 3, 2), run_id=uuid4(), parent_run_id=node_run_id)
handler.flush(graph)
await handler.flush(graph)

ev = _events(mock_ld_client)
token_data = ev['$ld:ai:tokens:total'][0][0]
assert token_data.get('graphKey') == 'my-graph'


def test_flush_with_no_graph_key_on_node_tracker():
@pytest.mark.asyncio
async def test_flush_with_no_graph_key_on_node_tracker():
"""When node tracker has no graph_key, events omit graphKey."""
mock_ld_client = MagicMock()
context = MagicMock()
@@ -408,6 +415,7 @@ def test_flush_with_no_graph_key_on_node_tracker():
node_config = AIAgentConfig(
key='root-agent',
enabled=True,
evaluator=Evaluator.noop(),
model=ModelConfig(name='gpt-4', parameters={}),
provider=ProviderConfig(name='openai'),
instructions='Be helpful.',
@@ -425,36 +433,38 @@ def test_flush_with_no_graph_key_on_node_tracker():
nodes=nodes,
context=context,
enabled=True,
create_tracker=lambda: None,
create_tracker=lambda: AIGraphTracker(mock_ld_client, 'v1', 'test-graph', 1, context),
)

handler = LDMetricsCallbackHandler({'root-agent'}, {})
node_run_id = uuid4()
handler.on_chain_start({}, {}, run_id=node_run_id, name='root-agent')
handler.on_llm_end(_llm_result(5, 3, 2), run_id=uuid4(), parent_run_id=node_run_id)
handler.flush(graph)
await handler.flush(graph)

ev = _events(mock_ld_client)
token_data = ev['$ld:ai:tokens:total'][0][0]
assert 'graphKey' not in token_data


def test_flush_skips_nodes_not_in_path():
@pytest.mark.asyncio
async def test_flush_skips_nodes_not_in_path():
"""flush() only emits events for nodes that were actually executed."""
mock_ld_client = MagicMock()
graph = _make_graph(mock_ld_client)
tracker = graph.create_tracker()

# Handler with 'root-agent' in node_keys but never started
handler = LDMetricsCallbackHandler({'root-agent'}, {})
handler.flush(graph)
await handler.flush(graph)

ev = _events(mock_ld_client)
assert '$ld:ai:tokens:total' not in ev
assert '$ld:ai:generation:success' not in ev


def test_flush_skips_node_without_tracker():
@pytest.mark.asyncio
async def test_flush_skips_node_without_tracker():
"""flush() silently skips nodes whose config has no tracker."""
mock_ld_client = MagicMock()
context = MagicMock()
@@ -463,6 +473,7 @@ def test_flush_skips_node_without_tracker():
key='no-track',
enabled=True,
create_tracker=lambda: None,
evaluator=Evaluator.noop(),
model=ModelConfig(name='gpt-4', parameters={}),
provider=ProviderConfig(name='openai'),
instructions='',
@@ -483,7 +494,7 @@ def test_flush_skips_node_without_tracker():
node_run_id = uuid4()
handler.on_chain_start({}, {}, run_id=node_run_id, name='no-track')
handler.on_llm_end(_llm_result(5, 3, 2), run_id=uuid4(), parent_run_id=node_run_id)
handler.flush(graph) # should not raise
await handler.flush(graph) # should not raise

mock_ld_client.track.assert_not_called()
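
The test updates above follow mechanically from `flush()` becoming a coroutine: each affected test turns into an `async def` marked `@pytest.mark.asyncio` and awaits the call. A minimal sketch of that conversion (illustrative only; assumes the pytest-asyncio plugin is installed, and `flush` here is a trivial stand-in):

```python
# Illustrative sketch, not part of this diff: the sync-to-async test conversion.
import pytest


async def flush() -> list:
    """Trivial stand-in for the now-async LDMetricsCallbackHandler.flush()."""
    return []


@pytest.mark.asyncio            # requires the pytest-asyncio plugin
async def test_flush_returns_empty_list():
    assert await flush() == []
```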
