From e9c32ec1da007df851825ba4687e5048fee7db94 Mon Sep 17 00:00:00 2001 From: Brian Strauch Date: Thu, 21 May 2026 14:53:35 -0700 Subject: [PATCH 1/3] Add Strands Agents integration guide Adds a Python integration guide for Strands Agents and surfaces it in the integrations index, sidebar, and /develop/python landing page. Co-Authored-By: Claude Opus 4.7 (1M context) --- docs/develop/python/index.mdx | 1 + docs/develop/python/integrations/index.mdx | 1 + .../python/integrations/strands-agents.mdx | 592 ++++++++++++++++++ sidebars.js | 1 + 4 files changed, 595 insertions(+) create mode 100644 docs/develop/python/integrations/strands-agents.mdx diff --git a/docs/develop/python/index.mdx b/docs/develop/python/index.mdx index acad5e2c13..1bcb48c861 100644 --- a/docs/develop/python/index.mdx +++ b/docs/develop/python/index.mdx @@ -81,6 +81,7 @@ From there, you can dive deeper into any of the Temporal primitives to start bui ## [Integrations](/develop/python/integrations) - [Braintrust integration](/develop/python/integrations/braintrust) +- [Strands Agents integration](/develop/python/integrations/strands-agents) ## Temporal Python Technical Resources diff --git a/docs/develop/python/integrations/index.mdx b/docs/develop/python/integrations/index.mdx index e0f3fb8d8a..8bdad96bd4 100644 --- a/docs/develop/python/integrations/index.mdx +++ b/docs/develop/python/integrations/index.mdx @@ -28,6 +28,7 @@ The following integrations are available between the Temporal Python SDK and thi | LangSmith | Observability | [smith.langchain.com](https://docs.smith.langchain.com/) | [Guide](./langsmith.mdx) | | OpenAI Agents SDK | Agent framework | [openai.github.io](https://openai.github.io/openai-agents-python/) | [Guide](https://github.com/temporalio/sdk-python/blob/main/temporalio/contrib/openai_agents/README.md) | | Pydantic AI | Agent framework | [ai.pydantic.dev](https://ai.pydantic.dev/) | [Guide](https://ai.pydantic.dev/durable_execution/temporal/) | +| Strands Agents | Agent framework | [strandsagents.com](https://strandsagents.com/) | [Guide](./strands-agents.mdx) | | Tenuo | Governance | [tenuo.ai](https://tenuo.ai/docs) | [Guide](https://tenuo.ai/temporal) | These integrations are built on the Temporal Python SDK's [Plugin system](/develop/plugins-guide), which you can also diff --git a/docs/develop/python/integrations/strands-agents.mdx b/docs/develop/python/integrations/strands-agents.mdx new file mode 100644 index 0000000000..788b99ca90 --- /dev/null +++ b/docs/develop/python/integrations/strands-agents.mdx @@ -0,0 +1,592 @@ +--- +id: strands-agents +title: Strands Agents integration +sidebar_label: Strands Agents +toc_max_heading_level: 2 +keywords: + - ai + - agents + - strands + - strands agents + - durable execution + - ai workflows +tags: + - Strands Agents + - Python SDK + - Temporal SDKs +description: + Run Strands Agents AI workflows with durable execution using the Temporal Python SDK and Strands plugin. +--- + +Temporal's integration with [Strands Agents](https://strandsagents.com/) gives your Strands agents durable execution, +automatic retries, and timeouts via the Temporal platform. The plugin routes Strands model invocations, tool calls, MCP +tool calls, and hooks through Temporal Activities, so every step the agent takes is recorded in workflow history. + +:::info + +The Temporal Python SDK integration with Strands Agents is currently at an experimental release stage. The API may +change in future versions. + +::: + +Code snippets in this guide are taken from the +[Strands Agents plugin samples](https://github.com/temporalio/samples-python/tree/main/strands_plugin). Refer to the +samples for the complete code. + +## Prerequisites + +- This guide assumes you are already familiar with Strands Agents. If you aren't, refer to the + [Strands Agents documentation](https://strandsagents.com/) for more details. +- If you are new to Temporal, we recommend reading [Understanding Temporal](/evaluate/understanding-temporal) or taking + the [Temporal 101](https://learn.temporal.io/courses/temporal_101/) course. +- Ensure you have set up your local development environment by following the + [Set up your local development environment](/develop/python/set-up-your-local-python) guide. When you're done, leave + the Temporal development server running if you want to test your code locally. + +## Install the plugin + +Install the Temporal Python SDK with Strands Agents support (requires `temporalio` 1.28.0 or later): + +```bash +uv add "temporalio[strands-agents]" +``` + +or with pip: + +```bash +pip install "temporalio[strands-agents]" +``` + +## Quickstart + +Define a Workflow that holds a `TemporalAgent`, then register `StrandsPlugin` on the Worker: + +```python +import asyncio +from datetime import timedelta + +from temporalio import workflow +from temporalio.client import Client +from temporalio.contrib.strands import StrandsPlugin, TemporalAgent +from temporalio.worker import Worker + + +@workflow.defn +class MyWorkflow: + def __init__(self) -> None: + self.agent = TemporalAgent(start_to_close_timeout=timedelta(seconds=60)) + + @workflow.run + async def run(self, prompt: str) -> str: + result = await self.agent.invoke_async(prompt) + return str(result) + + +async def main() -> None: + client = await Client.connect("localhost:7233") + worker = Worker( + client, + task_queue="strands", + workflows=[MyWorkflow], + plugins=[StrandsPlugin()], + ) + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) +``` + +Start the Workflow from a client: + +```python +import asyncio + +from temporalio.client import Client + +from workflow import MyWorkflow + + +async def main() -> None: + client = await Client.connect("localhost:7233") + result = await client.execute_workflow( + MyWorkflow.run, + "Hello", + id="strands-quickstart", + task_queue="strands", + ) + print(result) + + +if __name__ == "__main__": + asyncio.run(main()) +``` + +:::warning + +Inside a Workflow, always call `agent.invoke_async(message)` — not `agent(message)`. The synchronous form spawns a +worker thread, which the Workflow sandbox blocks. + +::: + +## Models + +`StrandsPlugin(models=...)` takes a mapping of `name → factory`. Each factory is called lazily on first use (on the +Worker, outside the Workflow sandbox) and the constructed model is cached for the Worker's lifetime. If `models` is +omitted, the plugin registers a single `BedrockModel()` factory under the name `"bedrock"`, matching Strands' own +implicit default. Select a model per agent with `TemporalAgent(model="name", ...)`: + +```python +from strands.models.anthropic import AnthropicModel +from strands.models.bedrock import BedrockModel + +# Workflow +@workflow.defn +class MultiModelWorkflow: + def __init__(self) -> None: + self.agent_a = TemporalAgent( + model="claude", + start_to_close_timeout=timedelta(seconds=60), + ) + self.agent_b = TemporalAgent( + model="bedrock", + start_to_close_timeout=timedelta(seconds=60), + ) + +# Worker +Worker(..., plugins=[StrandsPlugin(models={ + "claude": lambda: AnthropicModel(client_args={"api_key": "..."}), + "bedrock": lambda: BedrockModel(), +})]) +``` + +Each `TemporalAgent` carries its own Activity options (timeouts, retry policy, task queue, streaming topic) and +dispatches to the shared model Activity, which resolves the model name against the registered factories at runtime. A +name not present in `models` raises `ValueError` inside the Activity. + +## Tools + +Wrap non-deterministic tools as Temporal Activities, register them with the Worker, and pass them to the agent through +`workflow.activity_as_tool`: + +```python +from strands_tools import shell +from temporalio import activity +from temporalio.contrib.strands import StrandsPlugin, TemporalAgent +from temporalio.contrib.strands import workflow as strands_workflow + + +@activity.defn +async def fetch_user(user_id: str) -> dict: + ... + + +@activity.defn(name="shell") +async def shell_activity(command: str) -> dict: + return shell.shell(command=command, non_interactive=True) + + +# Workflow +agent = TemporalAgent( + start_to_close_timeout=timedelta(seconds=60), + tools=[ + strands_workflow.activity_as_tool(fetch_user, start_to_close_timeout=timedelta(seconds=30)), + strands_workflow.activity_as_tool(shell_activity, start_to_close_timeout=timedelta(seconds=15)), + ], +) + +# Worker +Worker( + ..., + activities=[fetch_user, shell_activity], + plugins=[StrandsPlugin()], +) +``` + +If you're using built-in `strands_tools`, wrap them in a thin async function decorated with `@activity.defn` so they +run as Temporal Activities. + +## Hooks + +Strands' [hook system](https://strandsagents.com/) lets you subscribe callbacks to events in the agent lifecycle — +invocation start and end, model call before and after, tool call before and after, and message added. Pass +`hooks=[MyHookProvider()]` to `TemporalAgent`; single-agent hook events fire in Workflow context, so deterministic +callbacks just work: + +```python +from strands.hooks import HookProvider, HookRegistry +from strands.hooks.events import AfterToolCallEvent +from temporalio import workflow + + +class AuditHook(HookProvider): + def register_hooks(self, registry: HookRegistry) -> None: + registry.add_callback(AfterToolCallEvent, self._on_tool_call) + + def _on_tool_call(self, event: AfterToolCallEvent) -> None: + workflow.logger.info(f"tool {event.tool_use['name']} finished") + + +agent = TemporalAgent(start_to_close_timeout=timedelta(seconds=60), hooks=[AuditHook()]) +``` + +:::warning + +Hook callbacks run in Workflow context, so they must be +[deterministic](/develop/python/workflows/basics#workflow-logic-requirements) — no `time.time()`, `uuid.uuid4()`, or +I/O. For callbacks that need I/O (audit logging, metrics, alerting), use `workflow.activity_as_hook()` to dispatch the +work as a Temporal Activity: + +::: + +```python +from temporalio import activity +from temporalio.contrib.strands.workflow import activity_as_hook + + +@activity.defn +async def persist_tool_call(tool_name: str) -> None: + # I/O safely in an activity. + ... + + +class AuditHook(HookProvider): + def register_hooks(self, registry: HookRegistry) -> None: + registry.add_callback( + AfterToolCallEvent, + activity_as_hook( + persist_tool_call, + activity_input=lambda event: event.tool_use["name"], + start_to_close_timeout=timedelta(seconds=10), + ), + ) +``` + +`activity_input` extracts serializable values from the event to pass as the Activity's input. Use a dataclass or +Pydantic model for multiple values. This is needed because hook events hold references to the `Agent`, `AgentTool` +instances, and other objects that don't cross the Activity boundary. + +## Human-in-the-loop interrupts + +Strands offers two human-in-the-loop surfaces; both work with the plugin. In each case, `agent.invoke_async()` returns +`AgentResult(stop_reason="interrupt", interrupts=[...])` instead of raising. Pair this with a Signal handler that +supplies responses, then resume by calling `agent.invoke_async(responses)`. + +### Hook-based interrupts + +A hook on an interruptible event (for example, `BeforeToolCallEvent`) can pause the agent by calling +`event.interrupt(name, reason=...)`. The hook runs in Workflow context, so it must be deterministic — no I/O. + +```python +from strands.hooks import HookProvider, HookRegistry +from strands.hooks.events import BeforeToolCallEvent +from temporalio import workflow + + +class ApprovalHook(HookProvider): + def register_hooks(self, registry: HookRegistry) -> None: + registry.add_callback(BeforeToolCallEvent, self._gate) + + def _gate(self, event: BeforeToolCallEvent) -> None: + if event.interrupt("approval", reason="confirm delete") != "approve": + event.cancel_tool = "denied" + + +@workflow.defn +class MyWorkflow: + def __init__(self) -> None: + self.agent = TemporalAgent( + start_to_close_timeout=timedelta(seconds=60), + tools=[delete_thing], + hooks=[ApprovalHook()], + ) + self._approval: str | None = None + + @workflow.signal + def approve(self, response: str) -> None: + self._approval = response + + @workflow.run + async def run(self, prompt: str) -> str: + result = await self.agent.invoke_async(prompt) + if result.stop_reason == "interrupt": + await workflow.wait_condition(lambda: self._approval is not None) + result = await self.agent.invoke_async([ + {"interruptResponse": {"interruptId": result.interrupts[0].id, "response": self._approval}} + ]) + return str(result) +``` + +### Tool-body interrupts + +A `@strands.tool` function can raise `InterruptException(Interrupt(...))` directly. The agent stops with the interrupt, +and the Workflow handles the resume the same way as for hooks: + +```python +from strands import tool +from strands.interrupt import Interrupt, InterruptException + + +@tool +def delete_thing(name: str) -> str: + raise InterruptException( + Interrupt(id=f"delete:{name}", name="approval", reason=f"delete {name}?") + ) +``` + +The same works from an `activity_as_tool`-wrapped Activity. The plugin's failure converter preserves the `Interrupt` +payload across the Activity boundary, so `AgentResult.interrupts` is populated just like the in-Workflow case: + +```python +from strands.interrupt import Interrupt, InterruptException +from temporalio import activity +from temporalio.contrib.strands.workflow import activity_as_tool + + +@activity.defn +async def delete_thing(name: str) -> str: + if not await policy.is_authorized(name): + raise InterruptException( + Interrupt(id=f"delete:{name}", name="approval", reason=f"delete {name}?") + ) + await storage.delete(name) + return f"deleted {name}" + + +@workflow.defn +class MyWorkflow: + def __init__(self) -> None: + self.agent = TemporalAgent( + start_to_close_timeout=timedelta(seconds=60), + tools=[activity_as_tool(delete_thing, start_to_close_timeout=timedelta(seconds=10))], + ) +``` + +:::warning + +Activity-tool interrupts rely on the plugin's failure converter, which is installed via the client's data converter. +Attach `StrandsPlugin` to the **client** (not just the Worker) for them to work — Workers built from that client pick +up the plugin automatically: + +```python +client = await Client.connect("localhost:7233", plugins=[StrandsPlugin()]) +Worker(client, task_queue="strands", workflows=[MyWorkflow], activities=[delete_thing]) +``` + +::: + +## Structured output + +Like Strands' `Agent`, `TemporalAgent` supports structured output with `structured_output_model`. The plugin defaults +to the [`pydantic_data_converter`](/develop/python/best-practices/data-handling/data-conversion), so Pydantic types +serialize cleanly across the Activity and Workflow boundary: + +```python +from pydantic import BaseModel + + +class PersonInfo(BaseModel): + name: str + age: int + + +@workflow.defn +class MyWorkflow: + def __init__(self) -> None: + self.agent = TemporalAgent( + start_to_close_timeout=timedelta(seconds=60), + structured_output_model=PersonInfo, + ) + + @workflow.run + async def run(self, prompt: str) -> PersonInfo: + result = await self.agent.invoke_async(prompt) + return result.structured_output +``` + +## Streaming + +To forward model chunks to external consumers, pass `streaming_topic="..."` to `TemporalAgent` and host a +`WorkflowStream` on the Workflow. Each `StreamEvent` is published on the named topic from inside the model Activity; +subscribers read via `WorkflowStreamClient`. Chunks are batched on `streaming_batch_interval` (default 100ms): + +```python +from temporalio.contrib.workflow_streams import WorkflowStream, WorkflowStreamClient + + +# Workflow +@workflow.defn +class MyWorkflow: + def __init__(self) -> None: + self.stream = WorkflowStream() + self.agent = TemporalAgent( + start_to_close_timeout=timedelta(seconds=60), + streaming_topic="events", + ) + + +# Client +async for item in WorkflowStreamClient.create(client, workflow_id).subscribe( + ["events"], result_type=StreamEvent, +): + print(item.data) +``` + +## MCP + +`StrandsPlugin(mcp_clients=...)` takes a mapping of `name → MCPClient factory`, mirroring the `models=` pattern. The +plugin registers a per-server `{name}-call-tool` Activity and connects at Worker startup to enumerate tools. +Workflow-side, `TemporalMCPClient(server="name")` is a pure handle: it references the server by name and carries the +per-call Activity options. + +```python +from datetime import timedelta + +from mcp import StdioServerParameters, stdio_client +from strands.tools.mcp.mcp_client import MCPClient +from temporalio import workflow +from temporalio.contrib.strands import StrandsPlugin, TemporalAgent, TemporalMCPClient + + +# Workflow +@workflow.defn +class MyWorkflow: + def __init__(self) -> None: + echo = TemporalMCPClient(server="echo", start_to_close_timeout=timedelta(seconds=30)) + self.agent = TemporalAgent( + start_to_close_timeout=timedelta(seconds=60), + tools=[echo], + ) + + +# Worker +Worker( + ..., + plugins=[StrandsPlugin( + mcp_clients={ + "echo": lambda: MCPClient( + lambda: stdio_client( + StdioServerParameters(command="...", args=[...]), + ), + ), + }, + )], +) +``` + +Each factory returns a fully configured `MCPClient`, so you can pass options like `tool_filters`, `prefix`, +`elicitation_callback`, or `tasks_config` to it. + +:::note + +The plugin connects to each MCP server once at Worker startup to enumerate tools. The schema is frozen for the +Worker's lifetime; restart Workers to pick up MCP-server changes. If a server is unavailable at startup, the Worker +fails to start. + +::: + +## Retries + +`TemporalAgent` disables Strands' built-in `ModelRetryStrategy` so retries are handled exclusively by Temporal. +Configure retries via `retry_policy` on `TemporalAgent`, and on the Activity options accepted by +`workflow.activity_as_tool`, `workflow.activity_as_hook`, and `TemporalMCPClient`: + +```python +from temporalio.common import RetryPolicy + + +TemporalAgent( + start_to_close_timeout=timedelta(seconds=60), + retry_policy=RetryPolicy(maximum_attempts=3), +) +``` + +Passing `retry_strategy=...` to `TemporalAgent(...)` raises `ValueError`; remove the argument (or pass +`retry_strategy=None`) and put the retry config on the Activity options instead. + +## Continue-as-new + +A chat-style Workflow accumulates message history with every turn and eventually hits Temporal's per-Workflow history +limit. Use [continue-as-new](/develop/python/workflows/continue-as-new) to start a fresh execution while carrying +`agent.messages` forward as input: + +```python +from dataclasses import dataclass, field + +from strands.types.content import Messages +from temporalio import workflow + + +@dataclass +class ChatInput: + messages: Messages = field(default_factory=list) + + +@workflow.defn +class ChatWorkflow: + def __init__(self) -> None: + self._pending: list[str] = [] + self._done = False + + @workflow.signal + def user_says(self, prompt: str) -> None: + self._pending.append(prompt) + + @workflow.signal + def end_chat(self) -> None: + self._done = True + + @workflow.run + async def run(self, input: ChatInput) -> None: + agent = TemporalAgent( + start_to_close_timeout=timedelta(seconds=60), + messages=list(input.messages), + ) + while True: + await workflow.wait_condition(lambda: self._pending or self._done) + if self._done: + return + await agent.invoke_async(self._pending.pop(0)) + if workflow.info().is_continue_as_new_suggested(): + workflow.continue_as_new(ChatInput(messages=agent.messages)) +``` + +## Observability + +`StrandsPlugin` composes cleanly with the [OpenTelemetry plugin](/develop/python/observability#tracing). Register +`OpenTelemetryPlugin` on the client (Workers built from that client pick it up automatically) and `StrandsPlugin` on +the Worker. You'll get OTel spans around the model, tool, and MCP Activities the plugin schedules, plus any spans +Strands itself emits inside `invoke_async`: + +```python +import opentelemetry.trace +from temporalio.client import Client +from temporalio.contrib.opentelemetry import OpenTelemetryPlugin, create_tracer_provider +from temporalio.contrib.strands import StrandsPlugin +from temporalio.worker import Worker + + +opentelemetry.trace.set_tracer_provider(create_tracer_provider()) + +client = await Client.connect("localhost:7233", plugins=[OpenTelemetryPlugin()]) + +Worker( + client, + task_queue="strands", + workflows=[MyWorkflow], + plugins=[StrandsPlugin()], +) +``` + +Set the tracer provider before connecting the client. + +## Snapshots + +`TemporalAgent.take_snapshot()` and `TemporalAgent.load_snapshot()` raise `NotImplementedError`. Temporal's event +history already persists Workflow state durably at a finer granularity than Strands snapshots, so calling either +inside a Workflow is redundant. + +## Samples + +The [Strands Agents plugin samples](https://github.com/temporalio/samples-python/tree/main/strands_plugin) demonstrate +all supported patterns end-to-end. diff --git a/sidebars.js b/sidebars.js index e606b87475..8a9400b787 100644 --- a/sidebars.js +++ b/sidebars.js @@ -621,6 +621,7 @@ module.exports = { 'develop/python/integrations/braintrust', 'develop/python/integrations/langgraph', 'develop/python/integrations/langsmith', + 'develop/python/integrations/strands-agents', ], }, ], From 882a0c5de2ef22c698b5c9da1f3d01af5331e2a7 Mon Sep 17 00:00:00 2001 From: Brian Strauch Date: Thu, 21 May 2026 15:51:25 -0700 Subject: [PATCH 2/3] Fix broken links in Strands Agents integration guide Co-Authored-By: Claude Opus 4.7 (1M context) --- docs/develop/python/integrations/strands-agents.mdx | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/develop/python/integrations/strands-agents.mdx b/docs/develop/python/integrations/strands-agents.mdx index 788b99ca90..920dc716de 100644 --- a/docs/develop/python/integrations/strands-agents.mdx +++ b/docs/develop/python/integrations/strands-agents.mdx @@ -378,7 +378,7 @@ Worker(client, task_queue="strands", workflows=[MyWorkflow], activities=[delete_ ## Structured output Like Strands' `Agent`, `TemporalAgent` supports structured output with `structured_output_model`. The plugin defaults -to the [`pydantic_data_converter`](/develop/python/best-practices/data-handling/data-conversion), so Pydantic types +to the [`pydantic_data_converter`](/develop/python/data-handling/data-conversion), so Pydantic types serialize cleanly across the Activity and Workflow boundary: ```python @@ -553,7 +553,7 @@ class ChatWorkflow: ## Observability -`StrandsPlugin` composes cleanly with the [OpenTelemetry plugin](/develop/python/observability#tracing). Register +`StrandsPlugin` composes cleanly with the [OpenTelemetry plugin](/develop/python/platform/observability#tracing). Register `OpenTelemetryPlugin` on the client (Workers built from that client pick it up automatically) and `StrandsPlugin` on the Worker. You'll get OTel spans around the model, tool, and MCP Activities the plugin schedules, plus any spans Strands itself emits inside `invoke_async`: From d36f6e7df6263f5e61e0f4117544cb1936b718bc Mon Sep 17 00:00:00 2001 From: Brian Strauch Date: Thu, 21 May 2026 15:56:24 -0700 Subject: [PATCH 3/3] Trigger Vercel build Co-Authored-By: Claude Opus 4.7 (1M context)