Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions contributing/samples/graph_agent_basic/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# GraphAgent Basic Example — Conditional Routing

This example demonstrates a data validation pipeline using **conditional routing** based on runtime
state. The validator checks input quality and branches to either a processor (success path) or an
error handler (failure path), showing how GraphAgent enables state-dependent decision logic that
sequential or parallel agent composition alone cannot achieve.

## When to Use This Pattern

- Any workflow requiring "if X then A, else B" branching on agent output
- Input validation before expensive downstream processing
- Quality-gate patterns where the next step depends on a score or classification

## How to Run

```bash
adk run contributing/samples/graph_agent_basic
```

## Graph Structure

```
validate ──(valid=True)──▶ process
──(valid=False)─▶ error
```

## Key Code Walkthrough

- **`GraphNode(name="validate", agent=validator_agent)`** — wraps an `LlmAgent` as a graph node
- **`add_edge("validate", "process", condition=lambda s: s.data["valid"] == True)`** — conditional
edge that only fires when the validation flag is set
- **Two end nodes** (`process` and `error`) — GraphAgent can have multiple terminal nodes
- **State propagation** — each node's output is written to `state.data[node_name]` and read by
downstream condition functions
- **No cycles** — this is a simple directed acyclic graph; for loops see `graph_agent_dynamic_queue`

Empty file.
149 changes: 149 additions & 0 deletions contributing/samples/graph_agent_basic/agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
"""Basic GraphAgent example demonstrating conditional routing.

This example shows how GraphAgent enables conditional workflow routing
based on runtime state, which cannot be achieved with SequentialAgent
or ParallelAgent composition.

Use case: Data validation pipeline with retry logic.
- If validation passes -> process data
- If validation fails -> retry validation
- After max retries -> route to error handler
"""

import asyncio
import os

from google.adk.agents import GraphAgent
from google.adk.agents import LlmAgent
from google.adk.agents.graph import GraphState
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types
from pydantic import BaseModel

_MODEL = os.getenv("LLM_MODEL_NAME", "gemini-2.5-flash")


# --- Validation Result Schema ---
class ValidationResult(BaseModel):
"""Validation result structure."""

valid: bool
error: str | None = None


# --- Validator Agent ---
validator = LlmAgent(
name="validator",
model=_MODEL,
instruction="""
You validate input data quality.
Check if the input contains valid JSON.
Return {"valid": true} if valid, {"valid": false, "error": "reason"} if invalid.
""",
output_schema=ValidationResult, # Ensures structured JSON output
# output_key auto-defaults to "validator" (agent name)
)

# --- Processor Agent ---
processor = LlmAgent(
name="processor",
model=_MODEL,
instruction="""
You process validated data.
Transform the input JSON and return processed results.
""",
)

# --- Error Handler Agent ---
error_handler = LlmAgent(
name="error_handler",
model=_MODEL,
instruction="""
You handle validation errors.
Provide helpful error messages and suggestions for fixing invalid data.
""",
)


# --- Edge Condition Functions ---
def is_valid_json(state: GraphState) -> bool:
"""Check if JSON is valid from structured output."""
result = state.get_parsed("validator", ValidationResult)
return result.valid if result else False


# --- Create GraphAgent with Conditional Routing ---
def build_validation_graph() -> GraphAgent:
"""Build the validation pipeline graph."""
g = GraphAgent(name="validation_pipeline")

# Add nodes
g.add_node("validate", agent=validator)
g.add_node("process", agent=processor)
g.add_node("error", agent=error_handler)

# Add conditional edges
# If validation passes (state.data["validator"]["valid"] == True) -> process
g.add_edge(
"validate",
"process",
condition=is_valid_json,
)

# If validation fails (state.data["validator"]["valid"] == False) -> error handler
g.add_edge(
"validate",
"error",
condition=lambda state: not is_valid_json(state),
)

# Define workflow
g.set_start("validate")
g.set_end("process") # Success path ends at process
g.set_end("error") # Error path ends at error handler

return g


