Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
3a7f6a0
feat(agentserver): Add durable task framework to azure-ai-agentserver…
RaviPidaparthi May 12, 2026
42b5f54
docs: Mark completed backlog items 6 and 9 as done (spec 006)
RaviPidaparthi May 12, 2026
d368c13
Merge remote-tracking branch 'origin/main' into feature/agentserver-d…
RaviPidaparthi May 12, 2026
31127e0
feat(agentserver): spec 007 dict metadata, cspell fixes, release prep
RaviPidaparthi May 12, 2026
8b4c734
fix: remove speckit artifacts that were force-added past .gitignore
RaviPidaparthi May 12, 2026
27cb466
docs: improve durable task guide overview with clear value proposition
RaviPidaparthi May 12, 2026
0447f1d
docs: simplify overview to developer contract and benefits
RaviPidaparthi May 12, 2026
d593300
docs: add 'what durable tasks are NOT' section, fix metadata guidance
RaviPidaparthi May 12, 2026
d49596c
docs: fix misleading crash recovery language — recovery is automatic
RaviPidaparthi May 12, 2026
7977ad4
docs: fix duplicate run behavior — raises TaskConflictError, not joins
RaviPidaparthi May 12, 2026
8a37e75
docs: fix inaccuracies in durable task developer guide
RaviPidaparthi May 12, 2026
0efb5a7
fix(agentserver-core): resolve sphinx, mypy, pylint CI failures
RaviPidaparthi May 13, 2026
c53190b
feat(agentserver): steering, task.list, reserved tags, recovery routi…
RaviPidaparthi May 13, 2026
d16723a
feat(agentserver): pluggable StreamHandler protocol for durable strea…
RaviPidaparthi May 14, 2026
977cebf
feat(agentserver): complete spec-009 remaining phases
RaviPidaparthi May 14, 2026
b7c55be
docs: add get_active_run to methods table in developer guide
RaviPidaparthi May 14, 2026
2f5b611
feat(agentserver): add stream_handler_factory for crash-recovery
RaviPidaparthi May 14, 2026
4919d03
Merge remote-tracking branch 'origin/main' into feature/agentserver-d…
RaviPidaparthi May 19, 2026
1a41460
fix(agentserver): resolve CI pipeline failures
RaviPidaparthi May 19, 2026
29338ee
fix(agentserver-core): auto-init DurableTaskManager + pin aiohttp<4.0…
RaviPidaparthi May 20, 2026
f7f5961
fix: use {endpoint}/tasks path for Task Storage API, disable integration
RaviPidaparthi May 20, 2026
4cf120a
refactor: migrate durable research agent to LangGraph + SQLite checkp…
RaviPidaparthi May 20, 2026
6d366a8
Revert "refactor: migrate durable research agent to LangGraph + SQLit…
RaviPidaparthi May 20, 2026
0d80832
chore: add .env deployment config for durable-agent-demo
RaviPidaparthi May 20, 2026
0a30e20
docs: hardcode actual endpoint in demo script
RaviPidaparthi May 20, 2026
59ed7bb
feat(durable-demo): POST fire-and-forget + SSE resume with last_event_id
RaviPidaparthi May 20, 2026
e493317
fix(demo-client): use azd ai agent monitor for container logs
RaviPidaparthi May 20, 2026
ceb03d0
docs(durable-demo): update README for fire-and-forget + last_event_id…
RaviPidaparthi May 20, 2026
58caaf8
fix(durable-demo): remove task_id from POST response (internal detail)
RaviPidaparthi May 20, 2026
eb6b6a0
chore(durable-demo): update .gitignore and azd state
RaviPidaparthi May 20, 2026
29c7394
perf(durable-demo): skip artificial delay on first stage for responsi…
RaviPidaparthi May 20, 2026
d316d58
fix(durable-demo): use --session-id flag (not --session) for azd monitor
RaviPidaparthi May 20, 2026
0175d1e
chore(durable-demo): add azd environment config
RaviPidaparthi May 20, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/agentserver/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
specs/
.specify/
.github/
.vscode/
36 changes: 36 additions & 0 deletions sdk/agentserver/azure-ai-agentserver-core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,42 @@

