diff --git a/python/packages/core/agent_framework/_workflows/_workflow_builder.py b/python/packages/core/agent_framework/_workflows/_workflow_builder.py
index a1b90408be..70f8747ec9 100644
--- a/python/packages/core/agent_framework/_workflows/_workflow_builder.py
+++ b/python/packages/core/agent_framework/_workflows/_workflow_builder.py
@@ -39,7 +39,40 @@ class WorkflowBuilder:
     """A builder class for constructing workflows.
 
-    This class provides methods to add edges and set the starting executor for the workflow.
+    This class provides a fluent API for defining workflow graphs by connecting executors
+    with edges and configuring execution parameters. Call :meth:`build` to create an
+    immutable :class:`Workflow` instance.
+
+    Example:
+        .. code-block:: python
+
+            from typing_extensions import Never
+            from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler
+
+
+            class UpperCaseExecutor(Executor):
+                @handler
+                async def process(self, text: str, ctx: WorkflowContext[str]) -> None:
+                    await ctx.send_message(text.upper())
+
+
+            class ReverseExecutor(Executor):
+                @handler
+                async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None:
+                    await ctx.yield_output(text[::-1])
+
+
+            # Build a workflow
+            workflow = (
+                WorkflowBuilder()
+                .add_edge(UpperCaseExecutor(id="upper"), ReverseExecutor(id="reverse"))
+                .set_start_executor("upper")
+                .build()
+            )
+
+            # Run the workflow
+            events = await workflow.run("hello")
+            print(events.get_outputs())  # ['OLLEH']
     """
 
     def __init__(
@@ -51,7 +84,7 @@
         """Initialize the WorkflowBuilder with an empty list of edges and no starting executor.
 
         Args:
-            max_iterations: Maximum number of iterations for workflow convergence.
+            max_iterations: Maximum number of iterations for workflow convergence. Default is 100.
             name: Optional human-readable name for the workflow.
             description: Optional description of what the workflow does.
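+
+        Example:
+            .. code-block:: python
+
+                # A minimal sketch using the three optional parameters documented
+                # above; the values shown here are illustrative placeholders.
+                builder = WorkflowBuilder(
+                    max_iterations=50,
+                    name="text_pipeline",
+                    description="Uppercases and reverses input text",
+                )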
         """
@@ -164,10 +197,22 @@ def add_agent(
             id: A unique identifier for the executor. If None, the agent's name will be used if available.
 
         Returns:
-            The WorkflowBuilder instance (for method chaining).
+            Self: The WorkflowBuilder instance for method chaining.
 
         Raises:
             ValueError: If the provided id or agent name conflicts with an existing executor.
+
+        Example:
+            .. code-block:: python
+
+                from agent_framework import WorkflowBuilder
+                from agent_framework_anthropic import AnthropicAgent
+
+                # Create an agent
+                agent = AnthropicAgent(name="writer", model="claude-3-5-sonnet-20241022")
+
+                # Add the agent to a workflow
+                workflow = WorkflowBuilder().add_agent(agent, output_response=True).set_start_executor(agent).build()
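+
+                # The id argument overrides the default executor id (the agent's
+                # name, per the Args above); "writer_v2" is an illustrative value.
+                workflow = WorkflowBuilder().add_agent(agent, id="writer_v2").set_start_executor("writer_v2").build()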
         """
         executor = self._maybe_wrap_agent(
             agent, agent_thread=agent_thread, output_response=output_response, executor_id=id
         )
@@ -184,12 +229,53 @@ def add_edge(
         """Add a directed edge between two executors.
 
         The output types of the source and the input types of the target must be compatible.
+        Messages sent by the source executor will be routed to the target executor.
 
         Args:
             source: The source executor of the edge.
             target: The target executor of the edge.
             condition: An optional condition function that determines whether the edge should be
                 traversed based on the message type.
+
+        Returns:
+            Self: The WorkflowBuilder instance for method chaining.
+
+        Example:
+            .. code-block:: python
+
+                from typing_extensions import Never
+                from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler
+
+
+                class ProcessorA(Executor):
+                    @handler
+                    async def process(self, data: str, ctx: WorkflowContext[int]) -> None:
+                        await ctx.send_message(len(data))
+
+
+                class ProcessorB(Executor):
+                    @handler
+                    async def process(self, count: int, ctx: WorkflowContext[Never, str]) -> None:
+                        await ctx.yield_output(f"Processed {count} characters")
+
+
+                # Connect executors with an edge
+                workflow = (
+                    WorkflowBuilder().add_edge(ProcessorA(id="a"), ProcessorB(id="b")).set_start_executor("a").build()
+                )
+
+
+                # With a condition
+                def only_large_numbers(msg: int) -> bool:
+                    return msg > 100
+
+
+                workflow = (
+                    WorkflowBuilder()
+                    .add_edge(ProcessorA(id="a"), ProcessorB(id="b"), condition=only_large_numbers)
+                    .set_start_executor("a")
+                    .build()
+                )
         """
         # TODO(@taochen): Support executor factories for lazy initialization
         source_exec = self._maybe_wrap_agent(source)
@@ -204,13 +290,50 @@ def add_fan_out_edges(
         source: Executor | AgentProtocol,
         targets: Sequence[Executor | AgentProtocol],
     ) -> Self:
-        """Add multiple edges to the workflow where messages from the source will be sent to all target.
+        """Add multiple edges to the workflow where messages from the source will be sent to all targets.
 
         The output types of the source and the input types of the targets must be compatible.
+        Messages from the source will be broadcast to all target executors concurrently.
 
         Args:
             source: The source executor of the edges.
             targets: A list of target executors for the edges.
+
+        Returns:
+            Self: The WorkflowBuilder instance for method chaining.
+
+        Example:
+            .. code-block:: python
+
+                from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler
+
+
+                class DataSource(Executor):
+                    @handler
+                    async def generate(self, count: int, ctx: WorkflowContext[str]) -> None:
+                        for i in range(count):
+                            await ctx.send_message(f"data_{i}")
+
+
+                class ValidatorA(Executor):
+                    @handler
+                    async def validate(self, data: str, ctx: WorkflowContext) -> None:
+                        print(f"ValidatorA: {data}")
+
+
+                class ValidatorB(Executor):
+                    @handler
+                    async def validate(self, data: str, ctx: WorkflowContext) -> None:
+                        print(f"ValidatorB: {data}")
+
+
+                # Broadcast to multiple validators
+                workflow = (
+                    WorkflowBuilder()
+                    .add_fan_out_edges(DataSource(id="source"), [ValidatorA(id="val_a"), ValidatorB(id="val_b")])
+                    .set_start_executor("source")
+                    .build()
+                )
         """
         source_exec = self._maybe_wrap_agent(source)
         target_execs = [self._maybe_wrap_agent(t) for t in targets]
@@ -241,6 +364,53 @@
         Args:
             source: The source executor of the edges.
             cases: A list of case objects that determine the target executor for each message.
+
+        Returns:
+            Self: The WorkflowBuilder instance for method chaining.
+
+        Example:
+            .. code-block:: python
+
+                from dataclasses import dataclass
+
+                from agent_framework import Case, Default, Executor, WorkflowBuilder, WorkflowContext, handler
+
+
+                @dataclass
+                class Result:
+                    score: int
+
+
+                class Evaluator(Executor):
+                    @handler
+                    async def evaluate(self, text: str, ctx: WorkflowContext[Result]) -> None:
+                        await ctx.send_message(Result(score=len(text)))
+
+
+                class HighScoreHandler(Executor):
+                    @handler
+                    async def handle(self, result: Result, ctx: WorkflowContext) -> None:
+                        print(f"High score: {result.score}")
+
+
+                class LowScoreHandler(Executor):
+                    @handler
+                    async def handle(self, result: Result, ctx: WorkflowContext) -> None:
+                        print(f"Low score: {result.score}")
+
+
+                # Route based on score value
+                workflow = (
+                    WorkflowBuilder()
+                    .add_switch_case_edge_group(
+                        Evaluator(id="eval"),
+                        [
+                            Case(condition=lambda r: r.score > 10, target=HighScoreHandler(id="high")),
+                            Default(target=LowScoreHandler(id="low")),
+                        ],
+                    )
+                    .set_start_executor("eval")
+                    .build()
+                )
         """
         source_exec = self._maybe_wrap_agent(source)
         source_id = self._add_executor(source_exec)
@@ -270,13 +440,67 @@
         Messages from the source executor will be sent to multiple target executors based on
         the provided selection function.
 
-        The selection function should take a message and the name of the target executors,
-        and return a list of indices indicating which target executors should receive the message.
+        The selection function should take a message and a list of target executor IDs,
+        and return a list of executor IDs indicating which target executors should receive the message.
 
         Args:
             source: The source executor of the edges.
             targets: A list of target executors for the edges.
             selection_func: A function that selects target executors for messages.
+                Takes (message, list[executor_id]) and returns list[executor_id].
+
+        Returns:
+            Self: The WorkflowBuilder instance for method chaining.
+
+        Example:
+            .. code-block:: python
+
+                from dataclasses import dataclass
+
+                from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler
+
+
+                @dataclass
+                class Task:
+                    priority: str
+                    data: str
+
+
+                class TaskDispatcher(Executor):
+                    @handler
+                    async def dispatch(self, text: str, ctx: WorkflowContext[Task]) -> None:
+                        priority = "high" if len(text) > 10 else "low"
+                        await ctx.send_message(Task(priority=priority, data=text))
+
+
+                class WorkerA(Executor):
+                    @handler
+                    async def process(self, task: Task, ctx: WorkflowContext) -> None:
+                        print(f"WorkerA processing: {task.data}")
+
+
+                class WorkerB(Executor):
+                    @handler
+                    async def process(self, task: Task, ctx: WorkflowContext) -> None:
+                        print(f"WorkerB processing: {task.data}")
+
+
+                # Select workers based on task priority
+                def select_workers(task: Task, executor_ids: list[str]) -> list[str]:
+                    if task.priority == "high":
+                        return executor_ids  # Send to all workers
+                    return [executor_ids[0]]  # Send to first worker only
+
+
+                workflow = (
+                    WorkflowBuilder()
+                    .add_multi_selection_edge_group(
+                        TaskDispatcher(id="dispatcher"),
+                        [WorkerA(id="worker_a"), WorkerB(id="worker_b")],
+                        selection_func=select_workers,
+                    )
+                    .set_start_executor("dispatcher")
+                    .build()
+                )
         """
         source_exec = self._maybe_wrap_agent(source)
         target_execs = [self._maybe_wrap_agent(t) for t in targets]
@@ -298,31 +522,42 @@
         The target executor will receive a list of messages aggregated from all source executors.
         Thus the input types of the target executor must be compatible with a list of the output
-        types of the source executors. For example:
-
-        class Target(Executor):
-            @handler
-            def handle_messages(self, messages: list[Message]) -> None:
-                # Process the aggregated messages from all sources
-
-        class Source(Executor):
-            @handler(output_type=[Message])
-            def handle_message(self, message: Message) -> None:
-                # Send a message to the target executor
-                self.send_message(message)
-
-        workflow = (
-            WorkflowBuilder()
-            .add_fan_in_edges(
-                [Source(id="source1"), Source(id="source2")],
-                Target(id="target")
-            )
-            .build()
-        )
+        types of the source executors.
 
         Args:
             sources: A list of source executors for the edges.
             target: The target executor for the edges.
+
+        Returns:
+            Self: The WorkflowBuilder instance for method chaining.
+
+        Example:
+            .. code-block:: python
+
+                from typing_extensions import Never
+                from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler
+
+
+                class Producer(Executor):
+                    @handler
+                    async def produce(self, seed: int, ctx: WorkflowContext[str]) -> None:
+                        await ctx.send_message(f"result_{seed}")
+
+
+                class Aggregator(Executor):
+                    @handler
+                    async def aggregate(self, results: list[str], ctx: WorkflowContext[Never, str]) -> None:
+                        combined = ", ".join(results)
+                        await ctx.yield_output(f"Combined: {combined}")
+
+
+                # Collect results from multiple producers
+                workflow = (
+                    WorkflowBuilder()
+                    .add_fan_in_edges([Producer(id="prod_1"), Producer(id="prod_2")], Aggregator(id="agg"))
+                    .set_start_executor("prod_1")
+                    .build()
+                )
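+
+                # Note: the initial message is delivered only to the start
+                # executor ("prod_1" here). To drive every producer with input,
+                # fan out to the producers from a single entry executor first
+                # (see add_fan_out_edges).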
         """
         source_execs = [self._maybe_wrap_agent(s) for s in sources]
         target_exec = self._maybe_wrap_agent(target)
@@ -342,6 +577,42 @@ def add_chain(self, executors: Sequence[Executor | AgentProtocol]) -> Self:
         Args:
            executors: A list of executors to be added to the chain.
+
+        Returns:
+            Self: The WorkflowBuilder instance for method chaining.
+
+        Example:
+            .. code-block:: python
+
+                from typing_extensions import Never
+                from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler
+
+
+                class Step1(Executor):
+                    @handler
+                    async def process(self, text: str, ctx: WorkflowContext[str]) -> None:
+                        await ctx.send_message(text.upper())
+
+
+                class Step2(Executor):
+                    @handler
+                    async def process(self, text: str, ctx: WorkflowContext[str]) -> None:
+                        await ctx.send_message(text[::-1])
+
+
+                class Step3(Executor):
+                    @handler
+                    async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None:
+                        await ctx.yield_output(f"Final: {text}")
+
+
+                # Chain executors in sequence
+                workflow = (
+                    WorkflowBuilder()
+                    .add_chain([Step1(id="step1"), Step2(id="step2"), Step3(id="step3")])
+                    .set_start_executor("step1")
+                    .build()
+                )
         """
         # Wrap each candidate first to ensure stable IDs before adding edges
         wrapped: list[Executor] = [self._maybe_wrap_agent(e) for e in executors]
@@ -352,8 +623,46 @@
     def set_start_executor(self, executor: Executor | AgentProtocol | str) -> Self:
         """Set the starting executor for the workflow.
 
+        The start executor is the entry point for the workflow. When the workflow is executed,
+        the initial message will be sent to this executor.
+
         Args:
-            executor: The starting executor, which can be an Executor instance or its ID.
+            executor: The starting executor, which can be an Executor instance, an AgentProtocol
+                instance, or the string ID of an executor previously added to the workflow.
+
+        Returns:
+            Self: The WorkflowBuilder instance for method chaining.
+
+        Example:
+            .. code-block:: python
+
+                from typing_extensions import Never
+                from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler
+
+
+                class EntryPoint(Executor):
+                    @handler
+                    async def process(self, text: str, ctx: WorkflowContext[str]) -> None:
+                        await ctx.send_message(text.upper())
+
+
+                class Processor(Executor):
+                    @handler
+                    async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None:
+                        await ctx.yield_output(text)
+
+
+                # Set by executor instance
+                entry = EntryPoint(id="entry")
+                workflow = WorkflowBuilder().add_edge(entry, Processor(id="proc")).set_start_executor(entry).build()
+
+                # Set by executor ID string
+                workflow = (
+                    WorkflowBuilder()
+                    .add_edge(EntryPoint(id="entry"), Processor(id="proc"))
+                    .set_start_executor("entry")
+                    .build()
+                )
         """
         if isinstance(executor, str):
             self._start_executor = executor
@@ -370,8 +679,43 @@
     def set_max_iterations(self, max_iterations: int) -> Self:
         """Set the maximum number of iterations for the workflow.
 
+        When a workflow contains cycles, this limit prevents infinite loops by capping
+        the number of iterations the workflow will run. The default is 100 iterations.
+
         Args:
             max_iterations: The maximum number of iterations the workflow will run for convergence.
+
+        Returns:
+            Self: The WorkflowBuilder instance for method chaining.
+
+        Example:
+            .. code-block:: python
+
+                from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler
+
+
+                class StepA(Executor):
+                    @handler
+                    async def process(self, count: int, ctx: WorkflowContext[int]) -> None:
+                        if count < 10:
+                            await ctx.send_message(count + 1)
+
+
+                class StepB(Executor):
+                    @handler
+                    async def process(self, count: int, ctx: WorkflowContext[int]) -> None:
+                        await ctx.send_message(count)
+
+
+                # Set a custom iteration limit for a workflow with cycles
+                workflow = (
+                    WorkflowBuilder()
+                    .set_max_iterations(500)
+                    .add_edge(StepA(id="step_a"), StepB(id="step_b"))
+                    .add_edge(StepB(id="step_b"), StepA(id="step_a"))  # Cycle
+                    .set_start_executor("step_a")
+                    .build()
+                )
         """
         self._max_iterations = max_iterations
         return self
@@ -381,8 +725,48 @@
     def with_checkpointing(self, checkpoint_storage: CheckpointStorage) -> Self:
         """Enable checkpointing with the specified storage.
 
+        Checkpointing allows workflows to save their state periodically, enabling
+        pause/resume functionality and recovery from failures. The checkpoint storage
+        implementation determines where checkpoints are persisted.
+
         Args:
-            checkpoint_storage: The checkpoint storage to use.
+            checkpoint_storage: The checkpoint storage implementation to use.
+
+        Returns:
+            Self: The WorkflowBuilder instance for method chaining.
+
+        Example:
+            .. code-block:: python
+
+                from typing_extensions import Never
+                from agent_framework import Executor, FileCheckpointStorage, WorkflowBuilder, WorkflowContext, handler
+
+
+                class ProcessorA(Executor):
+                    @handler
+                    async def process(self, text: str, ctx: WorkflowContext[str]) -> None:
+                        await ctx.send_message(text.upper())
+
+
+                class ProcessorB(Executor):
+                    @handler
+                    async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None:
+                        await ctx.yield_output(text)
+
+
+                # Enable checkpointing with file-based storage
+                storage = FileCheckpointStorage("./checkpoints")
+                workflow = (
+                    WorkflowBuilder()
+                    .add_edge(ProcessorA(id="proc_a"), ProcessorB(id="proc_b"))
+                    .set_start_executor("proc_a")
+                    .with_checkpointing(storage)
+                    .build()
+                )
+
+                # Run with checkpoint saving
+                events = await workflow.run("input")
         """
         self._checkpoint_storage = checkpoint_storage
         return self
@@ -390,15 +774,43 @@
     def build(self) -> Workflow:
         """Build and return the constructed workflow.
 
-        This method performs validation before building the workflow.
+        This method performs validation before building the workflow to ensure:
+
+        - A starting executor has been set
+        - All edges connect valid executors
+        - The graph is properly connected
+        - Types are compatible between connected executors
 
         Returns:
-            A Workflow instance with the defined edges and starting executor.
+            Workflow: An immutable Workflow instance ready for execution.
 
         Raises:
            ValueError: If starting executor is not set.
            WorkflowValidationError: If workflow validation fails (includes EdgeDuplicationError,
                TypeCompatibilityError, and GraphConnectivityError subclasses).
+
+        Example:
+            .. code-block:: python
+
+                from typing_extensions import Never
+                from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler
+
+
+                class MyExecutor(Executor):
+                    @handler
+                    async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None:
+                        await ctx.yield_output(text.upper())
+
+
+                # Build and execute a workflow
+                workflow = WorkflowBuilder().set_start_executor(MyExecutor(id="executor")).build()
+
+                # The workflow is now immutable and ready to run
+                events = await workflow.run("hello")
+                print(events.get_outputs())  # ['HELLO']
+
+                # Workflows can be reused multiple times
+                events2 = await workflow.run("world")
+                print(events2.get_outputs())  # ['WORLD']
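+
+                # Building without a start executor fails validation, as
+                # documented in Raises above
+                try:
+                    WorkflowBuilder().build()
+                except ValueError:
+                    print("build() requires a start executor")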
         """
         # Create workflow build span that includes validation and workflow creation
         with create_workflow_span(OtelAttr.WORKFLOW_BUILD_SPAN) as span: