Skip to content

Comments

feat: Add InterruptService for human-in-the-loop graph workflows#4585

Open
drahnreb wants to merge 4 commits intogoogle:mainfrom
drahnreb:feat/graph-agent-pr4
Open

feat: Add InterruptService for human-in-the-loop graph workflows#4585
drahnreb wants to merge 4 commits intogoogle:mainfrom
drahnreb:feat/graph-agent-pr4

Conversation

@drahnreb
Copy link

Please ensure you have read the contribution guide before creating a pull request.

Link to Issue or Description of Change

1. Link to an existing issue (if applicable):

2. Or, if no issue exists, describe the change:

Problem:
Many agent workflows need human oversight — approval gates, review checkpoints, error correction. There is no structured way to pause execution, present context to a human, and resume with their decision.

Solution:
Add interrupt system enabling pause/resume, human approval gates, and cancellation during graph execution. Includes InterruptService with priority queues and per-session metrics, InterruptReasoner for LLM-based interrupt decisions, GraphInterruptMixin for execution loop integration, and checkpoint tracing infrastructure.

What's included:

  • src/google/adk/agents/graph/interrupt.py — InterruptConfig, InterruptMode, InterruptAction
  • src/google/adk/agents/graph/interrupt_service.py — InterruptService, InterruptMessage, InterruptServiceConfig, QueueStatus, SessionMetrics
  • src/google/adk/agents/graph/interrupt_reasoner.py — InterruptReasoner, InterruptDecision, InterruptReasonerConfig
  • src/google/adk/agents/graph/graph_interrupt_handler.py — GraphInterruptMixin
  • src/google/adk/telemetry/checkpoint_tracing.py
  • Updated graph_agent.py with interrupt handling
  • Updated graph/__init__.py with interrupt exports
  • 5 test files (~175 tests): test_interrupt_cancellation.py, test_interrupt_integration.py, test_interrupt_reasoner.py, test_interrupt_service.py, test_checkpoint_tracing.py
  • 4 samples (graph_agent_hitl_review, graph_agent_hitl_orchestrated, examples 05-06/13)
  • interrupt_service_architecture.md

Part 4 of 5 — see tracking issue #4581. Stacked on #4584.

Testing Plan

Unit Tests:

  • I have added or updated unit tests for my change.
  • All unit tests pass locally.
pytest tests/unittests/agents/test_interrupt_*.py -v — ~175 tests ✅
pytest tests/unittests/telemetry/test_checkpoint_tracing.py -v ✅
All prior tests still pass (regression) ✅

Manual End-to-End (E2E) Tests:

4 interrupt sample agents import and instantiate successfully.

Checklist

  • I have read the CONTRIBUTING.md document.
  • I have performed a self-review of my own code.
  • I have commented my code, particularly in hard-to-understand areas.
  • I have added tests that prove my fix is effective or that my feature works.
  • New and existing unit tests pass locally with my changes.
  • I have manually tested my changes end-to-end.
  • Any dependent changes have been merged and published in downstream modules.

Additional context

Part 4 of 5. Depends on #4582 (Core), #4583 (Patterns), #4584 (Parallel).

@google-cla
Copy link

google-cla bot commented Feb 22, 2026

Thanks for your pull request! It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA).

View this failed invocation of the CLA check for more information.

For the most up to date status, view the checks section at the bottom of the pull request.

@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello @drahnreb, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances GraphAgent's capabilities by introducing a robust interrupt system, enabling human-in-the-loop workflows. It addresses the need for structured pausing, human decision-making, and dynamic resumption within complex agentic graphs. The changes provide greater control and flexibility for managing long-running or critical AI workflows, ensuring human oversight where necessary.

Highlights

  • Interrupt System for Human-in-the-Loop Workflows: Introduced a comprehensive interrupt system for GraphAgent, enabling dynamic pause/resume, human approval gates, and cancellation during graph execution. This includes InterruptService for managing priority queues and session metrics, InterruptReasoner for LLM-based interrupt decisions, and GraphInterruptMixin for seamless integration into the execution loop.
  • Checkpoint Tracing Infrastructure: Added new infrastructure for checkpoint tracing, enhancing observability and debugging capabilities for long-running or interrupted graph workflows.
  • Updated GraphAgent with Interrupt Handling: Integrated the new interrupt handling mechanisms directly into graph_agent.py, allowing GraphAgent workflows to leverage human intervention points.
  • New Documentation and Examples: Provided extensive documentation covering advanced GraphAgent patterns, design, node types, and the InterruptService architecture. Numerous new sample agents and example files demonstrate basic GraphAgent usage, dynamic task queues, HITL review, multi-agent coordination, and various interrupt scenarios.
  • OpenTelemetry Integration: Enhanced telemetry with OpenTelemetry integration for checkpoint, interrupt, and graph execution tracing, providing detailed metrics and spans for monitoring and debugging.

🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Changelog
  • contributing/docs/advanced_graph_patterns.md
    • Added documentation detailing advanced GraphAgent architectural patterns like dynamic task queues, nested invocations, and conditional parallel groups.
  • contributing/docs/graph_agent_design.md
    • Added a design document outlining the motivation, key capabilities, architecture, and comparison of GraphAgent to other workflow agents.
  • contributing/docs/graph_node_types.md
    • Added documentation explaining the different types of GraphAgent nodes, including GraphNode, ParallelNodeGroup, DynamicNode, NestedGraphNode, and DynamicParallelGroup.
  • contributing/docs/interrupt_service_architecture.md
    • Added a detailed architecture document for the InterruptService, covering its overview, core components, operation flows, security features, and scale management.
  • contributing/docs/pattern_apis.md
    • Added documentation introducing first-class pattern APIs for DynamicNode, NestedGraphNode, and DynamicParallelGroup, comparing them to function-node alternatives.
  • contributing/samples/graph_agent_basic/README.md
    • Added a README for the basic GraphAgent example, demonstrating conditional routing.
  • contributing/samples/graph_agent_basic/agent.py
    • Added a basic GraphAgent example demonstrating conditional routing for a data validation pipeline.
  • contributing/samples/graph_agent_basic/root_agent.yaml
    • Added a YAML configuration file for the basic GraphAgent example.
  • contributing/samples/graph_agent_dynamic_queue/README.md
    • Added a README for the dynamic task queue example, illustrating the AI Co-Scientist pattern.
  • contributing/samples/graph_agent_dynamic_queue/agent.py
    • Added a dynamic task queue example demonstrating runtime agent dispatch and dynamic task generation.
  • contributing/samples/graph_agent_hitl_orchestrated/README.md
    • Added a README for the composable HITL orchestrated pipeline example.
  • contributing/samples/graph_agent_hitl_orchestrated/agent.py
    • Added a HITL orchestrated pipeline example showcasing nested GraphNodes for review loops.
  • contributing/samples/graph_agent_hitl_review/README.md
    • Added a README for the HITL content review workflow example.
  • contributing/samples/graph_agent_hitl_review/agent.py
    • Added a HITL content review workflow example demonstrating human approval loops with InterruptService.
  • contributing/samples/graph_agent_multi_agent/README.md
    • Added a README for the multi-agent research workflow example.
  • contributing/samples/graph_agent_multi_agent/agent.py
    • Added a multi-agent research workflow example demonstrating parallel research and a quality review loop.
  • contributing/samples/graph_agent_pattern_dynamic_node/README.md
    • Added a README for the DynamicNode pattern example, illustrating runtime agent selection.
  • contributing/samples/graph_agent_pattern_dynamic_node/agent.py
    • Added a DynamicNode pattern example demonstrating mixture-of-experts routing.
  • contributing/samples/graph_agent_pattern_nested_graph/README.md
    • Added a README for the NestedGraphNode pattern example, showcasing hierarchical composition.
  • contributing/samples/graph_agent_pattern_nested_graph/agent.py
    • Added a NestedGraphNode pattern example demonstrating hierarchical workflow decomposition.
  • contributing/samples/graph_agent_pattern_parallel_group/README.md
    • Added a README for the DynamicParallelGroup pattern example, implementing Tree of Thoughts.
  • contributing/samples/graph_agent_pattern_parallel_group/agent.py
    • Added a DynamicParallelGroup pattern example demonstrating dynamic concurrent execution.
  • contributing/samples/graph_agent_react_pattern/README.md
    • Added a README for the ReAct pattern example.
  • contributing/samples/graph_agent_react_pattern/agent.py
    • Added a ReAct pattern example demonstrating reasoning, acting, and observing loops.
  • contributing/samples/graph_examples/01_basic/init.py
    • Added an __init__.py file for the basic GraphAgent example.
  • contributing/samples/graph_examples/01_basic/agent.py
    • Added a basic GraphAgent workflow example.
  • contributing/samples/graph_examples/02_conditional_routing/init.py
    • Added an __init__.py file for the conditional routing example.
  • contributing/samples/graph_examples/02_conditional_routing/agent.py
    • Added a conditional routing example for GraphAgent.
  • contributing/samples/graph_examples/03_cyclic_execution/init.py
    • Added an __init__.py file for the cyclic execution example.
  • contributing/samples/graph_examples/03_cyclic_execution/agent.py
    • Added a cyclic execution example for GraphAgent.
  • contributing/samples/graph_examples/05_interrupts_basic/init.py
    • Added an __init__.py file for the basic interrupts example.
  • contributing/samples/graph_examples/05_interrupts_basic/agent.py
    • Added a basic interrupts example demonstrating all 8 interrupt actions.
  • contributing/samples/graph_examples/06_interrupts_reasoning/init.py
    • Added an __init__.py file for the interrupts reasoning example.
  • contributing/samples/graph_examples/06_interrupts_reasoning/agent.py
    • Added an interrupts reasoning example demonstrating condition-based action selection.
  • contributing/samples/graph_examples/07_callbacks/init.py
    • Added an __init__.py file for the callbacks example.
  • contributing/samples/graph_examples/07_callbacks/agent.py
    • Added a node callbacks example demonstrating lifecycle hooks.
  • contributing/samples/graph_examples/08_rewind/init.py
    • Added an __init__.py file for the rewind example.
  • contributing/samples/graph_examples/08_rewind/agent.py
    • Added a rewind integration example for GraphAgent.
  • contributing/samples/graph_examples/09_parallel_wait_all/init.py
    • Added an __init__.py file for the parallel wait_all example.
  • contributing/samples/graph_examples/09_parallel_wait_all/agent.py
    • Added a parallel execution example using the WAIT_ALL strategy.
  • contributing/samples/graph_examples/10_parallel_wait_any/init.py
    • Added an __init__.py file for the parallel wait_any example.
  • contributing/samples/graph_examples/10_parallel_wait_any/agent.py
    • Added a parallel execution example using the WAIT_ANY strategy (race condition).
  • contributing/samples/graph_examples/11_parallel_wait_n/init.py
    • Added an __init__.py file for the parallel wait_n example.
  • contributing/samples/graph_examples/11_parallel_wait_n/agent.py
    • Added a parallel execution example using the WAIT_N strategy.
  • contributing/samples/graph_examples/13_parallel_interrupts/init.py
    • Added an __init__.py file for the parallel interrupts example.
  • contributing/samples/graph_examples/13_parallel_interrupts/agent.py
    • Added a parallel interrupts example demonstrating interrupt handling within parallel branches.
  • contributing/samples/graph_examples/14_parallel_rewind/init.py
    • Added an __init__.py file for the parallel rewind example.
  • contributing/samples/graph_examples/14_parallel_rewind/agent.py
    • Added a parallel execution and rewind example for GraphAgent.
  • contributing/samples/graph_examples/15_enhanced_routing/init.py
    • Added an __init__.py file for the enhanced routing example.
  • contributing/samples/graph_examples/15_enhanced_routing/agent.py
    • Added an enhanced routing example demonstrating priority, weighted, and fallback routing.
  • contributing/samples/graph_examples/README.md
    • Added a comprehensive README for all GraphAgent examples, including API overview, quick start, and feature matrix.
  • contributing/samples/graph_examples/init.py
    • Added an __init__.py file for the GraphAgent examples package.
  • contributing/samples/graph_examples/example_utils.py
    • Added shared utilities for GraphAgent examples, including LLM mode toggling.
  • contributing/samples/graph_examples/run_all_examples.sh
    • Added a shell script to run all GraphAgent examples.
  • contributing/samples/graph_examples/run_example.py
    • Added a utility script to run specific GraphAgent examples with optional tracing and LLM mode.
  • docs/future-work/dynamic-topology-modification.md
    • Added a design document outlining the implementation plan for dynamic topology modification in GraphAgent.
  • src/google/adk/init.py
    • Removed Context import and added GraphAgent related imports to the __all__ list.
  • src/google/adk/agents/init.py
    • Removed Context import and added GraphAgent, GraphNode, GraphState, START, and END imports to the __all__ list.
  • src/google/adk/agents/graph/init.py
    • Added a new __init__.py file to define the graph package, exporting all GraphAgent related components.
  • src/google/adk/agents/graph/callbacks.py
    • Added a new module defining callback infrastructure for graph observability and extensibility.
  • src/google/adk/agents/graph/evaluation_metrics.py
    • Added a new module defining custom evaluation metrics for GraphAgent workflows, including path matching and node execution counts.
  • src/google/adk/agents/graph/graph_agent.py
    • Added the core GraphAgent implementation, enabling directed graph-based orchestration with conditional routing, state management, and checkpointing.
  • src/google/adk/agents/graph/graph_agent_config.py
    • Added a new module defining the Pydantic configuration schema for GraphAgent.
  • src/google/adk/agents/graph/graph_agent_state.py
    • Added a new module defining the execution tracking state for GraphAgent.
  • src/google/adk/agents/graph/graph_edge.py
    • Added a new module defining EdgeCondition for conditional routing in graph workflows.
  • src/google/adk/agents/graph/graph_events.py
    • Added a new module defining typed events for streaming graph execution, including GraphEventType and GraphStreamMode.
  • src/google/adk/agents/graph/graph_export.py
    • Added a new module with functions to export graph structure and execution data for visualization.
  • src/google/adk/agents/graph/graph_interrupt_handler.py
    • Added a new mixin GraphInterruptMixin to encapsulate interrupt handling logic for GraphAgent.
  • src/google/adk/agents/graph/graph_node.py
    • Added a new module defining GraphNode as a wrapper for agents and functions within a graph.
  • src/google/adk/agents/graph/graph_rewind.py
    • Added a new module with a function rewind_to_node for temporal navigation within graph workflows.
  • src/google/adk/agents/graph/graph_state.py
    • Added a new module defining GraphState as the domain data container and StateReducer for merging node outputs.
  • src/google/adk/agents/graph/graph_telemetry.py
    • Added a new module defining GraphTelemetryMixin for OpenTelemetry instrumentation of GraphAgent execution.
  • src/google/adk/agents/graph/interrupt.py
    • Added a new module defining InterruptMode, InterruptConfig, and InterruptAction for human-in-the-loop interrupts.
  • src/google/adk/agents/graph/interrupt_reasoner.py
    • Added a new module defining InterruptReasoner for LLM-based intelligent decision-making on interrupt messages.
  • src/google/adk/agents/graph/interrupt_service.py
    • Added a new module defining InterruptService for dynamic runtime interrupts, including session management and metrics.
  • src/google/adk/agents/graph/parallel.py
    • Added a new module defining ParallelNodeGroup, JoinStrategy, and ErrorPolicy for concurrent execution in GraphAgent.
  • src/google/adk/agents/graph/patterns.py
    • Added a new module defining advanced graph patterns like DynamicNode, NestedGraphNode, and DynamicParallelGroup.
  • src/google/adk/agents/graph/state_utils.py
    • Added a new module with reusable utilities for parsing and handling state values.
  • src/google/adk/cli/agent_graph.py
    • Modified the agent_graph CLI tool to support visualization of GraphAgent structures, including NestedGraphNode, DynamicNode, and DynamicParallelGroup.
  • src/google/adk/telemetry/init.py
    • Modified the __init__.py file to include new telemetry modules.
  • src/google/adk/telemetry/checkpoint_tracing.py
    • Added a new module for OpenTelemetry instrumentation of checkpoint, interrupt, and resume operations.
  • src/google/adk/telemetry/graph_tracing.py
    • Added a new module for OpenTelemetry instrumentation specifically for GraphAgent workflow execution.
