-
Notifications
You must be signed in to change notification settings - Fork 3k
feat: Add graph pattern nodes for dynamic dispatch and composition #4583
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
drahnreb
wants to merge
2
commits into
google:main
Choose a base branch
from
drahnreb:feat/graph-agent-pr2
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,36 @@ | ||
| # GraphAgent Basic Example — Conditional Routing | ||
|
|
||
| This example demonstrates a data validation pipeline using **conditional routing** based on runtime | ||
| state. The validator checks input quality and branches to either a processor (success path) or an | ||
| error handler (failure path), showing how GraphAgent enables state-dependent decision logic that | ||
| sequential or parallel agent composition alone cannot achieve. | ||
|
|
||
| ## When to Use This Pattern | ||
|
|
||
| - Any workflow requiring "if X then A, else B" branching on agent output | ||
| - Input validation before expensive downstream processing | ||
| - Quality-gate patterns where the next step depends on a score or classification | ||
|
|
||
| ## How to Run | ||
|
|
||
| ```bash | ||
| adk run contributing/samples/graph_agent_basic | ||
| ``` | ||
|
|
||
| ## Graph Structure | ||
|
|
||
| ``` | ||
| validate ──(valid=True)──▶ process | ||
| ──(valid=False)─▶ error | ||
| ``` | ||
|
|
||
| ## Key Code Walkthrough | ||
|
|
||
| - **`GraphNode(name="validate", agent=validator_agent)`** — wraps an `LlmAgent` as a graph node | ||
| - **`add_edge("validate", "process", condition=lambda s: s.data["valid"] == True)`** — conditional | ||
| edge that only fires when the validation flag is set | ||
| - **Two end nodes** (`process` and `error`) — GraphAgent can have multiple terminal nodes | ||
| - **State propagation** — each node's output is written to `state.data[node_name]` and read by | ||
| downstream condition functions | ||
| - **No cycles** — this is a simple directed acyclic graph; for loops see `graph_agent_dynamic_queue` | ||
|
|
Empty file.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,149 @@ | ||
| """Basic GraphAgent example demonstrating conditional routing. | ||
|
|
||
| This example shows how GraphAgent enables conditional workflow routing | ||
| based on runtime state, which cannot be achieved with SequentialAgent | ||
| or ParallelAgent composition. | ||
|
|
||
| Use case: Data validation pipeline with retry logic. | ||
| - If validation passes -> process data | ||
| - If validation fails -> retry validation | ||
| - After max retries -> route to error handler | ||
| """ | ||
|
|
||
| import asyncio | ||
| import os | ||
|
|
||
| from google.adk.agents import GraphAgent | ||
| from google.adk.agents import LlmAgent | ||
| from google.adk.agents.graph import GraphState | ||
| from google.adk.runners import Runner | ||
| from google.adk.sessions import InMemorySessionService | ||
| from google.genai import types | ||
| from pydantic import BaseModel | ||
|
|
||
| _MODEL = os.getenv("LLM_MODEL_NAME", "gemini-2.5-flash") | ||
|
|
||
|
|
||
| # --- Validation Result Schema --- | ||
| class ValidationResult(BaseModel): | ||
| """Validation result structure.""" | ||
|
|
||
| valid: bool | ||
| error: str | None = None | ||
|
|
||
|
|
||
| # --- Validator Agent --- | ||
| validator = LlmAgent( | ||
| name="validator", | ||
| model=_MODEL, | ||
| instruction=""" | ||
| You validate input data quality. | ||
| Check if the input contains valid JSON. | ||
| Return {"valid": true} if valid, {"valid": false, "error": "reason"} if invalid. | ||
| """, | ||
| output_schema=ValidationResult, # Ensures structured JSON output | ||
| # output_key auto-defaults to "validator" (agent name) | ||
| ) | ||
|
|
||
| # --- Processor Agent --- | ||
| processor = LlmAgent( | ||
| name="processor", | ||
| model=_MODEL, | ||
| instruction=""" | ||
| You process validated data. | ||
| Transform the input JSON and return processed results. | ||
| """, | ||
| ) | ||
|
|
||
| # --- Error Handler Agent --- | ||
| error_handler = LlmAgent( | ||
| name="error_handler", | ||
| model=_MODEL, | ||
| instruction=""" | ||
| You handle validation errors. | ||
| Provide helpful error messages and suggestions for fixing invalid data. | ||
| """, | ||
| ) | ||
|
|
||
|
|
||
| # --- Edge Condition Functions --- | ||
| def is_valid_json(state: GraphState) -> bool: | ||
| """Check if JSON is valid from structured output.""" | ||
| result = state.get_parsed("validator", ValidationResult) | ||
| return result.valid if result else False | ||
|
|
||
|
|
||
| # --- Create GraphAgent with Conditional Routing --- | ||
| def build_validation_graph() -> GraphAgent: | ||
| """Build the validation pipeline graph.""" | ||
| g = GraphAgent(name="validation_pipeline") | ||
|
|
||
| # Add nodes | ||
| g.add_node("validate", agent=validator) | ||
| g.add_node("process", agent=processor) | ||
| g.add_node("error", agent=error_handler) | ||
|
|
||
| # Add conditional edges | ||
| # If validation passes (state.data["validator"]["valid"] == True) -> process | ||
| g.add_edge( | ||
| "validate", | ||
| "process", | ||
| condition=is_valid_json, | ||
| ) | ||
|
|
||
| # If validation fails (state.data["validator"]["valid"] == False) -> error handler | ||
| g.add_edge( | ||
| "validate", | ||
| "error", | ||
| condition=lambda state: not is_valid_json(state), | ||
| ) | ||
|
|
||
| # Define workflow | ||
| g.set_start("validate") | ||
| g.set_end("process") # Success path ends at process | ||
| g.set_end("error") # Error path ends at error handler | ||
|
|
||
| return g | ||
|
|
||
|
|
||
| # --- Run the workflow --- | ||
|
|
||
|
|
||
| async def main(): | ||
| graph = build_validation_graph() | ||
| runner = Runner( | ||
| app_name="validation_pipeline", | ||
| agent=graph, | ||
| session_service=InMemorySessionService(), | ||
| auto_create_session=True, | ||
| ) | ||
|
|
||
| # Example: Valid input | ||
| print("=== Testing with valid JSON ===") | ||
| async for event in runner.run_async( | ||
| user_id="user_1", | ||
| session_id="session_1", | ||
| new_message=types.Content( | ||
| role="user", | ||
| parts=[types.Part(text='{"name": "John", "age": 30}')], | ||
| ), | ||
| ): | ||
| if event.content and event.content.parts: | ||
| print(f"{event.author}: {event.content.parts[0].text}") | ||
|
|
||
| # Example: Invalid input | ||
| print("\n=== Testing with invalid JSON ===") | ||
| async for event in runner.run_async( | ||
| user_id="user_1", | ||
| session_id="session_2", | ||
| new_message=types.Content( | ||
| role="user", | ||
| parts=[types.Part(text='{"name": "Invalid data')], | ||
| ), | ||
| ): | ||
| if event.content and event.content.parts: | ||
| print(f"{event.author}: {event.content.parts[0].text}") | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| asyncio.run(main()) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,42 @@ | ||
| # yaml-language-server: $schema=https://raw.githubusercontent.com/google/adk-python/refs/heads/main/src/google/adk/agents/config_schemas/AgentConfig.json | ||
|
|
||
| agent_class: GraphAgent | ||
| name: validation_pipeline | ||
| description: Data validation pipeline with conditional routing | ||
|
|
||
| # Define start and end nodes | ||
| start_node: validate | ||
| end_nodes: | ||
| - process | ||
| - error | ||
|
|
||
| # Maximum iterations for cyclic graphs (default: 20) | ||
| max_iterations: 10 | ||
|
|
||
| # Node definitions | ||
| nodes: | ||
| - name: validate | ||
| sub_agents: | ||
| - code: agents.validator | ||
|
|
||
| - name: process | ||
| sub_agents: | ||
| - code: agents.processor | ||
|
|
||
| - name: error | ||
| sub_agents: | ||
| - code: agents.error_handler | ||
|
|
||
| # Edge definitions with conditional routing | ||
| edges: | ||
| # If validation passes -> process | ||
| - source_node: validate | ||
| target_node: process | ||
| condition: "data.get('valid', False) is True" | ||
| priority: 1 | ||
|
|
||
| # If validation fails -> error handler | ||
| - source_node: validate | ||
| target_node: error | ||
| condition: "data.get('valid', False) is False" | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The condition condition: "data.get('validate').valid is False" |
||
| priority: 1 | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,53 @@ | ||
| # GraphAgent Dynamic Task Queue Example | ||
|
|
||
| This example demonstrates the **Dynamic Task Queue** pattern for GraphAgent, enabling AI Co-Scientist and similar workflows where tasks are generated and processed dynamically at runtime. | ||
|
|
||
| ## Pattern Overview | ||
|
|
||
| The dynamic task queue pattern uses a function node with runtime agent dispatch: | ||
| - **Task Queue**: Maintained in GraphState, grows/shrinks dynamically | ||
| - **Agent Dispatch**: Different agents selected based on task type | ||
| - **Dynamic Task Generation**: Agents generate new tasks from their outputs | ||
| - **State-Based Loop**: Continues until queue is empty | ||
|
|
||
| ## What This Example Shows | ||
|
|
||
| 1. **Mock Agents**: Three agents (generation, review, experiment) for demonstration | ||
| 2. **Task Parsing**: Extract TODO items from agent outputs to create new tasks | ||
| 3. **Dynamic Dispatch**: Select agent based on task type at runtime | ||
| 4. **Queue Management**: Process tasks until queue is empty | ||
|
|
||
| ## Architecture Support | ||
|
|
||
| This pattern enables **95%+ architecture support** for: | ||
| - AI Co-Scientist (dynamic hypothesis generation and testing) | ||
| - Research paper writing (dynamic outline → research → writing loops) | ||
| - Multi-agent task orchestration | ||
|
|
||
| ## Running the Example | ||
|
|
||
| ```bash | ||
| cd /path/to/adk-python | ||
| source venv/bin/activate | ||
| python contributing/samples/graph_agent_dynamic_queue/agent.py | ||
| ``` | ||
|
|
||
| The example will: | ||
| 1. Start with 2 initial tasks (generate hypothesis 1 and 2) | ||
| 2. Process each task with appropriate agent | ||
| 3. Parse agent outputs for new tasks (TODO: review X, TODO: experiment Y) | ||
| 4. Add new tasks to queue dynamically | ||
| 5. Continue until queue is empty | ||
|
|
||
| ## Adapting This Pattern | ||
|
|
||
| Replace the mock agents with real agents: | ||
| ```python | ||
| from your_agents import GenerationAgent, ReviewAgent, ExperimentAgent | ||
|
|
||
| generation_agent = GenerationAgent(name="generation") | ||
| review_agent = ReviewAgent(name="review") | ||
| experiment_agent = ExperimentAgent(name="experiment") | ||
| ``` | ||
|
|
||
| Customize task parsing logic in `parse_new_tasks_from_result()` to match your agent outputs. |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The condition
data.get('valid', False) is Trueappears to be incorrect. The output of thevalidatenode is aValidationResultobject, which is stored under the keyvalidatein the graph's state data. The condition should access thevalidattribute from this nested object.