### Features Added

- **Durable long-running agents** — New `@durable_task` decorator and supporting types for building crash-resilient, long-running agents that survive container crashes, OOM kills, and redeployments. Key capabilities:
- **Lifecycle automation** — `.run()` and `.start()` automatically start, resume, or recover tasks based on their current state in the task store.
- **Entry mode awareness** — `ctx.entry_mode` tells the function whether it was entered `"fresh"`, `"resumed"` from suspension, or `"recovered"` from a crash.
- **Suspend & resume** — `ctx.suspend(output=..., reason=...)` pauses execution for multi-turn agent patterns (e.g., waiting for user input).
- **TaskResult wrapper** — `run()` and `result()` return `TaskResult[Output]` with `.is_completed` / `.is_suspended` properties, making suspension a normal return value instead of an exception.
- **Streaming** — `ctx.stream(chunk)` emits incremental output; consumers iterate with `async for chunk in task_run`.
- **Cancellation & timeout** — Cooperative cancel via `ctx.cancel` event, configurable `timeout`, and `terminate()` for forced shutdown.
- **RetryPolicy** — Configurable retry with factory presets: `.exponential_backoff()`, `.fixed_delay()`, `.linear_backoff()`, `.no_retry()`.
- **Source auto-stamping** — The framework automatically stamps every task with provenance metadata: `type` (`agentserver.durable_task`), `name` (the decorator `name` option — the stable identity anchor), and `server_version` (the `x-platform-server` header value). Source is framework-owned and not user-overridable. A reserved tag `_durable_task_name` is also auto-stamped for LIST API filtering by function name.
- **Callable factories** — `tags`, `title`, and `description` accept `Callable[[Input, task_id], T]` for dynamic metadata computed at task creation time.
- **TaskMetadata** — Dict-like mutable progress metadata (`ctx.metadata["key"] = value`) with debounced auto-flush to the task store. Supports `[]`, `in`, `for`, `len`, `del`, plus convenience methods `.increment()` and `.append()`.
- **Handle operations** — `TaskRun.metadata` for progress snapshot reads, `TaskRun.delete()` for task cleanup, `TaskRun.refresh()` for re-fetching state from the store, `TaskRun.lease_expiry_count` for monitoring ownership churn.
- **TaskContext.description** — `ctx.description` exposes the task description string within the running function.
- **Configurable shutdown grace** — `DurableTaskManager(shutdown_grace_seconds=25.0)` controls how long the manager waits for tasks to checkpoint before force-expiring leases during shutdown.
- **Task listing** — `my_task.list(status=...)` returns all tasks for a specific durable task function, automatically scoped by function name (via tag) and source type. Supports `status` and `session_id` filters.
- **Steerable durable tasks** — New `steerable=True` parameter on `@durable_task` enables mid-flight steering where new inputs can be queued while a task is still running. Key capabilities:
- **Input queue** — `start()` on an in-progress steerable task queues the new input and returns a `TaskRun` handle immediately, instead of raising `TaskConflictError`.
- **Cancel signal** — `ctx.cancel` is automatically set when new inputs arrive, giving the function a cooperative signal to short-circuit.
- **Automatic drain** — The framework drains the queue after the function suspends or completes, re-entering with the next queued input using `entry_mode="resumed"` and `was_steered=True`.
- **Superseded results** — Previous generation's `TaskRun.result()` resolves with `status="superseded"` and `is_superseded=True`.
- **Context enrichment** — `ctx.was_steered`, `ctx.previous_input`, `ctx.pending_inputs`, and `ctx.generation` provide full steering context.
- **Queue limits** — `max_pending` (default 10) prevents unbounded queue growth; raises `SteeringQueueFull` when exceeded.
- **Crash recovery** — `drain_in_progress` flag in persisted state enables recovery from mid-drain crashes.
- **Distributed steering** — Lease renewal loop polls for pending inputs from other processes and sets `ctx.cancel` accordingly.
- **Etag-aware completion** — Steerable tasks use optimistic concurrency on completion to detect concurrent steering.