Activity
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

The pull request introduces a robust InterruptService for human-in-the-loop workflows within GraphAgent. The implementation is comprehensive, covering per-session isolation, bounded queues, and LLM-based reasoning for interrupt actions. The integration with the existing execution loop and telemetry is well-handled. My feedback focuses on improving the efficiency of queue peeking, enhancing the robustness of LLM response parsing, and refining the state clearing logic during 'go back' actions.

Comment on lines 747 to 795
def list_queued_messages(
self, session_id: str, page: int = 1, page_size: int = 50
) -> List[InterruptMessage]:
"""List queued messages with ADK-style pagination.

Provides non-destructive peek at queue for observability and debugging.
Messages remain in queue for processing by GraphAgent.

Safe in asyncio's single-threaded cooperative context: drain then requeue
is atomic from the perspective of all other coroutines (no await between
drain and requeue means no other coroutine can run between them).

Args:
session_id: Session identifier
page: Page number (1-indexed, default: 1)
page_size: Messages per page (default: 50, max: 1000)

Returns:
List of InterruptMessage objects (may be empty)
"""
# Validate pagination bounds (ADK pattern)
if page < 1:
page = 1
if page_size < 1 or page_size > 1000:
page_size = 50

if session_id not in self._message_queues:
return []

# Drain and requeue: safe in asyncio single-threaded context because
# there is no await between the drain and the requeue, so no other
# coroutine can interleave and observe the empty state.
queue = self._message_queues[session_id]
messages: List[InterruptMessage] = []
while not queue.empty():
try:
messages.append(queue.get_nowait())
except asyncio.QueueEmpty:
break
for msg in messages:
try:
queue.put_nowait(msg)
except asyncio.QueueFull:
break

