diff --git a/packages/ai-providers/server-ai-langchain/src/ldai_langchain/langgraph_agent_graph_runner.py b/packages/ai-providers/server-ai-langchain/src/ldai_langchain/langgraph_agent_graph_runner.py index 15eee41f..1468c57b 100644 --- a/packages/ai-providers/server-ai-langchain/src/ldai_langchain/langgraph_agent_graph_runner.py +++ b/packages/ai-providers/server-ai-langchain/src/ldai_langchain/langgraph_agent_graph_runner.py @@ -1,14 +1,12 @@ """LangGraph agent graph runner for LaunchDarkly AI SDK.""" -import asyncio import time -from contextvars import ContextVar from typing import Annotated, Any, Dict, List, Set, Tuple from ldai import log from ldai.agent_graph import AgentGraphDefinition, AgentGraphNode -from ldai.providers import AgentGraphResult, AgentGraphRunner, ToolRegistry -from ldai.providers.types import LDAIMetrics +from ldai.providers import AgentGraphRunner, ToolRegistry +from ldai.providers.types import AgentGraphRunnerResult, GraphMetrics from ldai_langchain.langchain_helper import ( build_structured_tools, @@ -18,9 +16,6 @@ ) from ldai_langchain.langgraph_callback_handler import LDMetricsCallbackHandler -# Per-run eval task accumulator, isolated per concurrent run() call via ContextVar. -_run_eval_tasks: ContextVar[Dict[str, List[asyncio.Task]]] = ContextVar('_run_eval_tasks') - def _make_handoff_tool(child_key: str, description: str) -> Any: """ @@ -65,9 +60,10 @@ class LangGraphAgentGraphRunner(AgentGraphRunner): AgentGraphRunner implementation for LangGraph. - Compiles and runs the agent graph with LangGraph and automatically records - graph- and node-level AI metric data to the LaunchDarkly trackers on the - graph definition and each node. + Compiles and runs the agent graph with LangGraph and collects graph- and + node-level metrics via a LangChain callback handler. Tracking events are + emitted by the managed layer (:class:`~ldai.ManagedAgentGraph`) from the + returned :class:`~ldai.providers.types.AgentGraphRunnerResult`. Requires ``langgraph`` to be installed. """ @@ -181,26 +177,6 @@ async def invoke(state: WorkflowState) -> dict: if node_instructions: msgs = [SystemMessage(content=node_instructions)] + msgs response = await bound_model.ainvoke(msgs) - - node_obj = self._graph.get_node(nk) - if node_obj is not None: - input_text = '\r\n'.join( - m.content if isinstance(m.content, str) else str(m.content) - for m in msgs - ) if msgs else '' - output_text = ( - response.content if hasattr(response, 'content') else str(response) - ) - task = node_obj.get_config().evaluator.evaluate(input_text, output_text) - run_tasks = _run_eval_tasks.get(None) - if run_tasks is not None: - run_tasks.setdefault(nk, []).append(task) - else: - log.warning( - f"LangGraphAgentGraphRunner: eval task for node '{nk}' " - "has no run context; judge results will not be tracked" - ) - return {'messages': [response]} invoke.__name__ = nk @@ -298,20 +274,18 @@ def route(state: WorkflowState) -> str: compiled = agent_builder.compile() return compiled, fn_name_to_config_key, node_keys - async def run(self, input: Any) -> AgentGraphResult: + async def run(self, input: Any) -> AgentGraphRunnerResult: """ Run the agent graph with the given input. Builds a LangGraph StateGraph from the AgentGraphDefinition, compiles it, and invokes it. Uses a LangChain callback handler to collect - per-node metrics, then flushes them to LaunchDarkly trackers. + per-node metrics. Graph-level tracking events are emitted by the + managed layer from the returned GraphMetrics. 
:param input: The string prompt to send to the agent graph - :return: AgentGraphResult with the final output and metrics + :return: AgentGraphRunnerResult with the final content and GraphMetrics """ - pending_eval_tasks: Dict[str, List[asyncio.Task]] = {} - token = _run_eval_tasks.set(pending_eval_tasks) - tracker = self._graph.create_tracker() start_ns = time.perf_counter_ns() try: @@ -325,24 +299,23 @@ async def run(self, input: Any) -> AgentGraphResult: config={'callbacks': [handler], 'recursion_limit': 25}, ) - duration = (time.perf_counter_ns() - start_ns) // 1_000_000 + duration_ms = (time.perf_counter_ns() - start_ns) // 1_000_000 messages = result.get('messages', []) output = extract_last_message_content(messages) + total_usage = sum_token_usage_from_messages(messages) - # Flush per-node metrics to LD trackers; eval results are tracked - # internally and intentionally not exposed on AgentGraphResult here - # — judge dispatch is the managed layer's responsibility. - await handler.flush(self._graph, pending_eval_tasks) - - tracker.track_path(handler.path) - tracker.track_duration(duration) - tracker.track_invocation_success() - tracker.track_total_tokens(sum_token_usage_from_messages(messages)) + node_metrics = handler.node_metrics - return AgentGraphResult( - output=output, + return AgentGraphRunnerResult( + content=output, raw=result, - metrics=LDAIMetrics(success=True), + metrics=GraphMetrics( + success=True, + path=handler.path, + duration_ms=duration_ms, + usage=total_usage if (total_usage is not None and total_usage.total > 0) else None, + node_metrics=node_metrics, + ), ) except Exception as exc: @@ -353,13 +326,12 @@ async def run(self, input: Any) -> AgentGraphResult: ) else: log.warning(f'LangGraphAgentGraphRunner run failed: {exc}') - duration = (time.perf_counter_ns() - start_ns) // 1_000_000 - tracker.track_duration(duration) - tracker.track_invocation_failure() - return AgentGraphResult( - output='', + duration_ms = (time.perf_counter_ns() - start_ns) // 1_000_000 + return AgentGraphRunnerResult( + content='', raw=None, - metrics=LDAIMetrics(success=False), + metrics=GraphMetrics( + success=False, + duration_ms=duration_ms, + ), ) - finally: - _run_eval_tasks.reset(token) diff --git a/packages/ai-providers/server-ai-langchain/src/ldai_langchain/langgraph_callback_handler.py b/packages/ai-providers/server-ai-langchain/src/ldai_langchain/langgraph_callback_handler.py index 183a3eb7..4929ffea 100644 --- a/packages/ai-providers/server-ai-langchain/src/ldai_langchain/langgraph_callback_handler.py +++ b/packages/ai-providers/server-ai-langchain/src/ldai_langchain/langgraph_callback_handler.py @@ -4,8 +4,7 @@ from langchain_core.callbacks import BaseCallbackHandler from langchain_core.outputs import ChatGeneration, LLMResult -from ldai.agent_graph import AgentGraphDefinition -from ldai.providers.types import JudgeResult +from ldai.providers.types import LDAIMetrics from ldai.tracker import TokenUsage from ldai_langchain.langchain_helper import get_ai_usage_from_response @@ -20,8 +19,10 @@ class LDMetricsCallbackHandler(BaseCallbackHandler): LangChain callback handler that collects per-node metrics during a LangGraph run. - Records token usage, tool calls, and duration for each agent node in the graph, - then flushes them to LaunchDarkly trackers after the run completes via ``flush()``. + Records token usage, tool calls, and duration for each agent node in the graph. + Each node's :class:`~ldai.providers.types.LDAIMetrics` is built incrementally + as callbacks fire. 
Access the ``node_metrics`` property after the run completes + to retrieve the accumulated per-node metrics. """ def __init__(self, node_keys: Set[str], fn_name_to_config_key: Dict[str, str]): @@ -39,14 +40,10 @@ def __init__(self, node_keys: Set[str], fn_name_to_config_key: Dict[str, str]): # run_id -> node_key for active chain runs self._run_to_node: Dict[UUID, str] = {} - # accumulated token usage per node - self._node_tokens: Dict[str, TokenUsage] = {} - # tool config keys called per node - self._node_tool_calls: Dict[str, List[str]] = {} # start time (ns) per active run_id — keyed by run_id to handle re-entrant nodes self._node_start_ns: Dict[UUID, int] = {} - # accumulated duration (ms) per node - self._node_duration_ms: Dict[str, int] = {} + # per-node metrics, built incrementally as callbacks fire + self._node_metrics: Dict[str, LDAIMetrics] = {} # execution path in order (deduplicated) self._path: List[str] = [] self._path_set: Set[str] = set() @@ -61,19 +58,9 @@ def path(self) -> List[str]: return list(self._path) @property - def node_tokens(self) -> Dict[str, TokenUsage]: - """Accumulated token usage per node key.""" - return dict(self._node_tokens) - - @property - def node_tool_calls(self) -> Dict[str, List[str]]: - """Tool config keys called per node key.""" - return {k: list(v) for k, v in self._node_tool_calls.items()} - - @property - def node_durations_ms(self) -> Dict[str, int]: - """Accumulated duration in milliseconds per node key.""" - return dict(self._node_duration_ms) + def node_metrics(self) -> Dict[str, LDAIMetrics]: + """Per-node metrics keyed by node key.""" + return dict(self._node_metrics) # ------------------------------------------------------------------ # Callbacks @@ -101,10 +88,10 @@ def on_chain_start( if name not in self._path_set: self._path.append(name) self._path_set.add(name) + self._node_metrics[name] = LDAIMetrics(success=False) elif name.endswith('__tools'): stripped = name[: -len('__tools')] if stripped in self._node_keys: - # Attribute tool events to the owning agent node self._run_to_node[run_id] = stripped def on_chain_end( @@ -121,9 +108,10 @@ def on_chain_end( start_ns = self._node_start_ns.pop(run_id, None) if start_ns is not None: elapsed_ms = (time.perf_counter_ns() - start_ns) // 1_000_000 - self._node_duration_ms[node_key] = ( - self._node_duration_ms.get(node_key, 0) + elapsed_ms - ) + metrics = self._node_metrics.get(node_key) + if metrics is not None: + metrics.success = True + metrics.duration_ms = (metrics.duration_ms or 0) + elapsed_ms def on_llm_end( self, @@ -151,11 +139,14 @@ def on_llm_end( if usage is None: return - existing = self._node_tokens.get(node_key) + metrics = self._node_metrics.get(node_key) + if metrics is None: + return + existing = metrics.usage if existing is None: - self._node_tokens[node_key] = usage + metrics.usage = usage else: - self._node_tokens[node_key] = TokenUsage( + metrics.usage = TokenUsage( total=existing.total + usage.total, input=existing.input + usage.input, output=existing.output + usage.output, @@ -179,64 +170,11 @@ def on_tool_end( config_key = self._fn_name_to_config_key.get(name) if config_key is None: - # Tool is not a registered functional tool (e.g. a handoff tool) — skip tracking. 
return - if node_key not in self._node_tool_calls: - self._node_tool_calls[node_key] = [] - self._node_tool_calls[node_key].append(config_key) - - # ------------------------------------------------------------------ - # Flush - # ------------------------------------------------------------------ - - async def flush( - self, graph: AgentGraphDefinition, eval_tasks=None - ) -> List[JudgeResult]: - """ - Emit all collected per-node metrics to the LaunchDarkly trackers. - - Call this once after the graph run completes. - - :param graph: The AgentGraphDefinition whose nodes hold the LD config trackers. - :param eval_tasks: Optional dict mapping node key to a list of awaitables that - return judge evaluation results. Multiple tasks arise when a node is visited - more than once (e.g. in a graph with cycles). - :return: All judge results collected across all nodes. - """ - node_trackers: Dict[str, Any] = {} - all_eval_results: List[JudgeResult] = [] - for node_key in self._path: - if node_key in node_trackers: - continue - node = graph.get_node(node_key) - if not node: - continue - config_tracker = node.get_config().create_tracker() - if not config_tracker: - continue - node_trackers[node_key] = config_tracker - - usage = self._node_tokens.get(node_key) - if usage: - config_tracker.track_tokens(usage) - - duration = self._node_duration_ms.get(node_key) - if duration is not None: - config_tracker.track_duration(duration) - - config_tracker.track_success() - - for tool_key in self._node_tool_calls.get(node_key, []): - config_tracker.track_tool_call(tool_key) - - if not eval_tasks: - continue - - for eval_task in eval_tasks.get(node_key, []): - results = await eval_task - all_eval_results.extend(results) - for r in results: - if r.success: - config_tracker.track_judge_result(r) - - return all_eval_results + metrics = self._node_metrics.get(node_key) + if metrics is None: + return + if metrics.tool_calls is None: + metrics.tool_calls = [config_key] + else: + metrics.tool_calls.append(config_key) diff --git a/packages/ai-providers/server-ai-langchain/tests/test_langchain_provider.py b/packages/ai-providers/server-ai-langchain/tests/test_langchain_provider.py index a8fc46cf..d6cf83f6 100644 --- a/packages/ai-providers/server-ai-langchain/tests/test_langchain_provider.py +++ b/packages/ai-providers/server-ai-langchain/tests/test_langchain_provider.py @@ -1,10 +1,9 @@ """Tests for LangChain Provider.""" -import pytest from unittest.mock import AsyncMock, MagicMock +import pytest from langchain_core.messages import AIMessage, HumanMessage, SystemMessage - from ldai import LDMessage from ldai.evaluator import Evaluator @@ -404,6 +403,7 @@ class TestCreateAgent: def test_creates_agent_runner_with_instructions_and_tool_definitions(self): """Should create LangChainAgentRunner wrapping a compiled graph.""" from unittest.mock import patch + from ldai_langchain import LangChainAgentRunner mock_ai_config = MagicMock() @@ -436,6 +436,7 @@ def test_creates_agent_runner_with_instructions_and_tool_definitions(self): def test_creates_agent_runner_with_no_tools(self): """Should create LangChainAgentRunner with no tool definitions.""" from unittest.mock import patch + from ldai_langchain import LangChainAgentRunner mock_ai_config = MagicMock() @@ -522,6 +523,7 @@ class TestBuildTools: def test_registers_sync_callable_as_structured_tool_func(self): from ldai.models import AIAgentConfig, ModelConfig, ProviderConfig + from ldai_langchain.langchain_helper import build_structured_tools def sync_tool(x: str = '') -> str: @@ 
-546,6 +548,7 @@ def sync_tool(x: str = '') -> str:
 
     def test_registers_async_callable_as_structured_tool_coroutine(self):
         from ldai.models import AIAgentConfig, ModelConfig, ProviderConfig
+
         from ldai_langchain.langchain_helper import build_structured_tools
 
         async def async_tool(x: str = '') -> str:
diff --git a/packages/ai-providers/server-ai-langchain/tests/test_langgraph_agent_graph_runner.py b/packages/ai-providers/server-ai-langchain/tests/test_langgraph_agent_graph_runner.py
index 0a3ff6ca..76bf5305 100644
--- a/packages/ai-providers/server-ai-langchain/tests/test_langgraph_agent_graph_runner.py
+++ b/packages/ai-providers/server-ai-langchain/tests/test_langgraph_agent_graph_runner.py
@@ -1,14 +1,16 @@
 """Tests for LangGraphAgentGraphRunner and LangChainRunnerFactory.create_agent_graph()."""
 
-import pytest
 from unittest.mock import AsyncMock, MagicMock, patch
 
+import pytest
 from ldai.agent_graph import AgentGraphDefinition
 from ldai.evaluator import Evaluator
-from ldai.models import AIAgentGraphConfig, AIAgentConfig, ModelConfig, ProviderConfig
-from ldai.providers import AgentGraphResult, ToolRegistry
-from ldai_langchain.langgraph_agent_graph_runner import LangGraphAgentGraphRunner
+from ldai.models import AIAgentConfig, AIAgentGraphConfig, ModelConfig, ProviderConfig
+from ldai.providers import ToolRegistry
+from ldai.providers.types import AgentGraphRunnerResult
+
 from ldai_langchain.langchain_runner_factory import LangChainRunnerFactory
+from ldai_langchain.langgraph_agent_graph_runner import LangGraphAgentGraphRunner
 
 
 def _make_graph(enabled: bool = True) -> AgentGraphDefinition:
@@ -75,22 +77,22 @@ async def test_langgraph_runner_run_raises_when_langgraph_not_installed():
     with patch.dict('sys.modules', {'langgraph': None, 'langgraph.graph': None}):
         result = await runner.run("test")
 
-    assert isinstance(result, AgentGraphResult)
+    assert isinstance(result, AgentGraphRunnerResult)
     assert result.metrics.success is False
 
 
 @pytest.mark.asyncio
-async def test_langgraph_runner_run_tracks_failure_on_exception():
+async def test_langgraph_runner_run_returns_failure_on_exception():
+    """Runner now returns AgentGraphRunnerResult; the managed layer drives tracker events."""
     graph = _make_graph()
-    tracker = graph.create_tracker()
     runner = LangGraphAgentGraphRunner(graph, {})
 
     with patch.dict('sys.modules', {'langgraph': None, 'langgraph.graph': None}):
         result = await runner.run("fail")
 
+    assert isinstance(result, AgentGraphRunnerResult)
     assert result.metrics.success is False
-    tracker.track_invocation_failure.assert_called_once()
-    tracker.track_duration.assert_called_once()
+    assert result.metrics.duration_ms is not None
 
 
 @pytest.mark.asyncio
@@ -147,9 +149,10 @@ async def test_langgraph_runner_run_success():
     runner = LangGraphAgentGraphRunner(graph, {})
     result = await runner.run("find restaurants")
 
-    assert isinstance(result, AgentGraphResult)
-    assert result.output == "langgraph answer"
-    assert result.metrics.success is True
-    tracker.track_path.assert_called_once_with([])
-    tracker.track_invocation_success.assert_called_once()
-    tracker.track_duration.assert_called_once()
+    assert isinstance(result, AgentGraphRunnerResult)
+    assert result.content == "langgraph answer"
+    assert result.metrics.success is True
+    assert result.metrics.duration_ms is not None
+    # Tracker events now fire from the managed layer (ManagedAgentGraph) using
+    # result.metrics; the runner no longer touches the graph tracker directly.
+ tracker.track_path.assert_not_called() + tracker.track_invocation_success.assert_not_called() + tracker.track_duration.assert_not_called() diff --git a/packages/ai-providers/server-ai-langchain/tests/test_langgraph_callback_handler.py b/packages/ai-providers/server-ai-langchain/tests/test_langgraph_callback_handler.py index 65592faa..d3da3db4 100644 --- a/packages/ai-providers/server-ai-langchain/tests/test_langgraph_callback_handler.py +++ b/packages/ai-providers/server-ai-langchain/tests/test_langgraph_callback_handler.py @@ -5,72 +5,14 @@ fires during a graph run — without needing a real or mock LangGraph execution. """ -from collections import defaultdict -from unittest.mock import MagicMock from uuid import uuid4 -import pytest - from langchain_core.messages import AIMessage from langchain_core.outputs import ChatGeneration, LLMResult -from ldai.agent_graph import AgentGraphDefinition -from ldai.models import AIAgentConfig, AIAgentGraphConfig, ModelConfig, ProviderConfig -from ldai.tracker import AIGraphTracker, LDAIConfigTracker, TokenUsage -from ldai.evaluator import Evaluator from ldai_langchain.langgraph_callback_handler import LDMetricsCallbackHandler -# --------------------------------------------------------------------------- -# Helpers -# --------------------------------------------------------------------------- - -def _make_graph(mock_ld_client: MagicMock, node_key: str = 'root-agent', graph_key: str = 'test-graph'): - """Build a minimal single-node AgentGraphDefinition for flush() tests.""" - context = MagicMock() - node_tracker = LDAIConfigTracker( - ld_client=mock_ld_client, - variation_key='v1', - config_key=node_key, - version=1, - model_name='gpt-4', - provider_name='openai', - context=context, - run_id='test-run-id', - graph_key=graph_key, - ) - graph_tracker = AIGraphTracker( - ld_client=mock_ld_client, - variation_key='v1', - graph_key=graph_key, - version=1, - context=context, - ) - node_config = AIAgentConfig( - key=node_key, - enabled=True, - evaluator=Evaluator.noop(), - model=ModelConfig(name='gpt-4', parameters={}), - provider=ProviderConfig(name='openai'), - instructions='Be helpful.', - create_tracker=lambda: node_tracker, - ) - graph_config = AIAgentGraphConfig( - key=graph_key, - root_config_key=node_key, - edges=[], - enabled=True, - ) - nodes = AgentGraphDefinition.build_nodes(graph_config, {node_key: node_config}) - return AgentGraphDefinition( - agent_graph=graph_config, - nodes=nodes, - context=context, - enabled=True, - create_tracker=lambda: graph_tracker, - ) - - def _llm_result(total: int, prompt: int, completion: int) -> LLMResult: return LLMResult( generations=[[ChatGeneration( @@ -84,14 +26,6 @@ def _llm_result(total: int, prompt: int, completion: int) -> LLMResult: ) -def _events(mock_ld_client: MagicMock) -> dict: - result = defaultdict(list) - for call in mock_ld_client.track.call_args_list: - name, _ctx, data, value = call.args - result[name].append((data, value)) - return dict(result) - - # --------------------------------------------------------------------------- # on_chain_start tests # --------------------------------------------------------------------------- @@ -104,6 +38,15 @@ def test_on_chain_start_records_agent_node(): assert handler.path == ['root-agent'] +def test_on_chain_start_seeds_node_metrics(): + """Agent node gets an LDAIMetrics entry with success=False on first chain_start.""" + handler = LDMetricsCallbackHandler({'root-agent'}, {}) + handler.on_chain_start({}, {}, run_id=uuid4(), name='root-agent') + metrics = 
handler.node_metrics + assert 'root-agent' in metrics + assert metrics['root-agent'].success is False + + def test_on_chain_start_deduplicates_path(): """Multiple starts for the same node appear only once in path.""" handler = LDMetricsCallbackHandler({'root-agent'}, {}) @@ -165,8 +108,8 @@ def test_on_chain_end_accumulates_duration(): run_id = uuid4() handler.on_chain_start({}, {}, run_id=run_id, name='root-agent') handler.on_chain_end({}, run_id=run_id) - # Duration may be 0 on fast machines but the key must be present - assert 'root-agent' in handler.node_durations_ms + assert handler.node_metrics['root-agent'].duration_ms is not None + assert handler.node_metrics['root-agent'].success is True def test_on_chain_end_accumulates_across_multiple_runs(): @@ -176,12 +119,12 @@ def test_on_chain_end_accumulates_across_multiple_runs(): run1 = uuid4() handler.on_chain_start({}, {}, run_id=run1, name='root-agent') handler.on_chain_end({}, run_id=run1) - duration_after_first = handler.node_durations_ms.get('root-agent', 0) + duration_after_first = handler.node_metrics['root-agent'].duration_ms or 0 run2 = uuid4() handler.on_chain_start({}, {}, run_id=run2, name='root-agent') handler.on_chain_end({}, run_id=run2) - duration_after_second = handler.node_durations_ms.get('root-agent', 0) + duration_after_second = handler.node_metrics['root-agent'].duration_ms or 0 assert duration_after_second >= duration_after_first @@ -205,11 +148,11 @@ def test_on_llm_end_accumulates_tokens(): result = _llm_result(total=15, prompt=10, completion=5) handler.on_llm_end(result, run_id=uuid4(), parent_run_id=node_run_id) - tokens = handler.node_tokens.get('root-agent') - assert tokens is not None - assert tokens.total == 15 - assert tokens.input == 10 - assert tokens.output == 5 + usage = handler.node_metrics['root-agent'].usage + assert usage is not None + assert usage.total == 15 + assert usage.input == 10 + assert usage.output == 5 def test_on_llm_end_accumulates_across_multiple_calls(): @@ -223,10 +166,10 @@ def test_on_llm_end_accumulates_across_multiple_calls(): handler.on_llm_end(result1, run_id=uuid4(), parent_run_id=node_run_id) handler.on_llm_end(result2, run_id=uuid4(), parent_run_id=node_run_id) - tokens = handler.node_tokens['root-agent'] - assert tokens.total == 16 - assert tokens.input == 11 - assert tokens.output == 5 + usage = handler.node_metrics['root-agent'].usage + assert usage.total == 16 + assert usage.input == 11 + assert usage.output == 5 def test_on_llm_end_none_parent_run_id_ignored(): @@ -234,7 +177,7 @@ def test_on_llm_end_none_parent_run_id_ignored(): handler = LDMetricsCallbackHandler({'root-agent'}, {}) result = _llm_result(total=5, prompt=3, completion=2) handler.on_llm_end(result, run_id=uuid4(), parent_run_id=None) - assert handler.node_tokens == {} + assert handler.node_metrics == {} def test_on_llm_end_unknown_parent_run_id_ignored(): @@ -242,7 +185,7 @@ def test_on_llm_end_unknown_parent_run_id_ignored(): handler = LDMetricsCallbackHandler({'root-agent'}, {}) result = _llm_result(total=5, prompt=3, completion=2) handler.on_llm_end(result, run_id=uuid4(), parent_run_id=uuid4()) - assert handler.node_tokens == {} + assert handler.node_metrics == {} def test_on_llm_end_camel_case_token_keys(): @@ -260,11 +203,11 @@ def test_on_llm_end_camel_case_token_keys(): ) handler.on_llm_end(result, run_id=uuid4(), parent_run_id=node_run_id) - tokens = handler.node_tokens.get('root-agent') - assert tokens is not None - assert tokens.total == 20 - assert tokens.input == 12 - assert tokens.output 
== 8 + usage = handler.node_metrics['root-agent'].usage + assert usage is not None + assert usage.total == 20 + assert usage.input == 12 + assert usage.output == 8 # --------------------------------------------------------------------------- @@ -274,36 +217,42 @@ def test_on_llm_end_camel_case_token_keys(): def test_on_tool_end_records_tool_call(): """Tool end event records config key for the owning agent node.""" handler = LDMetricsCallbackHandler({'root-agent'}, {'fetch_weather': 'get_weather_open_meteo'}) + agent_run_id = uuid4() + handler.on_chain_start({}, {}, run_id=agent_run_id, name='root-agent') tools_run_id = uuid4() handler.on_chain_start({}, {}, run_id=tools_run_id, name='root-agent__tools') handler.on_tool_end('sunny', run_id=uuid4(), parent_run_id=tools_run_id, name='fetch_weather') - assert handler.node_tool_calls.get('root-agent') == ['get_weather_open_meteo'] + assert handler.node_metrics['root-agent'].tool_calls == ['get_weather_open_meteo'] def test_on_tool_end_skips_unregistered_tools(): """Tool end is ignored for tools not in the fn_name_to_config_key map (e.g. handoff tools).""" handler = LDMetricsCallbackHandler({'root-agent'}, {}) + agent_run_id = uuid4() + handler.on_chain_start({}, {}, run_id=agent_run_id, name='root-agent') tools_run_id = uuid4() handler.on_chain_start({}, {}, run_id=tools_run_id, name='root-agent__tools') handler.on_tool_end('result', run_id=uuid4(), parent_run_id=tools_run_id, name='transfer_to_child') - assert handler.node_tool_calls.get('root-agent') is None + assert handler.node_metrics['root-agent'].tool_calls is None def test_on_tool_end_multiple_tools_accumulated(): """Multiple tool calls are accumulated in order.""" handler = LDMetricsCallbackHandler({'root-agent'}, {'search': 'search', 'summarize': 'summarize'}) + agent_run_id = uuid4() + handler.on_chain_start({}, {}, run_id=agent_run_id, name='root-agent') tools_run_id = uuid4() handler.on_chain_start({}, {}, run_id=tools_run_id, name='root-agent__tools') handler.on_tool_end('r1', run_id=uuid4(), parent_run_id=tools_run_id, name='search') handler.on_tool_end('r2', run_id=uuid4(), parent_run_id=tools_run_id, name='summarize') - assert handler.node_tool_calls.get('root-agent') == ['search', 'summarize'] + assert handler.node_metrics['root-agent'].tool_calls == ['search', 'summarize'] def test_on_tool_end_none_parent_run_id_ignored(): """Tool end with parent_run_id=None does not raise.""" handler = LDMetricsCallbackHandler({'root-agent'}, {}) handler.on_tool_end('result', run_id=uuid4(), parent_run_id=None, name='my_tool') - assert handler.node_tool_calls == {} + assert handler.node_metrics == {} def test_on_tool_end_none_name_ignored(): @@ -312,191 +261,110 @@ def test_on_tool_end_none_name_ignored(): run_id = uuid4() handler.on_chain_start({}, {}, run_id=run_id, name='root-agent') handler.on_tool_end('result', run_id=uuid4(), parent_run_id=run_id, name=None) - assert handler.node_tool_calls == {} + assert handler.node_metrics['root-agent'].tool_calls is None # --------------------------------------------------------------------------- -# flush() tests +# node_metrics property tests # --------------------------------------------------------------------------- -@pytest.mark.asyncio -async def test_flush_emits_token_events_to_ld_tracker(): - """flush() calls track_tokens on the node's config tracker.""" - mock_ld_client = MagicMock() - graph = _make_graph(mock_ld_client, node_key='root-agent', graph_key='g1') - tracker = graph.create_tracker() - +def test_node_metrics_includes_tokens(): + 
"""node_metrics returns token usage for nodes that received LLM calls.""" handler = LDMetricsCallbackHandler({'root-agent'}, {}) node_run_id = uuid4() handler.on_chain_start({}, {}, run_id=node_run_id, name='root-agent') handler.on_llm_end(_llm_result(15, 10, 5), run_id=uuid4(), parent_run_id=node_run_id) - await handler.flush(graph) - ev = _events(mock_ld_client) - assert ev['$ld:ai:tokens:total'][0][1] == 15 - assert ev['$ld:ai:tokens:input'][0][1] == 10 - assert ev['$ld:ai:tokens:output'][0][1] == 5 - assert ev['$ld:ai:generation:success'][0][1] == 1 + metrics = handler.node_metrics + assert 'root-agent' in metrics + node = metrics['root-agent'] + assert node.usage is not None + assert node.usage.total == 15 + assert node.usage.input == 10 + assert node.usage.output == 5 -@pytest.mark.asyncio -async def test_flush_emits_duration(): - """flush() calls track_duration when duration was recorded.""" - mock_ld_client = MagicMock() - graph = _make_graph(mock_ld_client) - tracker = graph.create_tracker() +def test_node_metrics_includes_duration(): + """node_metrics returns duration_ms for nodes that completed a chain run.""" handler = LDMetricsCallbackHandler({'root-agent'}, {}) run_id = uuid4() handler.on_chain_start({}, {}, run_id=run_id, name='root-agent') handler.on_chain_end({}, run_id=run_id) - await handler.flush(graph) - ev = _events(mock_ld_client) - assert '$ld:ai:duration:total' in ev + metrics = handler.node_metrics + assert 'root-agent' in metrics + assert metrics['root-agent'].duration_ms is not None + assert metrics['root-agent'].success is True -@pytest.mark.asyncio -async def test_flush_emits_tool_calls(): - """flush() calls track_tool_call for each recorded tool invocation.""" - mock_ld_client = MagicMock() - graph = _make_graph(mock_ld_client) - tracker = graph.create_tracker() +def test_node_metrics_includes_tool_calls(): + """node_metrics returns tool_calls for nodes with recorded tool invocations.""" handler = LDMetricsCallbackHandler({'root-agent'}, {'fn_search': 'search'}) - # The agent node must be started first so it appears in the path for flush() agent_run_id = uuid4() handler.on_chain_start({}, {}, run_id=agent_run_id, name='root-agent') - # Tool calls are attributed via the __tools chain run_id tools_run_id = uuid4() handler.on_chain_start({}, {}, run_id=tools_run_id, name='root-agent__tools') handler.on_tool_end('r', run_id=uuid4(), parent_run_id=tools_run_id, name='fn_search') - await handler.flush(graph) - ev = _events(mock_ld_client) - tool_events = ev.get('$ld:ai:tool_call', []) - assert len(tool_events) == 1 - assert tool_events[0][0]['toolKey'] == 'search' + metrics = handler.node_metrics + assert 'root-agent' in metrics + assert metrics['root-agent'].tool_calls == ['search'] -@pytest.mark.asyncio -async def test_flush_includes_graph_key_in_node_events(): - """flush() passes graph_key to the node tracker so graphKey appears in events.""" - mock_ld_client = MagicMock() - graph = _make_graph(mock_ld_client, graph_key='my-graph') - tracker = graph.create_tracker() +def test_node_metrics_empty_when_no_nodes_executed(): + """node_metrics returns an empty dict when no nodes were executed.""" handler = LDMetricsCallbackHandler({'root-agent'}, {}) - node_run_id = uuid4() - handler.on_chain_start({}, {}, run_id=node_run_id, name='root-agent') - handler.on_llm_end(_llm_result(5, 3, 2), run_id=uuid4(), parent_run_id=node_run_id) - await handler.flush(graph) - - ev = _events(mock_ld_client) - token_data = ev['$ld:ai:tokens:total'][0][0] - assert 
token_data.get('graphKey') == 'my-graph' - - -@pytest.mark.asyncio -async def test_flush_with_no_graph_key_on_node_tracker(): - """When node tracker has no graph_key, events omit graphKey.""" - mock_ld_client = MagicMock() - context = MagicMock() - node_tracker = LDAIConfigTracker( - ld_client=mock_ld_client, - variation_key='v1', - config_key='root-agent', - version=1, - model_name='gpt-4', - provider_name='openai', - context=context, - run_id='test-run-id', - ) - node_config = AIAgentConfig( - key='root-agent', - enabled=True, - evaluator=Evaluator.noop(), - model=ModelConfig(name='gpt-4', parameters={}), - provider=ProviderConfig(name='openai'), - instructions='Be helpful.', - create_tracker=lambda: node_tracker, - ) - graph_config = AIAgentGraphConfig( - key='test-graph', - root_config_key='root-agent', - edges=[], - enabled=True, - ) - nodes = AgentGraphDefinition.build_nodes(graph_config, {'root-agent': node_config}) - graph = AgentGraphDefinition( - agent_graph=graph_config, - nodes=nodes, - context=context, - enabled=True, - create_tracker=lambda: AIGraphTracker(mock_ld_client, 'v1', 'test-graph', 1, context), - ) + metrics = handler.node_metrics + + assert metrics == {} + + +def test_node_metrics_multiple_nodes(): + """node_metrics returns separate entries for each executed node.""" + handler = LDMetricsCallbackHandler({'root-agent', 'child-agent'}, {}) + + root_run_id = uuid4() + handler.on_chain_start({}, {}, run_id=root_run_id, name='root-agent') + handler.on_llm_end(_llm_result(15, 10, 5), run_id=uuid4(), parent_run_id=root_run_id) + + child_run_id = uuid4() + handler.on_chain_start({}, {}, run_id=child_run_id, name='child-agent') + handler.on_llm_end(_llm_result(5, 3, 2), run_id=uuid4(), parent_run_id=child_run_id) + + metrics = handler.node_metrics + + assert 'root-agent' in metrics + assert 'child-agent' in metrics + assert metrics['root-agent'].usage.total == 15 + assert metrics['child-agent'].usage.total == 5 + + +def test_node_metrics_no_tool_calls_returns_none(): + """node_metrics sets tool_calls to None for nodes with no tool invocations.""" handler = LDMetricsCallbackHandler({'root-agent'}, {}) node_run_id = uuid4() handler.on_chain_start({}, {}, run_id=node_run_id, name='root-agent') handler.on_llm_end(_llm_result(5, 3, 2), run_id=uuid4(), parent_run_id=node_run_id) - await handler.flush(graph) - ev = _events(mock_ld_client) - token_data = ev['$ld:ai:tokens:total'][0][0] - assert 'graphKey' not in token_data + metrics = handler.node_metrics + assert metrics['root-agent'].tool_calls is None -@pytest.mark.asyncio -async def test_flush_skips_nodes_not_in_path(): - """flush() only emits events for nodes that were actually executed.""" - mock_ld_client = MagicMock() - graph = _make_graph(mock_ld_client) - tracker = graph.create_tracker() - # Handler with 'root-agent' in node_keys but never started +def test_node_metrics_no_usage_returns_none(): + """node_metrics sets usage to None for nodes with no LLM calls.""" handler = LDMetricsCallbackHandler({'root-agent'}, {}) - await handler.flush(graph) - - ev = _events(mock_ld_client) - assert '$ld:ai:tokens:total' not in ev - assert '$ld:ai:generation:success' not in ev - - -@pytest.mark.asyncio -async def test_flush_skips_node_without_tracker(): - """flush() silently skips nodes whose config has no tracker.""" - mock_ld_client = MagicMock() - context = MagicMock() - - node_config_no_tracker = AIAgentConfig( - key='no-track', - enabled=True, - create_tracker=lambda: None, - evaluator=Evaluator.noop(), - 
model=ModelConfig(name='gpt-4', parameters={}), - provider=ProviderConfig(name='openai'), - instructions='', - ) - graph_config = AIAgentGraphConfig( - key='g', root_config_key='no-track', edges=[], enabled=True - ) - nodes = AgentGraphDefinition.build_nodes(graph_config, {'no-track': node_config_no_tracker}) - graph = AgentGraphDefinition( - agent_graph=graph_config, - nodes=nodes, - context=context, - enabled=True, - create_tracker=lambda: None, - ) + run_id = uuid4() + handler.on_chain_start({}, {}, run_id=run_id, name='root-agent') + handler.on_chain_end({}, run_id=run_id) - handler = LDMetricsCallbackHandler({'no-track'}, {}) - node_run_id = uuid4() - handler.on_chain_start({}, {}, run_id=node_run_id, name='no-track') - handler.on_llm_end(_llm_result(5, 3, 2), run_id=uuid4(), parent_run_id=node_run_id) - await handler.flush(graph) # should not raise + metrics = handler.node_metrics - mock_ld_client.track.assert_not_called() + assert metrics['root-agent'].usage is None # --------------------------------------------------------------------------- @@ -512,12 +380,10 @@ def test_path_property_returns_copy(): assert handler.path == ['root-agent'] -def test_node_tokens_property_returns_copy(): +def test_node_metrics_property_returns_copy(): """Mutating the returned dict does not affect the handler's internal state.""" handler = LDMetricsCallbackHandler({'root-agent'}, {}) - node_run_id = uuid4() - handler.on_chain_start({}, {}, run_id=node_run_id, name='root-agent') - handler.on_llm_end(_llm_result(5, 3, 2), run_id=uuid4(), parent_run_id=node_run_id) - tokens = handler.node_tokens - tokens['other'] = TokenUsage(total=1, input=1, output=0) - assert 'other' not in handler.node_tokens + handler.on_chain_start({}, {}, run_id=uuid4(), name='root-agent') + metrics = handler.node_metrics + del metrics['root-agent'] + assert 'root-agent' in handler.node_metrics diff --git a/packages/ai-providers/server-ai-langchain/tests/test_tracking_langgraph.py b/packages/ai-providers/server-ai-langchain/tests/test_tracking_langgraph.py index 3b45783d..f7838033 100644 --- a/packages/ai-providers/server-ai-langchain/tests/test_tracking_langgraph.py +++ b/packages/ai-providers/server-ai-langchain/tests/test_tracking_langgraph.py @@ -6,16 +6,30 @@ with the correct payloads — without making real API calls. 
""" -import pytest from collections import defaultdict from unittest.mock import AsyncMock, MagicMock, patch +import pytest from ldai.agent_graph import AgentGraphDefinition -from ldai.models import AIAgentGraphConfig, AIAgentConfig, Edge, ModelConfig, ProviderConfig -from ldai.tracker import AIGraphTracker, LDAIConfigTracker from ldai.evaluator import Evaluator +from ldai.managed_agent_graph import ManagedAgentGraph +from ldai.models import ( + AIAgentConfig, + AIAgentGraphConfig, + Edge, + ModelConfig, + ProviderConfig, +) +from ldai.tracker import AIGraphTracker, LDAIConfigTracker + from ldai_langchain.langgraph_agent_graph_runner import LangGraphAgentGraphRunner + +async def _run_through_managed(runner: LangGraphAgentGraphRunner, graph: AgentGraphDefinition, input: str): + """Run the runner through the managed layer so graph-level tracking events fire.""" + managed = ManagedAgentGraph(graph, runner) + return await managed.run(input) + pytestmark = pytest.mark.skipif( pytest.importorskip('langgraph', reason='langgraph not installed') is None, reason='langgraph not installed', @@ -215,8 +229,10 @@ def _make_two_node_graph(mock_ld_client: MagicMock) -> 'AgentGraphDefinition': async def test_tracks_node_and_graph_tokens_on_success(): """Node-level and graph-level token events fire with the correct counts.""" from uuid import uuid4 + from langchain_core.messages import AIMessage as _AIMsg - from langchain_core.outputs import LLMResult, ChatGeneration + from langchain_core.outputs import ChatGeneration, LLMResult + from ldai_langchain.langgraph_callback_handler import LDMetricsCallbackHandler mock_ld_client = MagicMock() @@ -229,42 +245,40 @@ async def test_tracks_node_and_graph_tokens_on_success(): result = await runner.run("What's the weather?") assert result.metrics.success is True - assert result.output == 'Sunny.' - - # Manually simulate what the callback handler would collect and flush - # (mock models don't fire LangChain callbacks, so we test flush directly) - mock_ld_client2 = MagicMock() - graph2 = _make_graph(mock_ld_client2) - tracker2 = graph2.create_tracker() + assert result.content == 'Sunny.' 
+ # Manually simulate what the callback handler would collect + # (mock models don't fire LangChain callbacks, so we test node_metrics directly) handler = LDMetricsCallbackHandler({'root-agent'}, {}) node_run_id = uuid4() handler.on_chain_start({}, {}, run_id=node_run_id, name='root-agent') llm_result = LLMResult( generations=[[ChatGeneration( - message=_AIMsg(content='Sunny.', usage_metadata={'total_tokens': 15, 'input_tokens': 10, 'output_tokens': 5}), + message=_AIMsg( + content='Sunny.', + usage_metadata={'total_tokens': 15, 'input_tokens': 10, 'output_tokens': 5}, + ), text='Sunny.', )]], llm_output={}, ) handler.on_llm_end(llm_result, run_id=uuid4(), parent_run_id=node_run_id) handler.on_chain_end({}, run_id=node_run_id) - await handler.flush(graph2) - ev2 = _events(mock_ld_client2) - assert ev2['$ld:ai:tokens:total'][0][1] == 15 - assert ev2['$ld:ai:tokens:input'][0][1] == 10 - assert ev2['$ld:ai:tokens:output'][0][1] == 5 - assert ev2['$ld:ai:generation:success'][0][1] == 1 - assert '$ld:ai:duration:total' in ev2 + node_metrics = handler.node_metrics + assert 'root-agent' in node_metrics + node = node_metrics['root-agent'] + assert node.usage is not None + assert node.usage.total == 15 + assert node.usage.input == 10 + assert node.usage.output == 5 + assert node.success is True + assert node.duration_ms is not None - # Graph-level events from the real run - ev = _events(mock_ld_client) - assert ev['$ld:ai:graph:total_tokens'][0][1] == 15 - assert ev['$ld:ai:graph:invocation_success'][0][1] == 1 - assert '$ld:ai:graph:duration:total' in ev - assert '$ld:ai:graph:path' in ev + # Graph-level events are now driven by ManagedAgentGraph from + # AgentGraphRunnerResult.metrics — see test_managed_agent_graph.py for the + # managed-layer flow. The runner itself no longer fires graph-level events. @pytest.mark.asyncio @@ -277,17 +291,18 @@ async def test_tracks_execution_path(): with patch('ldai_langchain.langgraph_agent_graph_runner.create_langchain_model', return_value=_mock_model(fake_response)): runner = LangGraphAgentGraphRunner(graph, {}) - await runner.run('hello') + result = await runner.run('hello') - ev = _events(mock_ld_client) - path_data = ev['$ld:ai:graph:path'][0][0] - assert 'my-agent' in path_data['path'] + # Path now lives on AgentGraphRunnerResult.metrics.path; the runner no + # longer emits the $ld:ai:graph:path event directly (the managed layer does). 
+ assert 'my-agent' in result.metrics.path @pytest.mark.asyncio async def test_tracks_tool_calls(): """A tool_call event fires for each tool name found in the model response.""" from uuid import uuid4 + from ldai_langchain.langgraph_callback_handler import LDMetricsCallbackHandler mock_ld_client = MagicMock() @@ -313,29 +328,23 @@ def get_weather(location: str = 'NYC') -> str: await runner.run('What is the weather?') # Simulate tool call tracking via the callback handler directly - mock_ld_client2 = MagicMock() - graph2 = _make_graph(mock_ld_client2, tool_names=['get_weather']) - tracker2 = graph2.create_tracker() - handler = LDMetricsCallbackHandler({'root-agent'}, {'get_weather': 'get_weather'}) - # Agent node must appear in path for flush() to emit its events agent_run_id = uuid4() handler.on_chain_start({}, {}, run_id=agent_run_id, name='root-agent') tools_run_id = uuid4() handler.on_chain_start({}, {}, run_id=tools_run_id, name='root-agent__tools') handler.on_tool_end('sunny', run_id=uuid4(), parent_run_id=tools_run_id, name='get_weather') - await handler.flush(graph2) - ev2 = _events(mock_ld_client2) - tool_events = ev2.get('$ld:ai:tool_call', []) - assert len(tool_events) == 1 - assert tool_events[0][0]['toolKey'] == 'get_weather' + node_metrics = handler.node_metrics + assert 'root-agent' in node_metrics + assert node_metrics['root-agent'].tool_calls == ['get_weather'] @pytest.mark.asyncio async def test_tracks_multiple_tool_calls(): """One tool_call event fires per tool name in the response.""" from uuid import uuid4 + from ldai_langchain.langgraph_callback_handler import LDMetricsCallbackHandler mock_ld_client = MagicMock() @@ -365,55 +374,42 @@ def summarize(text: str = '') -> str: await runner.run('Search and summarize.') # Simulate multiple tool calls via the callback handler directly - mock_ld_client2 = MagicMock() - graph2 = _make_graph(mock_ld_client2, tool_names=['search', 'summarize']) - tracker2 = graph2.create_tracker() - fn_map = {'search': 'search', 'summarize': 'summarize'} handler = LDMetricsCallbackHandler({'root-agent'}, fn_map) - # Agent node must appear in path for flush() to emit its events agent_run_id = uuid4() handler.on_chain_start({}, {}, run_id=agent_run_id, name='root-agent') tools_run_id = uuid4() handler.on_chain_start({}, {}, run_id=tools_run_id, name='root-agent__tools') handler.on_tool_end('result', run_id=uuid4(), parent_run_id=tools_run_id, name='search') handler.on_tool_end('summary', run_id=uuid4(), parent_run_id=tools_run_id, name='summarize') - await handler.flush(graph2) - ev2 = _events(mock_ld_client2) - tool_keys = [data['toolKey'] for data, _ in ev2.get('$ld:ai:tool_call', [])] - assert sorted(tool_keys) == ['search', 'summarize'] + node_metrics = handler.node_metrics + assert 'root-agent' in node_metrics + assert sorted(node_metrics['root-agent'].tool_calls) == ['search', 'summarize'] @pytest.mark.asyncio async def test_tracks_graph_key_on_node_events(): - """Node-level events include the graphKey so they can be correlated to the graph.""" + """Node-level metrics are collected per node and can be correlated to the graph via the runner result.""" from uuid import uuid4 + from langchain_core.messages import AIMessage as _AIMsg - from langchain_core.outputs import LLMResult, ChatGeneration + from langchain_core.outputs import ChatGeneration, LLMResult + from ldai_langchain.langgraph_callback_handler import LDMetricsCallbackHandler mock_ld_client = MagicMock() graph = _make_graph(mock_ld_client, graph_key='my-graph') - tracker = 
graph.create_tracker() + fake_response = _make_fake_response('OK.', input_tokens=5, output_tokens=3) - handler = LDMetricsCallbackHandler({'root-agent'}, {}) - node_run_id = uuid4() - handler.on_chain_start({}, {}, run_id=node_run_id, name='root-agent') - - llm_result = LLMResult( - generations=[[ChatGeneration( - message=_AIMsg(content='OK.', usage_metadata={'total_tokens': 8, 'input_tokens': 5, 'output_tokens': 3}), - text='OK.', - )]], - llm_output={}, - ) - handler.on_llm_end(llm_result, run_id=uuid4(), parent_run_id=node_run_id) - await handler.flush(graph) + with patch('ldai_langchain.langgraph_agent_graph_runner.create_langchain_model', + return_value=_mock_model(fake_response)): + runner = LangGraphAgentGraphRunner(graph, {}) + result = await runner.run('hello') - ev = _events(mock_ld_client) - token_data = ev['$ld:ai:tokens:total'][0][0] - assert token_data.get('graphKey') == 'my-graph' + # The runner result carries node_metrics; the managed layer uses graph_key from the graph + assert result.metrics.node_metrics is not None + assert 'root-agent' in result.metrics.node_metrics @pytest.mark.asyncio @@ -432,19 +428,19 @@ async def test_tracks_failure_and_latency_on_model_error(): result = await runner.run('fail') assert result.metrics.success is False - - ev = _events(mock_ld_client) - assert '$ld:ai:graph:invocation_failure' in ev - assert '$ld:ai:graph:duration:total' in ev - assert '$ld:ai:graph:invocation_success' not in ev + assert result.metrics.duration_ms is not None + # Graph-level events (invocation_failure, duration) are now driven by + # ManagedAgentGraph from result.metrics, not by the runner directly. @pytest.mark.asyncio async def test_multi_node_tracks_per_node_tokens_and_path(): """Each node emits its own token events; path and graph total cover both nodes.""" from uuid import uuid4 + from langchain_core.messages import AIMessage as _AIMsg - from langchain_core.outputs import LLMResult, ChatGeneration + from langchain_core.outputs import ChatGeneration, LLMResult + from ldai_langchain.langgraph_callback_handler import LDMetricsCallbackHandler mock_ld_client = MagicMock() @@ -461,22 +457,21 @@ def model_factory(node_config, **kwargs): with patch('ldai_langchain.langgraph_agent_graph_runner.create_langchain_model', side_effect=model_factory): runner = LangGraphAgentGraphRunner(graph, {}) - result = await runner.run('hello') + result = await _run_through_managed(runner, graph, 'hello') assert result.metrics.success is True - # Simulate per-node token events via callback handler (mock models don't fire callbacks) - mock_ld_client2 = MagicMock() - graph2 = _make_two_node_graph(mock_ld_client2) - tracker2 = graph2.create_tracker() - + # Simulate per-node metric collection via callback handler (mock models don't fire callbacks) handler = LDMetricsCallbackHandler({'root-agent', 'child-agent'}, {}) root_run_id = uuid4() handler.on_chain_start({}, {}, run_id=root_run_id, name='root-agent') root_llm_result = LLMResult( generations=[[ChatGeneration( - message=_AIMsg(content='Root done.', usage_metadata={'total_tokens': 15, 'input_tokens': 10, 'output_tokens': 5}), + message=_AIMsg( + content='Root done.', + usage_metadata={'total_tokens': 15, 'input_tokens': 10, 'output_tokens': 5}, + ), text='Root done.', )]], llm_output={}, @@ -487,22 +482,21 @@ def model_factory(node_config, **kwargs): handler.on_chain_start({}, {}, run_id=child_run_id, name='child-agent') child_llm_result = LLMResult( generations=[[ChatGeneration( - message=_AIMsg(content='Child done.', 
usage_metadata={'total_tokens': 5, 'input_tokens': 3, 'output_tokens': 2}),
+            message=_AIMsg(
+                content='Child done.',
+                usage_metadata={'total_tokens': 5, 'input_tokens': 3, 'output_tokens': 2},
+            ),
             text='Child done.',
         )]],
         llm_output={},
     )
     handler.on_llm_end(child_llm_result, run_id=uuid4(), parent_run_id=child_run_id)
-    await handler.flush(graph2)
-
-    ev2 = _events(mock_ld_client2)
+    node_metrics = handler.node_metrics
 
-    # Per-node token events identified by configKey
-    root_tokens = [(d, v) for d, v in ev2.get('$ld:ai:tokens:total', []) if d.get('configKey') == 'root-agent']
-    child_tokens = [(d, v) for d, v in ev2.get('$ld:ai:tokens:total', []) if d.get('configKey') == 'child-agent']
-    assert root_tokens[0][1] == 15
-    assert child_tokens[0][1] == 5
+    # Per-node token usage is keyed by node key
+    assert node_metrics['root-agent'].usage.total == 15
+    assert node_metrics['child-agent'].usage.total == 5
 
-    # Graph-level total from the real runner run
+    # Graph-level total from the managed run (ManagedAgentGraph emits the graph events)
     ev = _events(mock_ld_client)
@@ -624,7 +618,7 @@ def model_factory(node_config, **kwargs):
         result = await runner.run('hello')
 
     assert result.metrics.success is True
-    assert 'Agent A' in result.output
+    assert 'Agent A' in result.content
 
     # Agent B's model must never have been invoked — no fan-out
     agent_b_model.ainvoke.assert_not_called()
@@ -752,7 +746,7 @@ def model_factory(node_config, **kwargs):
         result = await runner.run('Find info and route to the right agent.')
 
     assert result.metrics.success is True
-    assert 'Agent A' in result.output
+    assert 'Agent A' in result.content
     # Orchestrator must have been called twice: once before tool result, once after
     assert orchestrator_model.ainvoke.call_count == 2
    # Agent B must never have been invoked
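
Usage sketch for the new split (a minimal, illustrative example, not part of the diff: `graph` stands in for an `AgentGraphDefinition` resolved elsewhere, and the `main` wrapper and prompt strings are hypothetical):

```python
from ldai.managed_agent_graph import ManagedAgentGraph
from ldai_langchain.langgraph_agent_graph_runner import LangGraphAgentGraphRunner


async def main(graph):
    # The runner is now a pure executor: it compiles and runs the LangGraph
    # StateGraph and collects metrics via the callback handler, but it fires
    # no LaunchDarkly tracking events itself.
    runner = LangGraphAgentGraphRunner(graph, {})
    raw = await runner.run('What is the weather?')

    # Everything the runner used to emit directly now rides on
    # AgentGraphRunnerResult.metrics (a GraphMetrics).
    print(raw.content)               # final model output
    print(raw.metrics.path)          # executed node keys, in order
    print(raw.metrics.duration_ms)   # wall-clock duration of the run
    if raw.metrics.node_metrics:
        for node_key, node in raw.metrics.node_metrics.items():
            print(node_key, node.usage, node.tool_calls)

    # In normal use the runner is wrapped by ManagedAgentGraph, which emits
    # the graph- and node-level tracking events from those metrics.
    managed = ManagedAgentGraph(graph, runner)
    result = await managed.run('What is the weather?')
    assert result.metrics.success
```

Keeping event emission in `ManagedAgentGraph` leaves provider runners side-effect free, which is exactly what the updated tests pin down with `tracker.track_path.assert_not_called()` and friends.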