# --- Run the workflow ---


async def main():
graph = build_validation_graph()
runner = Runner(
app_name="validation_pipeline",
agent=graph,
session_service=InMemorySessionService(),
auto_create_session=True,
)

# Example: Valid input
print("=== Testing with valid JSON ===")
async for event in runner.run_async(
user_id="user_1",
session_id="session_1",
new_message=types.Content(
role="user",
parts=[types.Part(text='{"name": "John", "age": 30}')],
),
):
if event.content and event.content.parts:
print(f"{event.author}: {event.content.parts[0].text}")

# Example: Invalid input
print("\n=== Testing with invalid JSON ===")
async for event in runner.run_async(
user_id="user_1",
session_id="session_2",
new_message=types.Content(
role="user",
parts=[types.Part(text='{"name": "Invalid data')],
),
):
if event.content and event.content.parts:
print(f"{event.author}: {event.content.parts[0].text}")


if __name__ == "__main__":
asyncio.run(main())
42 changes: 42 additions & 0 deletions contributing/samples/graph_agent_basic/root_agent.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# yaml-language-server: $schema=https://raw.githubusercontent.com/google/adk-python/refs/heads/main/src/google/adk/agents/config_schemas/AgentConfig.json

agent_class: GraphAgent
name: validation_pipeline
description: Data validation pipeline with conditional routing

# Define start and end nodes
start_node: validate
end_nodes:
- process
- error

# Maximum iterations for cyclic graphs (default: 20)
max_iterations: 10

# Node definitions
nodes:
- name: validate
sub_agents:
- code: agents.validator

- name: process
sub_agents:
- code: agents.processor

- name: error
sub_agents:
- code: agents.error_handler

# Edge definitions with conditional routing
edges:
# If validation passes -> process
- source_node: validate
target_node: process
condition: "data.get('valid', False) is True"
priority: 1

# If validation fails -> error handler
- source_node: validate
target_node: error
condition: "data.get('valid', False) is False"
priority: 1
53 changes: 53 additions & 0 deletions contributing/samples/graph_agent_dynamic_queue/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# GraphAgent Dynamic Task Queue Example

This example demonstrates the **Dynamic Task Queue** pattern for GraphAgent, enabling AI Co-Scientist and similar workflows where tasks are generated and processed dynamically at runtime.

## Pattern Overview

The dynamic task queue pattern uses a function node with runtime agent dispatch:
- **Task Queue**: Maintained in GraphState, grows/shrinks dynamically
- **Agent Dispatch**: Different agents selected based on task type
- **Dynamic Task Generation**: Agents generate new tasks from their outputs
- **State-Based Loop**: Continues until queue is empty

## What This Example Shows

1. **Mock Agents**: Three agents (generation, review, experiment) for demonstration
2. **Task Parsing**: Extract TODO items from agent outputs to create new tasks
3. **Dynamic Dispatch**: Select agent based on task type at runtime
4. **Queue Management**: Process tasks until queue is empty

## Architecture Support

This pattern enables **95%+ architecture support** for:
- AI Co-Scientist (dynamic hypothesis generation and testing)
- Research paper writing (dynamic outline → research → writing loops)
- Multi-agent task orchestration

## Running the Example

```bash
cd /path/to/adk-python
source venv/bin/activate
python contributing/samples/graph_agent_dynamic_queue/agent.py
```

The example will:
1. Start with 2 initial tasks (generate hypothesis 1 and 2)
2. Process each task with appropriate agent
3. Parse agent outputs for new tasks (TODO: review X, TODO: experiment Y)
4. Add new tasks to queue dynamically
5. Continue until queue is empty

## Adapting This Pattern

Replace the mock agents with real agents:
```python
from your_agents import GenerationAgent, ReviewAgent, ExperimentAgent

generation_agent = GenerationAgent(name="generation")
review_agent = ReviewAgent(name="review")
experiment_agent = ExperimentAgent(name="experiment")
```

Customize task parsing logic in `parse_new_tasks_from_result()` to match your agent outputs.
Loading