# Paginate
offset = (page - 1) * page_size
return messages[offset : offset + page_size]

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The current implementation of list_queued_messages peeks at the queue by draining and then requeueing all messages. While this is task-safe in a single-threaded asyncio loop, it is inefficient for larger queues. A more direct way to peek at an asyncio.Queue without modifying it is to access its internal _queue attribute, which is a collections.deque.

  def list_queued_messages(
      self, session_id: str, page: int = 1, page_size: int = 50
  ) -> List[InterruptMessage]:
    if page < 1:
      page = 1
    if page_size < 1 or page_size > 1000:
      page_size = 50

    if session_id not in self._message_queues:
      return []

    # Access internal deque for non-destructive peek
    queue = self._message_queues[session_id]
    messages = list(queue._queue)

    offset = (page - 1) * page_size
    return messages[offset : offset + page_size]

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. list_queued_messages now uses direct access to asyncio.Queue's internal _queue deque for non-destructive reads. No drain/requeue needed — avoids QueueFull risk entirely. See interrupt_service.py docstring update.

Comment on lines 307 to 319
if "```" in text:
parts = text.split("```")
if len(parts) >= 3:
inner = parts[1]
if inner.startswith("json"):
inner = inner[4:]
text = inner.strip()
elif len(parts) == 2:
inner = parts[1]
if inner.startswith("json"):
inner = inner[4:]
text = inner.strip()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The markdown stripping logic assumes that the JSON block is always the first code block in the response. If the LLM provides multiple blocks or preamble text with backticks, this might fail. It's safer to use a regex to find the first JSON-like block or iterate through all code blocks.