### Breaking Changes

- **`source` parameter removed** — The `source` keyword argument has been removed from `@durable_task()`, `.run()`, `.start()`, and `.options()`. Source provenance is now auto-stamped by the framework and cannot be overridden by developers. Use `tags` for custom metadata.

### Bugs Fixed

- **Local provider payload merge** — Fixed `_local_provider.py` to use strict shallow merge per Protocol Spec §11: root-level keys are now always replaced, not recursively merged. Previously nested dicts were merged with `dict.update()`, which was more forgiving than the real Task Storage API.
- **Task recovery routing** — `_find_resume_callback()` now matches by `source.name` (the auto-stamped function name) first, then falls back to title prefix match. Previously relied only on fragile title prefix heuristic.

### Other Changes
- Added `_platform_headers` module with cross-cutting protocol header name constants (`x-request-id`, `x-platform-server`, `x-agent-session-id`, `x-platform-error-source`, `x-platform-error-detail`, and others). Protocol packages now import shared header name strings from core instead of maintaining their own copies.
- Added `TraceContextMiddleware` — a lightweight pure-ASGI middleware that propagates W3C trace context (`traceparent`, `tracestate`) and baggage from incoming HTTP requests. Any spans created by downstream frameworks (e.g. MAF / agent-framework) are automatically children of the caller's trace without additional framework spans.
- Added `enable_sensitive_data` parameter to `configure_observability()` to control whether prompts, tool arguments, and results are recorded in telemetry. Respects `OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT` environment variable.
Expand Down
49 changes: 49 additions & 0 deletions sdk/agentserver/azure-ai-agentserver-core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,54 @@ export APPLICATIONINSIGHTS_CONNECTION_STRING="InstrumentationKey=..."
python my_agent.py
```

### Durable long-running agents

The `@durable_task` decorator builds crash-resilient agents that survive container restarts, OOM kills, and redeployments. Task state is persisted to a task store, enabling automatic recovery and multi-turn suspend/resume patterns.

```python
from datetime import timedelta
from azure.ai.agentserver.core.durable import durable_task, TaskContext, RetryPolicy

@durable_task(
timeout=timedelta(minutes=30),
retry=RetryPolicy.exponential_backoff(max_attempts=3),
tags={"priority": "high"},
)
async def process_document(ctx: TaskContext[dict]) -> dict:
ctx.metadata["phase"] = "processing"
result = await analyze(ctx.input["document_url"])
ctx.metadata["phase"] = "complete"
return {"summary": result}
```

**Start and await a task:**

```python
result = await process_document.run(task_id="doc-42", input={"document_url": "..."})
print(result.output) # {"summary": "..."}
```

**Multi-turn suspend/resume (e.g., conversational agents):**

```python
@durable_task()
async def chat_session(ctx: TaskContext[dict]) -> dict:
message = ctx.input["message"]
history = ctx.metadata.get("history", [])
reply = await generate_reply(message, history)
history.append({"role": "user", "content": message})
history.append({"role": "assistant", "content": reply})
ctx.metadata["history"] = history
return await ctx.suspend(output={"reply": reply})

