feat: Add graph pattern nodes for dynamic dispatch and composition#4583
feat: Add graph pattern nodes for dynamic dispatch and composition#4583drahnreb wants to merge 2 commits intogoogle:mainfrom
Conversation
|
Thanks for your pull request! It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA). View this failed invocation of the CLA check for more information. For the most up to date status, view the checks section at the bottom of the pull request. |
Summary of ChangesHello @drahnreb, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly enhances the GraphAgent's capabilities by introducing first-class support for dynamic dispatch, hierarchical composition, and variable-count parallel execution. These additions aim to streamline the development of complex agentic workflows by abstracting away common boilerplate, making it easier to build adaptive and modular agent systems. The changes also include improved observability and comprehensive examples to guide developers. Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Changelog
Activity
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request introduces advanced graph pattern nodes (DynamicNode, NestedGraphNode, DynamicParallelGroup) to the GraphAgent framework, enabling more complex and dynamic agentic workflows. The implementation is well-structured and includes comprehensive documentation and CLI support. My feedback focuses on improving the robustness of output extraction from asynchronous agent runs in the new node types, as the current logic of overwriting the output variable in each iteration may lead to data loss if agents emit multiple events or multi-part content.
| async for event in selected.run_async(node_ctx): | ||
| if event.content and event.content.parts: | ||
| output = event.content.parts[0].text or "" | ||
| return output |
There was a problem hiding this comment.
The current logic for extracting output from run_async is fragile. It overwrites the output variable in every iteration where event.content is present. If an agent yields multiple events with content (e.g., intermediate thoughts followed by a final answer), only the last one's first part is captured. Additionally, if the last event has content but an empty first part, any previously captured output is lost. Consider accumulating text or only updating output if the new text is non-empty.
| async for event in selected.run_async(node_ctx): | |
| if event.content and event.content.parts: | |
| output = event.content.parts[0].text or "" | |
| return output | |
| output = "" | |
| async for event in selected.run_async(node_ctx): | |
| if event.content and event.content.parts: | |
| # Accumulate text from all parts to avoid data loss | |
| event_text = "".join(p.text for p in event.content.parts if p.text) | |
| if event_text: | |
| output = event_text | |
| return output |
There was a problem hiding this comment.
Fixed: DynamicNode now uses output += with join for multi-part.
| async for event in self.graph_agent.run_async(nested_ctx): | ||
| if event.content and event.content.parts: | ||
| text = event.content.parts[0].text or "" | ||
| if text and not text.startswith("[GraphMetadata]"): | ||
| final_output = text |
There was a problem hiding this comment.
Similar to DynamicNode, the extraction logic here overwrites final_output in each iteration. If a nested graph produces multiple text events, only the last one is returned to the parent graph. It is safer to accumulate the output or ensure that non-empty results are not overwritten by subsequent empty content events.
| async for event in self.graph_agent.run_async(nested_ctx): | |
| if event.content and event.content.parts: | |
| text = event.content.parts[0].text or "" | |
| if text and not text.startswith("[GraphMetadata]"): | |
| final_output = text | |
| final_output = "" | |
| async for event in self.graph_agent.run_async(nested_ctx): | |
| if event.content and event.content.parts: | |
| text = "".join(p.text for p in event.content.parts if p.text) | |
| if text and not text.startswith("[GraphMetadata]"): | |
| final_output = text |
There was a problem hiding this comment.
Fixed: NestedGraphNode now uses final_output += with join for multi-part.
| async for event in agent.run_async(node_ctx): | ||
| if event.content and event.content.parts: | ||
| output = event.content.parts[0].text or "" | ||
| return output |
There was a problem hiding this comment.
The run_agent helper also overwrites the output variable in each iteration of the event stream. This can lead to incomplete results if the agent emits content across multiple events or uses multiple parts within an event.
| async for event in agent.run_async(node_ctx): | |
| if event.content and event.content.parts: | |
| output = event.content.parts[0].text or "" | |
| return output | |
| output = "" | |
| async for event in agent.run_async(node_ctx): | |
| if event.content and event.content.parts: | |
| event_text = "".join(p.text for p in event.content.parts if p.text) | |
| if event_text: | |
| output = event_text | |
| return output |
There was a problem hiding this comment.
Fixed: DynamicParallelGroup.run_agent now uses output += with join for multi-part.
ee39674 to
7302880
Compare
Addressing review feedbackForce-pushed with the following fixes:
|
f76b110 to
8224d46
Compare
There was a problem hiding this comment.
Code Review
This pull request introduces powerful new graph patterns (DynamicNode, NestedGraphNode, DynamicParallelGroup) for dynamic and hierarchical agent workflows. The implementation is well-structured, with new features integrated into the core GraphAgent, visualization, and evaluation frameworks. The accompanying examples are comprehensive and very helpful. I've found one high-severity bug in the NestedGraphNode implementation where streaming output is not correctly accumulated, and several medium-severity issues in the example files that have a similar bug. Overall, this is a great feature addition.
| if event.content and event.content.parts: | ||
| text = event.content.parts[0].text or "" | ||
| if text and not text.startswith("[GraphMetadata]"): | ||
| final_output = text |
There was a problem hiding this comment.
There was a problem hiding this comment.
Fixed: NestedGraphNode accumulates with += and joins multi-part events.
| result = "" | ||
| async for event in agent.run_async(agent_ctx): | ||
| if event.content and event.content.parts: | ||
| result = event.content.parts[0].text or "" |
There was a problem hiding this comment.
The result variable is overwritten in each loop iteration. If the agent streams its response across multiple events, only the last part will be captured. To handle streaming responses correctly, you should accumulate the text from all events.
| result = event.content.parts[0].text or "" | |
| result += event.content.parts[0].text or "" | |
There was a problem hiding this comment.
Fixed: Changed to result += for proper accumulation.
| if event.content and event.content.parts: | ||
| text = event.content.parts[0].text or "" | ||
| if text and not text.startswith("[GraphMetadata]"): | ||
| final = text |
There was a problem hiding this comment.
Fixed: Changed to final += for proper accumulation.
| # Show node execution trace (from create_nested_observability_callback) | ||
| print(f" → {text}") | ||
| elif not text.startswith("[GraphMetadata]"): | ||
| final = text |
There was a problem hiding this comment.
Fixed: Changed to final += for proper accumulation.
| # Show node execution trace (from create_nested_observability_callback) | ||
| print(f" → {text}") | ||
| elif not text.startswith("[GraphMetadata]"): | ||
| final = text |
There was a problem hiding this comment.
Fixed: Changed to final += for proper accumulation.
518a5fa to
443997b
Compare
|
@gemini-code-assist please re-review |
There was a problem hiding this comment.
Code Review
This pull request introduces a powerful set of features for building complex agentic workflows using graph patterns. The addition of DynamicNode, NestedGraphNode, and DynamicParallelGroup provides first-class support for common patterns like dynamic dispatch, hierarchical composition, and parallel fan-out, significantly reducing boilerplate. The implementation is comprehensive, including core logic, extensive tests, clear documentation, and practical samples. The code is well-structured and demonstrates a deep understanding of agentic design principles. My review focuses on minor improvements in the sample code to enhance robustness and clarity for developers using these new patterns.
| fresh_session = await session_service.get_session( | ||
| app_name="dynamic_queue_demo", user_id="demo_user", session_id=session.id | ||
| ) | ||
| final_session = fresh_session or session |
There was a problem hiding this comment.
The fallback or session can be misleading. If session_service.get_session returns None (e.g., the session was deleted), final_session will become the original, stale session object from create_session. This object does not contain the final state of the graph execution, so the statistics printed will be incorrect (likely showing zero completed tasks). It would be safer to handle the None case explicitly, for instance by raising an error or logging a warning that the final state could not be retrieved.
| final_session = fresh_session or session | |
| if not fresh_session: | |
| print("\n⚠️ Could not retrieve final session state to print statistics.") | |
| return | |
| final_session = fresh_session |
There was a problem hiding this comment.
Fixed — changed from warn+fallback to early return: if fresh_session is None: print(...); return.
| try: | ||
| obs = ObservationResult.model_validate_json(text.strip()) | ||
| status = obs.status.upper() | ||
| except Exception: | ||
| status = "UNKNOWN (parse error)" |
There was a problem hiding this comment.
Catching a broad Exception can hide unexpected errors. It's better to catch a more specific exception, like pydantic.ValidationError, which is what model_validate_json raises on failure. This makes the error handling more robust and predictable. You'll need to add ValidationError to your pydantic import.
| try: | |
| obs = ObservationResult.model_validate_json(text.strip()) | |
| status = obs.status.upper() | |
| except Exception: | |
| status = "UNKNOWN (parse error)" | |
| try: | |
| obs = ObservationResult.model_validate_json(text.strip()) | |
| status = obs.status.upper() | |
| except ValidationError: | |
| status = "UNKNOWN (parse error)" |
There was a problem hiding this comment.
Fixed — narrowed to except ValidationError: with explicit import from pydantic.
Add GraphAgent for building directed-graph workflows with conditional routing, cyclic execution, state management with reducers, typed events, streaming, callbacks, rewind, resumability, telemetry with OpenTelemetry tracing, evaluation metrics, and CLI graph visualization for GraphAgent topologies. Includes samples and design documentation.
Add DynamicNode (runtime agent selection), NestedGraphNode (hierarchical workflow composition), and DynamicParallelGroup (variable-count concurrent execution). Extends CLI visualization with pattern-aware rendering (diamond, parallelogram, sub-cluster shapes). Includes pattern samples, node type reference, and design documentation.
443997b to
a461d14
Compare
|
@gemini-code-assist please re-review |
There was a problem hiding this comment.
Code Review
This pull request introduces powerful new capabilities for graph-based agent orchestration with DynamicNode, NestedGraphNode, and DynamicParallelGroup. The implementation is robust, well-structured, and includes comprehensive examples, tests, and documentation. The attention to detail, such as the secure evaluation of string-based conditions and the thoughtful API design, is commendable. I've found one minor issue in an example YAML configuration, but overall this is an excellent contribution that significantly enhances the framework's workflow capabilities.
| # If validation passes -> process | ||
| - source_node: validate | ||
| target_node: process | ||
| condition: "data.get('valid', False) is True" |
There was a problem hiding this comment.
The condition data.get('valid', False) is True appears to be incorrect. The output of the validate node is a ValidationResult object, which is stored under the key validate in the graph's state data. The condition should access the valid attribute from this nested object.
condition: "data.get('validate').valid is True"| # If validation fails -> error handler | ||
| - source_node: validate | ||
| target_node: error | ||
| condition: "data.get('valid', False) is False" |
There was a problem hiding this comment.
The condition data.get('valid', False) is False appears to be incorrect. The output of the validate node is a ValidationResult object, which is stored under the key validate in the graph's state data. The condition should access the valid attribute from this nested object.
condition: "data.get('validate').valid is False"
Please ensure you have read the contribution guide before creating a pull request.
Link to Issue or Description of Change
1. Link to an existing issue (if applicable):
2. Or, if no issue exists, describe the change:
Problem:
Common agentic patterns (dispatch, nesting, fan-out) require writing custom function nodes with boilerplate for context creation, event collection, and observability.
Solution:
Add
DynamicNode(runtime agent selection),NestedGraphNode(hierarchical workflow composition), andDynamicParallelGroup(variable-count concurrent execution). Extends CLI visualization with pattern-aware rendering (diamond, parallelogram, sub-cluster shapes). Includes pattern samples, node type reference, and design documentation.What's included:
src/google/adk/agents/graph/patterns.pygraph_agent.py_get_node_agentwith pattern isinstance checksgraph/__init__.pywith pattern exportstest_graph_patterns.py(~25 tests), 3 pattern tests intest_agent_graph.pypattern_dynamic_node,pattern_nested_graph,pattern_parallel_group)graph_node_types.md,advanced_graph_patterns.md,pattern_apis.md,dynamic-topology-modification.md)Part 2 of 5 — see tracking issue #4581. Stacked on #4582.
Testing Plan
Unit Tests:
Manual End-to-End (E2E) Tests:
3 pattern sample agents import and instantiate successfully.
Checklist
Additional context
Part 2 of 5. Depends on #4582 (Core GraphAgent).
patterns.pyimports onlyGraphNode,GraphState,BaseAgent,InvocationContext— no dependency on parallel/interrupt/checkpoint.