Suggested change
if "```" in text:
parts = text.split("```")
if len(parts) >= 3:
inner = parts[1]
if inner.startswith("json"):
inner = inner[4:]
text = inner.strip()
elif len(parts) == 2:
inner = parts[1]
if inner.startswith("json"):
inner = inner[4:]
text = inner.strip()
if "```" in text:
import re
# Find all blocks wrapped in triple backticks
matches = re.findall(r"```(?:json)?\s*([\s\S]*?)\s*```", text)
for match in matches:
try:
decision = json.loads(match)
return self._validate_decision(decision)
except json.JSONDecodeError:
continue

Copy link
Author

@drahnreb drahnreb Feb 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correction: Fixed: using output_schema=InterruptDecision (Pydantic model) ensures structured JSON output — no markdown stripping needed.

Comment on lines 375 to 376
for node_name in nodes_to_clear:
state.data.pop(node_name, None)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The go_back logic assumes that the key in state.data is exactly the node name. While this is true for default output mappers, users might define custom mappers that write to different keys. This logic will fail to clear state for those nodes. Consider documenting this limitation or providing a way for nodes to specify which state keys they 'own'.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Documented: go_back clears state keys matching exact node names (default output_mapper convention). Custom output_mappers using different keys are the users responsibility.

Comment on lines 133 to 180
def _validate_condition_ast(node: ast.AST) -> None:
"""Walk AST and reject any unsafe node types.

Only allows: comparisons, boolean ops, unary not, attribute access,
safe method calls (.get, .get_parsed, .get_str, .get_dict),
constants, and whitelisted names.

Raises:
ValueError: If an unsafe AST node is encountered.
"""
if isinstance(node, ast.Expression):
_validate_condition_ast(node.body)
elif isinstance(node, ast.BoolOp):
for value in node.values:
_validate_condition_ast(value)
elif isinstance(node, ast.UnaryOp):
if not isinstance(node.op, ast.Not):
raise ValueError(f"Unsafe unary operator: {type(node.op).__name__}")
_validate_condition_ast(node.operand)
elif isinstance(node, ast.Compare):
_validate_condition_ast(node.left)
for comparator in node.comparators:
_validate_condition_ast(comparator)
elif isinstance(node, ast.Call):
if isinstance(node.func, ast.Attribute):
if node.func.attr not in _SAFE_METHODS:
raise ValueError(f"Unsafe method call: .{node.func.attr}()")
_validate_condition_ast(node.func.value)
else:
raise ValueError(f"Unsafe call: {ast.dump(node.func)}")
for arg in node.args:
_validate_condition_ast(arg)
for kw in node.keywords:
_validate_condition_ast(kw.value)
elif isinstance(node, ast.Attribute):
# Allow attribute access on safe names only
_validate_condition_ast(node.value)
elif isinstance(node, ast.Subscript):
_validate_condition_ast(node.value)
_validate_condition_ast(node.slice)
elif isinstance(node, ast.Name):
if node.id not in _SAFE_NAMES:
raise ValueError(f"Unsafe name: '{node.id}'")
elif isinstance(node, ast.Constant):
pass # string, int, float, bool, None literals are safe
else:
raise ValueError(f"Unsafe expression node: {type(node).__name__}")

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The AST validation for conditions is a good security measure. However, it currently blocks common utility functions like len(). If a workflow needs to route based on the size of a list in state (e.g., len(data.get('items')) > 0), it will fail. Consider adding len to a whitelist of safe built-in functions.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already done: len() is in _SAFE_BUILTINS whitelist.