# Each call resumes the same session:
result = await chat_session.run(task_id="session-1", input={"message": "Hello"})
print(result.output) # {"reply": "Hi! How can I help?"}
print(result.is_suspended) # True
```

See the [Developer Guide](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/agentserver/azure-ai-agentserver-core/docs/durable-task-developer-guide.md) for the full API reference.

## Troubleshooting

### Logging
Expand All @@ -130,6 +178,7 @@ To report an issue with the client library, or request additional features, plea
## Next steps

- Install [`azure-ai-agentserver-invocations`](https://pypi.org/project/azure-ai-agentserver-invocations/) to add the invocation protocol endpoints.
- Read the [Durable Task Developer Guide](https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/agentserver/azure-ai-agentserver-core/docs/durable-task-developer-guide.md) for crash-resilient long-running agents.
- See the [container image spec](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/agentserver) for the full hosted agent contract.

## Contributing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
trace_stream,
)
"""

__path__ = __import__("pkgutil").extend_path(__path__, __name__)

from ._base import AgentServerHost
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,13 +244,47 @@ async def _lifespan(_app: Starlette) -> AsyncGenerator[None, None]: # noqa: RUF
protocols,
)

# --- Durable task manager auto-initialization ---
durable_manager = None
try:
from .durable._manager import ( # pylint: disable=import-outside-toplevel
DurableTaskManager,
set_task_manager,
)

durable_manager = DurableTaskManager(
config=cfg,
shutdown_event=asyncio.Event(),
)
set_task_manager(durable_manager)
await durable_manager.startup()
logger.info("DurableTaskManager initialized automatically")
except ImportError:
pass # durable module not available
except Exception: # pylint: disable=broad-exception-caught
logger.warning("Failed to initialize DurableTaskManager", exc_info=True)

yield

# --- SHUTDOWN: runs once when the server is stopping ---
logger.info(
"AgentServerHost shutting down (graceful timeout=%ss)",
self._graceful_shutdown_timeout,
)

# Shutdown durable task manager
if durable_manager is not None:
try:
await durable_manager.shutdown()
from .durable._manager import ( # pylint: disable=import-outside-toplevel
set_task_manager as _clear_manager,
)

_clear_manager(None)
logger.info("DurableTaskManager shut down")
except Exception: # pylint: disable=broad-exception-caught
logger.warning("Error shutting down DurableTaskManager", exc_info=True)

if self._graceful_shutdown_timeout == 0:
logger.info("Graceful shutdown drain period disabled (timeout=0)")
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ def from_env(cls) -> Self:
session_id=os.environ.get(_ENV_FOUNDRY_AGENT_SESSION_ID, ""),
port=resolve_port(None),
appinsights_connection_string=os.environ.get(
_ENV_APPLICATIONINSIGHTS_CONNECTION_STRING, ""),
_ENV_APPLICATIONINSIGHTS_CONNECTION_STRING, ""
),
otlp_endpoint=os.environ.get(_ENV_OTEL_EXPORTER_OTLP_ENDPOINT, ""),
sse_keepalive_interval=resolve_sse_keepalive_interval(None),
ws_ping_interval=resolve_ws_ping_interval(),
Expand Down Expand Up @@ -168,9 +169,7 @@ def _require_int(name: str, value: object) -> int:
:raises ValueError: If *value* is not an integer.
"""
if isinstance(value, bool) or not isinstance(value, int):
raise ValueError(
f"Invalid value for {name}: {value!r} (expected an integer)"
)
raise ValueError(f"Invalid value for {name}: {value!r} (expected an integer)")
return value


Expand All @@ -186,9 +185,7 @@ def _validate_port(value: int, source: str) -> int:
:raises ValueError: If the port is outside 1-65535.
"""
if not 1 <= value <= 65535:
raise ValueError(
f"Invalid value for {source}: {value} (expected 1-65535)"
)
raise ValueError(f"Invalid value for {source}: {value} (expected 1-65535)")
return value


Expand Down Expand Up @@ -249,9 +246,7 @@ def resolve_appinsights_connection_string(
"""
if connection_string is not None:
return connection_string
return os.environ.get(
_ENV_APPLICATIONINSIGHTS_CONNECTION_STRING
)
return os.environ.get(_ENV_APPLICATIONINSIGHTS_CONNECTION_STRING)


def resolve_log_level(level: Optional[str]) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,4 @@ def create_error_response(
body["type"] = error_type
if details is not None:
body["details"] = details
return JSONResponse(
{"error": body}, status_code=status_code, headers=headers
)
return JSONResponse({"error": body}, status_code=status_code, headers=headers)
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,9 @@ def _get_trace_id(headers: list[tuple[bytes, bytes]] | None = None) -> str | Non
:rtype: str | None
"""
try:
from opentelemetry import trace as _trace # pylint: disable=import-outside-toplevel
from opentelemetry import (
trace as _trace,
) # pylint: disable=import-outside-toplevel

span = _trace.get_current_span()
ctx = span.get_span_context()
Expand Down Expand Up @@ -147,7 +149,10 @@ async def _send_wrapper(message: MutableMapping[str, Any]) -> None:
elapsed_ms = (time.monotonic() - start) * 1000
logger.warning(
"Inbound %s %s failed with status 500 in %.1fms%s",
method, path, elapsed_ms, extra_str,
method,
path,
elapsed_ms,
extra_str,
)
raise

Expand All @@ -156,10 +161,18 @@ async def _send_wrapper(message: MutableMapping[str, Any]) -> None:
if status_code is not None and status_code >= 400:
logger.warning(
"Inbound %s %s completed with status %d in %.1fms%s",
method, path, status_code, elapsed_ms, extra_str,
method,
path,
status_code,
elapsed_ms,
extra_str,
)
else:
logger.info(
"Inbound %s %s completed with status %s in %.1fms%s",
method, path, status_code, elapsed_ms, extra_str,
method,
path,
status_code,
elapsed_ms,
extra_str,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# ---------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# ---------------------------------------------------------
"""Durable task subsystem for crash-resilient long-running agents.

Provides the :func:`durable_task` decorator and supporting types for
building Azure AI Hosted Agents that survive container crashes,
OOM kills, and redeployments.

Key features:

- **Lifecycle automation** — ``.run()`` and ``.start()`` automatically
start, resume, or recover tasks based on their current state.
- **Entry mode** — ``ctx.entry_mode`` tells the function whether it was
entered fresh, resumed from suspension, or recovered from a crash.
- **RetryPolicy** — configurable retry with exponential, fixed, or linear
backoff (see :class:`RetryPolicy` presets).
- **Streaming** — emit incremental output via ``ctx.stream()`` and consume
with ``async for chunk in task_run``.
- **Source tracking** — attach immutable provenance metadata at task
creation time via the ``source`` parameter.

Public API::

from azure.ai.agentserver.core.durable import (
durable_task,
DurableTask,
RetryPolicy,
TaskContext,
TaskMetadata,
TaskResult,
TaskRun,
Suspended,
TaskStatus,
TaskFailed,
TaskSuspended,
TaskCancelled,
TaskNotFound,
TaskConflictError,
TaskTerminated,
EntryMode,
TaskInfo,
)
"""

from ._context import EntryMode, TaskContext
from ._decorator import DurableTask, DurableTaskOptions, durable_task
from ._exceptions import (
EtagConflict,
SteeringQueueFull,
TaskCancelled,
TaskConflictError,
TaskFailed,
TaskNotFound,
TaskSuspended,
TaskTerminated,
)
from ._metadata import TaskMetadata
from ._models import TaskInfo, TaskStatus
from ._result import TaskResult
from ._retry import RetryPolicy
from ._run import Suspended, TaskRun
from ._stream import QueueStreamHandler, StreamHandler, StreamHandlerFactory

__all__ = [
"durable_task",
"DurableTask",
"DurableTaskOptions",
"QueueStreamHandler",
"RetryPolicy",
"StreamHandler",
"StreamHandlerFactory",
"TaskContext",
"TaskMetadata",
"TaskResult",
"TaskRun",
"Suspended",
"TaskStatus",
"TaskFailed",
"TaskSuspended",
"TaskCancelled",
"TaskNotFound",
"TaskConflictError",
"TaskTerminated",
"EtagConflict",
"SteeringQueueFull",
"EntryMode",
"TaskInfo",
]
Loading
Loading