@drahnreb drahnreb force-pushed the feat/graph-agent-pr4 branch 5 times, most recently from 75732b6 to d95fe68 Compare February 22, 2026 16:34
Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This is an extensive and well-architected feature addition that introduces powerful graph-based workflow orchestration capabilities to the ADK. The new GraphAgent and its related components like InterruptService, GraphNode, and patterns for dynamic and nested execution are thoughtfully designed. The code is well-structured, breaking down complex logic into manageable mixins and helper modules. The inclusion of comprehensive samples, tests, and telemetry instrumentation is commendable and demonstrates a high level of quality. My review focuses on a recurring subtle bug in a few of the new sample files related to session state handling, which could be misleading for users. Overall, this is an excellent contribution.

Comment on lines 283 to 288
fresh_session = await session_service.get_session(
app_name="dynamic_queue_demo", user_id="demo_user", session_id=session.id
)
final_session = fresh_session or session
final_data = final_session.state.get("graph_data", {})
final_state = GraphState(data=final_data) if final_data else GraphState()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The fallback or session is problematic. The session object holds a stale, deep-copied state from when it was created via create_session. If get_session fails for some reason and returns None, this code will silently proceed to print incorrect final statistics from the old state, which is misleading for anyone running this sample. It's better to handle the None case explicitly to avoid this subtle bug.

Suggested change
fresh_session = await session_service.get_session(
app_name="dynamic_queue_demo", user_id="demo_user", session_id=session.id
)
final_session = fresh_session or session
final_data = final_session.state.get("graph_data", {})
final_state = GraphState(data=final_data) if final_data else GraphState()
final_session = await session_service.get_session(
app_name="dynamic_queue_demo", user_id="demo_user", session_id=session.id
)
if final_session:
final_data = final_session.state.get("graph_data", {})
final_state = GraphState(data=final_data)
else:
print("Warning: Could not retrieve final session state.")
final_state = GraphState()

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed: Explicit None check with warning print instead of silent stale fallback.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed — changed from warn+fallback to early return when session cannot be retrieved.

Comment on lines 248 to 252
fresh_session = await session_service.get_session(
app_name="research_graph", user_id="user1", session_id=session.id
)
final_data = (fresh_session or session).state.get("graph_data", {})
final_state = GraphState(data=final_data)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Similar to another sample file in this PR, the fallback or session can lead to misleading results. The session object is a deepcopy from when the session was created and does not contain the final state from the graph execution. If get_session were to fail and return None, this code would fall back to the stale session object and print incorrect results. A more robust implementation would handle the None case explicitly.

Suggested change
fresh_session = await session_service.get_session(
app_name="research_graph", user_id="user1", session_id=session.id
)
final_data = (fresh_session or session).state.get("graph_data", {})
final_state = GraphState(data=final_data)
final_session = await session_service.get_session(
app_name="research_graph", user_id="user1", session_id=session.id
)
if final_session:
final_data = final_session.state.get("graph_data", {})
final_state = GraphState(data=final_data)
else:
print("Warning: Could not retrieve final session state.")
final_state = GraphState()

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed: Explicit None check with warning print instead of silent stale fallback.

Comment on lines 201 to 205
fresh_session = await session_service.get_session(
app_name="react_agent", user_id="user1", session_id=session.id
)
final_data = (fresh_session or session).state.get("graph_data", {})
final_state = GraphState(data=final_data)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

This pattern of re-fetching the session and falling back to the original session object is problematic. The original session is a deepcopy from create_session and holds stale state. If get_session returns None, this code will silently use the old state and print incorrect final results, which is misleading. It's safer to handle the None case explicitly.

Suggested change
fresh_session = await session_service.get_session(
app_name="react_agent", user_id="user1", session_id=session.id
)
final_data = (fresh_session or session).state.get("graph_data", {})
final_state = GraphState(data=final_data)
final_session = await session_service.get_session(
app_name="react_agent", user_id="user1", session_id=session.id
)
if final_session:
final_data = final_session.state.get("graph_data", {})
final_state = GraphState(data=final_data)
else:
print("Warning: Could not retrieve final session state.")
final_state = GraphState()

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed: Explicit None check with warning print instead of silent stale fallback.

@drahnreb drahnreb force-pushed the feat/graph-agent-pr4 branch 3 times, most recently from c6f4902 to 8bbeedd Compare February 22, 2026 23:43
@drahnreb
Copy link
Author

Addressing review feedback

Force-pushed with the following fixes:

High:

  • Stale session fallback: All 6 sample files now use explicit None check with warning print instead of silent (fresh_session or session) fallback.

Medium:

  • Markdown stripping: Replaced regex-based stripping with output_schema=InterruptDecision (Pydantic model guarantees structured JSON output, no markdown wrapping).
  • go_back state clearing: Documented that it clears state keys matching exact node names (default output_mapper convention).
  • AST len() whitelist: Already done — len is in _SAFE_BUILTINS.

@drahnreb
Copy link
Author

@gemini-code-assist please re-review

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a major new feature: the GraphAgent and its associated components for building complex, stateful workflows with conditional routing, loops, and human-in-the-loop capabilities. This is a significant and well-executed addition to the ADK. The code is well-structured, with clear separation of concerns into modules for graph logic, interrupts, telemetry, and patterns. The inclusion of numerous samples and comprehensive tests is excellent.

My main feedback is a high-severity issue regarding the state merging logic in parallel execution, which could lead to unexpected behavior. Addressing this would make the parallel execution feature more robust and predictable.

Comment on lines 277 to 307
# Merge branch states back into main state with conflict detection
conflicts_detected = []
keys_merged = set()

for node_name in group.nodes:
if node_name not in results:
continue
result = results[node_name]
branch_state = result["state"]

# Merge data keys with conflict detection
for key, value in branch_state.data.items():
if (
key in state.data
and key in keys_merged
and state.data[key] != value
):
# Conflict: Multiple branches set the same key to different values
conflicts_detected.append({
"key": key,
"node": node_name,
"existing_value": state.data[key],
"new_value": value,
})
logger.warning(
f"State merge conflict detected: key '{key}' modified by multiple"
f" parallel branches. Last write wins (node: {node_name})."
)

state.data[key] = value
keys_merged.add(key)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The current state merging logic uses a "last write wins" strategy based on the iteration order of nodes in the ParallelNodeGroup. This can lead to subtle bugs and non-deterministic behavior if two parallel branches modify the same state key. The result will depend on which branch is processed last in the loop, not which one finished execution last.

For example, if branch A modifies state.data['x'] and branch B does not, but B is processed second, B's unchanged value for x (from the initial state copy) will overwrite A's modification.

Consider a more explicit merge strategy that only merges keys that were actually changed by each branch. For conflicting writes to the same key, you could either raise an error or allow the user to provide a custom merge function to resolve the conflict. This would make the state updates more predictable and robust.

    # Merge branch states back into main state with conflict detection
    conflicts_detected = []
    keys_merged: set[str] = set()
    original_data = deepcopy(state.data)

    for node_name in group.nodes:
      if node_name not in results:
        continue
      result = results[node_name]
      branch_data = result["state"].data

      # Diff branch state against original to find changed keys
      for key, value in branch_data.items():
        original_value = original_data.get(key, object())  # Sentinel for new keys
        if value == original_value:
          continue  # Skip unchanged keys

        # This key was added or changed by the branch
        if key in keys_merged and state.data.get(key) != value:
          conflicts_detected.append({
              "key": key,
              "node": node_name,
              "existing_value": state.data.get(key),
              "new_value": value,
          })
          logger.warning(
              "State merge conflict: key '%s' written by multiple parallel"
              " branches. Last write from node '%s' wins.",
              key,
              node_name,
          )

        state.data[key] = value
        keys_merged.add(key)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed — implemented diff-based state merge. Captures original snapshot before merge loop and only merges keys that actually changed from the pre-branch state.

Add GraphAgent for building directed-graph workflows with conditional
routing, cyclic execution, state management with reducers, typed events,
streaming, callbacks, rewind, resumability, telemetry with OpenTelemetry
tracing, evaluation metrics, and CLI graph visualization for GraphAgent
topologies. Includes samples and design documentation.
Add DynamicNode (runtime agent selection), NestedGraphNode (hierarchical
workflow composition), and DynamicParallelGroup (variable-count concurrent
execution). Extends CLI visualization with pattern-aware rendering
(diamond, parallelogram, sub-cluster shapes). Includes pattern samples,
node type reference, and design documentation.
@drahnreb drahnreb force-pushed the feat/graph-agent-pr4 branch from 8bbeedd to 218d2ec Compare February 23, 2026 13:38
@drahnreb
Copy link
Author

@gemini-code-assist please re-review

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This is an impressive and substantial pull request that introduces the GraphAgent for complex workflow orchestration. The feature set is comprehensive, including conditional routing, cyclic execution, parallel processing, and a robust human-in-the-loop InterruptService. The code is well-structured, with good separation of concerns using mixins for telemetry and interrupt handling. The inclusion of numerous focused examples and advanced patterns is excellent and will greatly benefit users. Overall, the implementation is of high quality. My review includes a few suggestions to improve performance and robustness in the sample code and the interrupt service.

# asyncio.Queue stores items in _queue (a collections.deque).
# This avoids drain/requeue and eliminates QueueFull risk.
queue = self._message_queues[session_id]
messages = list(queue._queue) # type: ignore[attr-defined]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Accessing the private member _queue of asyncio.Queue is risky because it relies on an internal implementation detail of the standard library that is not guaranteed to be stable across Python versions. If asyncio.Queue's internal structure changes in a future release, this code will break.

While the current approach avoids a potential QueueFull error that could occur with a drain-and-requeue strategy, a more robust long-term solution should be considered. This might involve implementing a custom, non-blocking peek method or using a different queue implementation that officially supports this operation.

return {"all_complete": True, "tasks_remaining": 0}

# Pop next task
next_task = task_queue.pop(0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using list.pop(0) for a queue is inefficient as it has an O(N) time complexity, which can be slow for large queues. For a more performant and idiomatic queue implementation, consider using collections.deque, which provides an O(1) popleft() operation.

Comment on lines +283 to +298
final_counter = session.state.get("counter")
if final_counter is None:
graph_data_raw = session.state.get("graph_data")
if graph_data_raw:
try:
data = (
json.loads(graph_data_raw)
if isinstance(graph_data_raw, str)
else graph_data_raw
)
final_counter = data.get("counter", 0)
except (json.JSONDecodeError, TypeError):
final_counter = 0

if final_counter is None:
final_counter = 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The logic to retrieve the final_counter is more complex than necessary. The GraphAgent's final event consistently places the final state in session.state['graph_data']. You can simplify this by directly accessing the counter from the final graph data with a simple fallback, which will improve the readability and maintainability of this example.

  session = await session_service.get_session(
      app_name="cyclic_demo", user_id="user1", session_id="session1"
  )
  graph_data = session.state.get("graph_data", {})
  final_counter = graph_data.get("counter", 0)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant