diff --git a/python/PACKAGE_STATUS.md b/python/PACKAGE_STATUS.md index 0cda8f4a1d..1de1ba3c81 100644 --- a/python/PACKAGE_STATUS.md +++ b/python/PACKAGE_STATUS.md @@ -35,6 +35,7 @@ Status is grouped into these buckets: | `agent-framework-gemini` | `python/packages/gemini` | `alpha` | | `agent-framework-github-copilot` | `python/packages/github_copilot` | `rc` | | `agent-framework-hosting` | `python/packages/hosting` | `alpha` | +| `agent-framework-hosting-activity-protocol` | `python/packages/hosting-activity-protocol` | `alpha` | | `agent-framework-hosting-responses` | `python/packages/hosting-responses` | `alpha` | | `agent-framework-hosting-telegram` | `python/packages/hosting-telegram` | `alpha` | | `agent-framework-hyperlight` | `python/packages/hyperlight` | `beta` | diff --git a/python/packages/hosting-activity-protocol/LICENSE b/python/packages/hosting-activity-protocol/LICENSE new file mode 100644 index 0000000000..9e841e7a26 --- /dev/null +++ b/python/packages/hosting-activity-protocol/LICENSE @@ -0,0 +1,21 @@ + MIT License + + Copyright (c) Microsoft Corporation. + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE diff --git a/python/packages/hosting-activity-protocol/README.md b/python/packages/hosting-activity-protocol/README.md new file mode 100644 index 0000000000..5069188c70 --- /dev/null +++ b/python/packages/hosting-activity-protocol/README.md @@ -0,0 +1,42 @@ +# agent-framework-hosting-activity-protocol + +Bot Framework **Activity Protocol** channel for +[agent-framework-hosting](../hosting). Connects to **Azure Bot Service** so +the same agent can be reached from Microsoft Teams, Slack, Webex, +Telegram-via-bot-channel, and any other channel Azure Bot Service +supports — without having to learn each channel's native protocol. + +> Looking for a deeper Microsoft Teams integration with adaptive cards, +> message extensions, dialogs, SSO, etc? That is intentionally separate from +> this Activity Protocol channel, which focuses on Azure Bot Service +> compatibility rather than Teams-specific affordances. + +Handles inbound `message` activities, outbound replies, mid-stream +`updateActivity` edits, typing indicators, and both client-secret and +certificate credential modes for the outbound Bot Framework token. + +## Usage + +```python +from agent_framework_hosting import AgentFrameworkHost +from agent_framework_hosting_activity_protocol import ActivityProtocolChannel + +host = AgentFrameworkHost( + target=my_agent, + channels=[ + ActivityProtocolChannel( + app_id="", + app_password="", + tenant_id="botframework.com", # or your tenant id + ) + ], +) +host.serve() +``` + +For tenants that disallow client secrets, supply `certificate_path=` (and +optionally `certificate_password=`) instead. See the docstring at the top of +`_channel.py` for the openssl one-liner that generates a usable PEM. 
+ +In dev mode (no credentials), the channel skips outbound auth so the Bot +Framework Emulator can hit the endpoint without setup. diff --git a/python/packages/hosting-activity-protocol/agent_framework_hosting_activity_protocol/__init__.py b/python/packages/hosting-activity-protocol/agent_framework_hosting_activity_protocol/__init__.py new file mode 100644 index 0000000000..4c205b4f04 --- /dev/null +++ b/python/packages/hosting-activity-protocol/agent_framework_hosting_activity_protocol/__init__.py @@ -0,0 +1,7 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Bot Framework Activity Protocol channel for :mod:`agent_framework_hosting`.""" + +from ._channel import ActivityProtocolChannel, activity_protocol_isolation_key + +__all__ = ["ActivityProtocolChannel", "activity_protocol_isolation_key"] diff --git a/python/packages/hosting-activity-protocol/agent_framework_hosting_activity_protocol/_channel.py b/python/packages/hosting-activity-protocol/agent_framework_hosting_activity_protocol/_channel.py new file mode 100644 index 0000000000..7bbe851190 --- /dev/null +++ b/python/packages/hosting-activity-protocol/agent_framework_hosting_activity_protocol/_channel.py @@ -0,0 +1,1040 @@ +# Copyright (c) Microsoft. All rights reserved. + +r"""Built-in channel: Bot Framework Activity Protocol (Azure Bot Service). + +Activity Protocol is the Bot Framework messaging shape used by Azure Bot +Service to fan one bot endpoint out across many surfaces (Microsoft +Teams, Slack, Webex, Telegram, …). An incoming ``Activity`` is POSTed to +your bot's ``/messages`` endpoint, and you reply by POSTing one or more +``Activity`` objects back to the conversation URL the inbound activity +carried in ``serviceUrl``. Auth is an OAuth2 client-credentials token +from Entra (the legacy multi-tenant ``botframework.com`` authority for +public Bot Framework channels, or your own tenant for single-tenant +bots). 
+ +This is the channel-neutral Activity-Protocol channel — it surfaces what +every Bot-Service-connected channel has in common (text in, text out). +For deeper Microsoft Teams affordances (adaptive cards, message +extensions, dialogs, SSO, …) on the same Bot Service transport, see the +companion ``agent-framework-hosting-teams`` package. + +This channel handles: + +- inbound ``message`` activities — text and attachments resolved to URIs, +- outbound replies via ``POST /v3/conversations/{id}/activities``, +- streaming via ``PUT /v3/conversations/{id}/activities/{id}`` mid-stream + edits on channels that support ``updateActivity`` (Teams personal chats + and groups); every other channel — Web Chat, Direct Line, the Emulator — + rejects the PUT with ``405``, so those buffer the stream and POST a + single final message instead, +- typing indicators while the agent works, +- per-conversation isolation key ``activity:{conversation_id}`` so a Responses + caller can resume a Teams conversation by passing the conversation id, +- two credential modes for the outbound token — **client secret** or + **certificate** (for tenants that disallow secrets) — both via + ``azure.identity.aio``, +- dev-mode auth bypass when no credentials are passed so the Bot Framework + Emulator can hit the endpoint with no credentials. + +Out of scope for the prototype: full JWT validation of inbound requests, +adaptive cards, file uploads, OAuth sign-in flows, and the Teams streaming +preview API (``StreamItem``). + +Generating a certificate +------------------------ +For tenants that disallow client secrets, register a certificate on your +Bot Framework / Entra app instead. Self-signed PEM (private key + cert in +one file) is what ``azure.identity.CertificateCredential`` expects:: + + # 1. Generate a 2048-bit RSA key + self-signed cert (10y), single PEM. 
+ openssl req -x509 -newkey rsa:2048 -nodes -days 3650 \\ + -subj "/CN=my-teams-bot" \\ + -keyout teams-bot.key -out teams-bot.crt + cat teams-bot.key teams-bot.crt > teams-bot.pem + + # 2. Upload teams-bot.crt to your Entra app under + # "Certificates & secrets" → "Certificates" → "Upload certificate". + + # 3. Point the channel at the combined PEM: + ActivityProtocolChannel( + app_id="", + tenant_id="", # or "botframework.com" for legacy bots + certificate_path="teams-bot.pem", + ) + +To encrypt the private key, drop ``-nodes`` from the openssl command and +pass ``certificate_password=`` to the channel. +""" + +from __future__ import annotations + +import asyncio +import time +from collections.abc import Awaitable, Callable, Mapping, Sequence +from typing import Any +from urllib.parse import urlparse + +import httpx +from agent_framework import ( + AgentResponse, + AgentResponseUpdate, + Content, + Message, + ResponseStream, +) +from agent_framework.exceptions import ContentError +from agent_framework_hosting import ( + ChannelCommand, + ChannelCommandContext, + ChannelContext, + ChannelContribution, + ChannelIdentity, + ChannelRequest, + ChannelResponseHook, + ChannelRunHook, + ChannelSession, + ChannelStreamUpdateHook, + logger, +) +from azure.core.credentials_async import AsyncTokenCredential +from azure.identity.aio import CertificateCredential, ClientSecretCredential +from starlette.requests import Request +from starlette.responses import JSONResponse, Response +from starlette.routing import Route + +# Bot Framework v4 multi-tenant authority used by the public Bot Framework +# channels (including Microsoft Teams). Single-tenant bots should override +# ``tenant_id`` with their own tenant. +_BOTFRAMEWORK_TENANT = "botframework.com" +_BOTFRAMEWORK_SCOPE = "https://api.botframework.com/.default" + +# Default allow-list of host suffixes the channel will POST a bearer token +# to. 
def activity_protocol_isolation_key(conversation_id: Any) -> str:
    """Return the session isolation key for a Bot Framework conversation.

    The key is the conversation id namespaced under ``activity:``. The helper
    lives at module scope so run hooks on other channels can opt into the same
    per-conversation session, e.g. a Responses caller that resumes a
    conversation by passing its conversation id.
    """
    namespace = "activity"
    return f"{namespace}:{conversation_id}"
def _command_text(activity: Mapping[str, Any]) -> str:
    """Return the activity text with the bot's own @mention stripped.

    Channels that require an @mention to address the bot prefix the message
    ``text`` with the mention, and carry the literal rendering of that mention
    in the matching ``entities[].text``. Only the bot's own mention
    substring(s) are removed, never other users' mentions, so that a leading
    ``/command`` can be detected whether or not the bot was mentioned.

    Args:
        activity: The inbound Activity payload (untrusted JSON).

    Returns:
        The stripped text, or ``""`` when ``text`` is missing or not a string.
    """
    text = activity.get("text")
    if not isinstance(text, str):
        return ""

    recipient = activity.get("recipient")
    bot_id = recipient.get("id") if isinstance(recipient, Mapping) else None

    # The payload is untrusted: a non-list ``entities`` value (number, bool,
    # object, ...) is treated as "no entities" instead of raising TypeError
    # and failing the whole turn.
    entities = activity.get("entities")
    if not isinstance(entities, Sequence):
        entities = ()

    for entity in entities:
        if not isinstance(entity, Mapping) or entity.get("type") != "mention":
            continue
        mentioned = entity.get("mentioned")
        mentioned_id = mentioned.get("id") if isinstance(mentioned, Mapping) else None
        # Only strip the bot's own mention; leave mentions of other users intact.
        # When the recipient id is unknown we cannot disambiguate, so fall back
        # to stripping every mention to keep command detection working.
        if bot_id is not None and mentioned_id != bot_id:
            continue
        mention_text = entity.get("text")
        if isinstance(mention_text, str) and mention_text:
            text = text.replace(mention_text, "")
    return text.strip()
def __init__(
    self,
    *,
    path: str = "/activity/messages",
    app_id: str | None = None,
    app_password: str | None = None,
    certificate_path: str | None = None,
    certificate_password: bytes | None = None,
    tenant_id: str = _BOTFRAMEWORK_TENANT,
    token_scope: str = _BOTFRAMEWORK_SCOPE,
    credential: AsyncTokenCredential | None = None,
    commands: Sequence[ChannelCommand] = (),
    run_hook: ChannelRunHook | None = None,
    response_hook: ChannelResponseHook | None = None,
    send_typing_action: bool = True,
    stream: bool = True,
    stream_update_hook: ChannelStreamUpdateHook | None = None,
    stream_edit_min_interval: float = 0.7,
    inbound_auth_validator: InboundAuthValidator | None = None,
    service_url_allowed_hosts: tuple[str, ...] = _DEFAULT_SERVICE_URL_HOSTS,
) -> None:
    """Configure the Activity Protocol channel.

    Streaming multimodal updates are automatically converted to Activity text via
    the stream response text rendering chain (images, files, etc. → URIs, then
    included in plain-text stream updates); the channel stream-update hook can
    further customize.

    Keyword Args:
        path: Messages endpoint path on the host. Use ``""`` to expose the
            webhook at the app root.
        app_id: Bot Framework / Entra application (client) id. Required
            whenever ``app_password`` or ``certificate_path`` is supplied.
        app_password: Application secret for OAuth2 client credentials.
            Mutually exclusive with ``certificate_path``.
        certificate_path: Path to a PEM file containing **both** the
            private key and the X.509 certificate. Use this for tenants
            that disallow client secrets. See the module docstring for an
            ``openssl`` recipe.
        certificate_password: Password for the PEM private key, if any.
        tenant_id: Entra tenant. Defaults to ``"botframework.com"`` for
            public Bot Framework channels; pass your tenant id for
            single-tenant bots.
        token_scope: OAuth2 scope to request. Defaults to the Bot
            Framework resource.
        credential: Bring your own ``AsyncTokenCredential`` (e.g. a
            ``DefaultAzureCredential`` configured elsewhere). Overrides
            ``app_password`` / ``certificate_path``.
        commands: Discoverable ``/command`` handlers. An inbound message
            whose text (after stripping the bot's own @mention) begins with
            ``/`` and matches a command ``name`` (case-insensitive) is
            dispatched to that handler instead of the agent, mirroring the
            Telegram channel. The handler is invoked directly with a
            request that carries the same ``session.isolation_key`` as
            ordinary messages; the channel does not apply ``run_hook`` on
            this path. Unknown ``/foo`` text falls through to the agent.
            Handlers reply via ``ChannelCommandContext.reply``; surface
            them to users with a Teams manifest ``commandLists`` entry.
        run_hook: Optional rewrite of ``ChannelRequest`` before invocation;
            the host owns invocation of this hook. Defaults to stripping
            reserved request options so the host can manage agent invocation
            context safely.
        response_hook: Optional rewrite of the
            :class:`HostedRunResult` before the originating Activity
            reply is serialized; the host owns invocation of this hook.
        send_typing_action: Whether to send ``typing`` activities while
            the agent runs.
        stream: Whether to stream by default.
        stream_update_hook: Optional rewrite of each
            ``AgentResponseUpdate`` before it hits the wire.
        stream_edit_min_interval: Seconds between successive in-place
            edits. Teams is more rate-sensitive than Telegram, so default
            is higher.
        inbound_auth_validator: Optional async callable invoked for each
            inbound webhook request **before** the activity is parsed.
            Return ``True`` to allow, ``False`` to reject with HTTP 401.
            The webhook endpoint accepts unauthenticated requests by
            default — Bot Framework normally validates inbound calls via
            the JWT in the ``Authorization`` header (see Microsoft's
            bot framework auth docs). The prototype intentionally does
            NOT ship a built-in JWT validator (key rotation, OpenID
            config caching, etc. are out of scope); plug your own
            validator here, or terminate auth in front of the channel
            (e.g. APIM, Application Gateway). Whenever no validator is
            configured the channel logs a loud warning at startup so the
            dev-mode bypass cannot accidentally ship.
        service_url_allowed_hosts: Host (or host suffix) allow-list the
            channel will POST a bearer token to. Defaults to the public
            Bot Framework host suffixes (``botframework.com`` and
            ``smba.trafficmanager.net``). An inbound activity claiming a
            ``serviceUrl`` outside this set is rejected — without this
            gate a malicious caller could redirect outbound replies (and
            the attached bearer token) to an attacker-controlled host.
            Pass an extended tuple for sovereign clouds or private
            deployments; pass ``()`` to disable the check entirely
            (only safe with strong inbound auth).

    Raises:
        ValueError: If both ``app_password`` and ``certificate_path`` are
            supplied, or if either is supplied without ``app_id`` while no
            ``credential`` override is given.
    """
    if app_password and certificate_path:
        raise ValueError("ActivityProtocolChannel: pass either app_password or certificate_path, not both.")
    # A secret or certificate without an app id cannot produce a credential.
    # Fail loudly instead of silently falling through to unauthenticated
    # dev mode below.
    if credential is None and (app_password or certificate_path) and not app_id:
        raise ValueError(
            "ActivityProtocolChannel: app_id is required when app_password or certificate_path is supplied."
        )
    self.path = path
    self._app_id = app_id
    self._token_scope = token_scope
    self._tenant_id = tenant_id
    self._commands = list(commands)
    self._hook = run_hook
    self.response_hook = response_hook
    self._send_typing_action = send_typing_action
    self._stream_default = stream
    self._stream_update_hook = stream_update_hook
    self._stream_edit_min_interval = stream_edit_min_interval
    self._inbound_auth_validator = inbound_auth_validator
    # Normalise entries once so host matching can compare lower-case
    # suffixes without a leading dot.
    self._service_url_allowed_hosts = tuple(h.lower().lstrip(".") for h in service_url_allowed_hosts)
    self._ctx: ChannelContext | None = None
    self._http: httpx.AsyncClient | None = None

    # Build the credential up front so misconfiguration fails at construction.
    self._credential: AsyncTokenCredential | None
    if credential is not None:
        self._credential = credential
    elif app_id and certificate_path:
        self._credential = CertificateCredential(
            tenant_id=tenant_id,
            client_id=app_id,
            certificate_path=certificate_path,
            password=certificate_password,
        )
    elif app_id and app_password:
        self._credential = ClientSecretCredential(
            tenant_id=tenant_id,
            client_id=app_id,
            client_secret=app_password,
        )
    else:
        self._credential = None  # dev mode
async def _on_shutdown(self) -> None:
    """Close the HTTP client and best-effort close the credential.

    The client reference is cleared before closing so that a later
    ``_on_startup`` opens a fresh client instead of reusing a closed one.
    The credential close is attempted even when closing the HTTP client
    raises. Credential ``close`` failures are logged but never raised —
    shutdown must never be allowed to mask the original cause of an app exit.
    """
    http, self._http = self._http, None
    try:
        if http is not None:
            await http.aclose()
    finally:
        credential = self._credential
        close = getattr(credential, "close", None) if credential is not None else None
        if close is not None:
            try:
                await close()
            except Exception:  # pragma: no cover - best-effort
                logger.exception("ActivityProtocolChannel credential close failed")
def _is_service_url_allowed(self, service_url: str | None) -> bool:
    """Decide whether outbound calls may be sent to ``service_url``.

    An empty allow-list disables the check and accepts everything. Otherwise
    the URL must parse to a non-empty host that equals an allow-list entry or
    is a sub-domain of one. While a credential is configured the scheme must
    also be ``https``.
    """
    allow_list = self._service_url_allowed_hosts
    if not allow_list:
        return True
    if not service_url:
        return False
    try:
        parts = urlparse(service_url)
        hostname = (parts.hostname or "").lower()
    except Exception:
        return False
    if not hostname:
        return False
    # A configured credential means a real bearer token is sent to this
    # serviceUrl, so plaintext http is refused to avoid leaking it. Without a
    # credential (dev mode, e.g. the Bot Framework Emulator) http is accepted.
    if self._credential is not None and parts.scheme != "https":
        logger.warning("Rejecting non-https serviceUrl while a credential is configured: %r", service_url)
        return False
    for suffix in allow_list:
        if hostname == suffix or hostname.endswith(f".{suffix}"):
            return True
    return False
+ """ + if self._inbound_auth_validator is not None: + try: + allowed = await self._inbound_auth_validator(request) + except Exception: + logger.exception("ActivityProtocolChannel inbound_auth_validator raised; rejecting request") + return JSONResponse({"error": "unauthorized"}, status_code=401) + if not allowed: + return JSONResponse({"error": "unauthorized"}, status_code=401) + + try: + activity = await request.json() + except Exception: + return JSONResponse({"error": "invalid json"}, status_code=400) + if not isinstance(activity, Mapping): + return JSONResponse({"error": "invalid json"}, status_code=400) + + # We accept only message activities for now. ``conversationUpdate``, + # ``invoke``, ``typing`` and friends are silently ack'd. + if activity.get("type") != "message": + return JSONResponse({}, status_code=202) + + service_url = activity.get("serviceUrl") + if not self._is_service_url_allowed(service_url if isinstance(service_url, str) else None): + logger.warning( + "ActivityProtocolChannel rejecting activity with serviceUrl=%r (not in allow-list)", + service_url, + ) + return JSONResponse({"error": "serviceUrl not allowed"}, status_code=400) + + try: + await self._process_activity(activity) + except (httpx.HTTPError, _OutboundError): + # Transient outbound failure (network error, non-2xx from Bot + # Service, token acquisition error). Surface 502 so Bot + # Service retries the inbound activity rather than dropping it. + logger.exception("ActivityProtocolChannel outbound transient failure — signalling Bot Service to retry") + return JSONResponse({"error": "upstream failure"}, status_code=502) + except Exception: + # Deterministic / agent-side failure: 200 so Bot Service does + # not retry the same broken activity in a loop. Operator picks + # the failure up via logs / telemetry. + logger.exception("ActivityProtocolChannel activity processing failed") + # Bot Framework expects 200 OK to dequeue the activity. 
async def _process_activity(self, activity: Mapping[str, Any]) -> None:
    """Build a :class:`ChannelRequest` from a message Activity and dispatch.

    The isolation key is per-conversation so all members of a group chat
    share session state. Activity metadata (``reply_to_id``, ``recipient``)
    is preserved so reply-as-reaction style flows can reconstruct the
    original message context.

    Activities without a string ``conversation.id`` or ``serviceUrl`` are
    logged and dropped. A leading ``/command`` that matches a registered
    command is handed to ``_invoke_command`` instead of the agent.

    Args:
        activity: The inbound ``message`` Activity payload (untrusted JSON).

    Raises:
        RuntimeError: If the channel has not been given a host context yet.
    """
    if self._ctx is None:  # pragma: no cover - guarded by lifecycle
        raise RuntimeError("activity channel not started")
    conversation = activity.get("conversation")
    if not isinstance(conversation, Mapping):
        conversation = {}
    conversation_id = conversation.get("id")
    service_url = activity.get("serviceUrl")
    if not isinstance(conversation_id, str) or not isinstance(service_url, str):
        logger.warning("Teams activity missing conversation.id or serviceUrl — dropping")
        return

    # Native command dispatch — a leading ``/command`` (after stripping the
    # bot's own @mention) bypasses the agent, mirroring the Telegram channel.
    # Unknown commands fall through to the agent as a normal message.
    if self._commands:
        command_text = _command_text(activity)
        if command_text.startswith("/"):
            tokens = command_text[1:].split()
            if tokens:
                # ``/name@bot`` addresses a specific bot; match on the bare name.
                command_name = tokens[0].split("@", 1)[0].lower()
                handler = next((c for c in self._commands if c.name.lower() == command_name), None)
                if handler is not None:
                    await self._invoke_command(activity, conversation_id, service_url, handler, command_text)
                    return

    parsed = _parse_activity(activity)

    # ``from`` / ``recipient`` come from untrusted JSON. Normalise anything
    # that is not an object to an empty mapping, as is done for
    # ``conversation`` above, so one malformed field cannot fail the turn.
    sender = activity.get("from")
    if not isinstance(sender, Mapping):
        sender = {}
    bot = activity.get("recipient")
    if not isinstance(bot, Mapping):
        bot = {}

    # Store a Bot Framework conversation reference on the identity so
    # channel hooks and command handlers can inspect it. Cross-channel
    # proactive delivery is a follow-up enhancement outside the v1 host
    # contract.
    identity = ChannelIdentity(
        channel=self.name,
        native_id=conversation_id,
        attributes={
            "service_url": service_url,
            "conversation": dict(conversation),
            # Inbound recipient is the bot → outbound ``from``; inbound
            # ``from`` is the user → outbound ``recipient``.
            "bot": dict(bot),
            "user": dict(sender),
            "channel_id": activity.get("channelId"),
            "locale": activity.get("locale"),
        },
    )
    channel_request = ChannelRequest(
        channel=self.name,
        operation="message.create",
        input=[parsed],
        session=ChannelSession(isolation_key=activity_protocol_isolation_key(conversation_id)),
        identity=identity,
        attributes={
            "conversation_id": conversation_id,
            "service_url": service_url,
            "from_id": sender.get("id"),
            "channel_id": activity.get("channelId"),
            # Same key the command path exposes, so hooks see one shape.
            "aad_object_id": sender.get("aadObjectId"),
        },
        metadata={"reply_to_id": activity.get("id"), "recipient": activity.get("recipient")},
        stream=self._stream_default,
    )
    await self._dispatch(activity, channel_request)
+ """ + if self._ctx is None: # pragma: no cover - guarded by lifecycle + raise RuntimeError("activity channel not started") + identity = ChannelIdentity( + channel=self.name, + native_id=conversation_id, + attributes={ + "service_url": service_url, + "conversation": dict(activity.get("conversation") or {}), + "bot": dict(activity.get("recipient") or {}), + "user": dict(activity.get("from") or {}), + "channel_id": activity.get("channelId"), + "locale": activity.get("locale"), + }, + ) + request = ChannelRequest( + channel=self.name, + operation="command.invoke", + input=command_text, + session=ChannelSession(isolation_key=activity_protocol_isolation_key(conversation_id)), + identity=identity, + attributes={ + "conversation_id": conversation_id, + "service_url": service_url, + "from_id": (activity.get("from") or {}).get("id"), + "channel_id": activity.get("channelId"), + "aad_object_id": (activity.get("from") or {}).get("aadObjectId"), + }, + metadata={"reply_to_id": activity.get("id"), "recipient": activity.get("recipient")}, + ) + + async def _reply(body: str) -> None: + await self._send_message(activity, body) + + ctx = ChannelCommandContext(request=request, reply=_reply) + try: + await handler.handle(ctx) + except Exception: + logger.exception("ActivityProtocolChannel command %r failed", command_text) + + # -- outbound helpers -------------------------------------------------- # + + async def _dispatch(self, inbound: Mapping[str, Any], request: ChannelRequest) -> None: + """Run the target and ship the result back into the originating Teams conversation. + + Optionally fires a typing indicator before non-streaming runs; + streaming runs route through ``_stream_to_conversation`` which + progressively edits a single placeholder activity. 
+ """ + if self._ctx is None: # pragma: no cover - guarded by lifecycle + raise RuntimeError("activity channel not started") + if self._send_typing_action: + await self._send_typing(inbound) + + if not request.stream: + result = await self._ctx.run( + request, + run_hook=self._hook, + protocol_request=inbound, + response_hook=self.response_hook, + channel_name=self.name, + ) + text = getattr(result.result, "text", None) or "(no response)" + await self._send_message(inbound, text) + return + + stream = await self._ctx.run_stream( + request, + run_hook=self._hook, + protocol_request=inbound, + stream_update_hook=self._stream_update_hook, + response_hook=self.response_hook, + channel_name=self.name, + ) + await self._stream_to_conversation(inbound, request, stream) + + async def _stream_to_conversation( + self, + inbound: Mapping[str, Any], + request: ChannelRequest, + stream: ResponseStream[AgentResponseUpdate, AgentResponse], + ) -> None: + """Stream the reply back into the originating conversation. + + Channels that support the ``updateActivity`` REST operation (see + ``_EDIT_CAPABLE_CHANNELS`` — effectively only Teams) get the + progressive-edit experience: a ``…`` placeholder is POSTed, then + repeatedly PUT-edited as text accumulates. Every other channel — + Web Chat, Direct Line, the Emulator, etc. — returns ``405 Method + Not Allowed`` on the PUT, so those buffer the whole stream and POST + a single final message (``_buffer_and_send``); attempting the + edit path there would leave the user staring at a stray ``…``. 
+ """ + if str(inbound.get("channelId") or "").lower() not in _EDIT_CAPABLE_CHANNELS: + await self._buffer_and_send(inbound, request, stream) + return + + accumulated = "" + last_sent = "" + last_edit_at = 0.0 + activity_id: str | None = None + placeholder_ok = False + edit_unsupported = False + worker_done = asyncio.Event() + wake = asyncio.Event() + + async def send_initial_placeholder() -> None: + nonlocal activity_id, last_edit_at, placeholder_ok + try: + activity_id = await self._send_message(inbound, "…") + last_edit_at = time.monotonic() + placeholder_ok = activity_id is not None + except Exception: + logger.exception( + "Activity placeholder send failed — falling back to single final POST", + ) + placeholder_ok = False + + async def edit_worker() -> None: + nonlocal last_sent, last_edit_at, edit_unsupported + # When the placeholder failed we have no activity_id to PUT + # into; the loop's only useful work is exiting cleanly. Skip + # straight to that — the final flush below will POST the + # accumulated text in one shot. + if not placeholder_ok: + return + while not (worker_done.is_set() and accumulated == last_sent): + await wake.wait() + wake.clear() + if accumulated == last_sent: + continue + elapsed = time.monotonic() - last_edit_at + if elapsed < self._stream_edit_min_interval: + try: + await asyncio.wait_for(wake.wait(), timeout=self._stream_edit_min_interval - elapsed) + wake.clear() + except asyncio.TimeoutError: + pass + snapshot = accumulated + if snapshot == last_sent: + continue + try: + await self._update_activity(inbound, activity_id or "", snapshot) + except httpx.HTTPStatusError as exc: + # Some channels advertised as edit-capable may still + # reject the PUT (405). Stop editing and let the final + # flush POST the accumulated text as a new message; + # don't advance ``last_sent`` so that flush still fires. 
+ if exc.response.status_code == 405: + edit_unsupported = True + logger.warning( + "Activity edit not supported by channel %r — sending a single final message instead", + inbound.get("channelId"), + ) + return + logger.exception("Activity interim edit failed") + continue + except Exception: # pragma: no cover + logger.exception("Activity interim edit failed") + continue + last_sent = snapshot + last_edit_at = time.monotonic() + + await send_initial_placeholder() + edit_task = asyncio.create_task(edit_worker(), name="activity-edit-worker") + + try: + async for update in stream: + # Use multimodal stream contents: iterate and extract text from all text-type items. + # Non-text content (images, files, etc.) is ignored here and forwarded via the + # final response; this ensures text accumulation isn't corrupted by multimodal chunks. + for content in update.contents: + if content.type == "text" and content.text: + accumulated += content.text + wake.set() + except Exception: + logger.exception("Activity streaming consumption failed") + finally: + worker_done.set() + wake.set() + try: + await edit_task + except Exception: # pragma: no cover + logger.exception("Activity edit worker crashed") + + try: + final = await stream.get_final_response() + except Exception: # pragma: no cover + logger.exception("Stream finalize failed") + final = None + final_text = getattr(final, "text", None) or accumulated + + # Final flush — make sure the user sees everything that arrived after + # the worker's last edit. If the placeholder failed, or the channel + # turned out not to support edits (405), POST a fresh activity here + # with whatever accumulated rather than PUT-editing the placeholder. 
+ if not placeholder_ok or edit_unsupported: + text = final_text or "(no response)" + try: + await self._send_message(inbound, text) + except (httpx.HTTPError, _OutboundError): + # Placeholder never landed (or edits unsupported), so this POST is + # the user's only delivery — surface 502 so Bot Service retries. + logger.exception("Activity fallback final send failed — signalling Bot Service to retry") + raise + except Exception: # pragma: no cover + logger.exception("Activity fallback final send failed") + elif activity_id is not None and final_text and final_text != last_sent: + try: + await self._update_activity(inbound, activity_id, final_text) + except Exception: # pragma: no cover + logger.exception("Activity final edit failed") + elif not final_text and activity_id is not None: + # No text streamed — replace the placeholder with a stub so the + # user isn't left staring at "…". + try: + await self._update_activity(inbound, activity_id, "(no response)") + except Exception: # pragma: no cover + logger.exception("Activity placeholder replace failed") + + async def _buffer_and_send( + self, + inbound: Mapping[str, Any], + request: ChannelRequest, + stream: ResponseStream[AgentResponseUpdate, AgentResponse], + ) -> None: + """Consume the whole stream and POST a single final message. + + Used for Bot Framework channels that do not support editing an + activity in place (everything except Teams — see + ``_EDIT_CAPABLE_CHANNELS``). Those channels return ``405`` to + ``PUT /v3/conversations/{id}/activities/{id}``, so the progressive + in-place edit cannot be used; we buffer the stream and ``POST`` a + single message at the end. Mirrors the non-streaming path's + response-hook semantics so behaviour is consistent regardless of + whether the target streamed. + """ + accumulated = "" + try: + async for update in stream: + # Use multimodal stream contents: iterate and extract text from all text-type items. 
+ # Non-text content is ignored here (forwarded via final response); text accumulation + # is protected from corruption by multimodal chunks. + for content in update.contents: + if content.type == "text" and content.text: + accumulated += content.text + except Exception: + logger.exception("Activity streaming consumption failed") + + try: + final = await stream.get_final_response() + except Exception: # pragma: no cover + logger.exception("Stream finalize failed") + final = None + text = getattr(final, "text", None) or accumulated or "(no response)" + try: + await self._send_message(inbound, text) + except (httpx.HTTPError, _OutboundError): + # The single buffered POST is the user's only delivery. A transient + # failure must surface 502 so Bot Service retries (mirrors the + # non-streaming path's contract) rather than silently dropping it. + logger.exception("Activity buffered final send failed — signalling Bot Service to retry") + raise + except Exception: # pragma: no cover + logger.exception("Activity buffered final send failed") + + # -- Bot Framework REST helpers --------------------------------------- # + + def _activity_payload(self, inbound: Mapping[str, Any], text: str) -> dict[str, Any]: + """Build the outbound Activity envelope (text-only message).""" + recipient = inbound.get("from") or {} + from_user = inbound.get("recipient") or {} + return { + "type": "message", + "from": from_user, + "recipient": recipient, + "conversation": inbound.get("conversation") or {}, + "replyToId": inbound.get("id"), + "channelId": inbound.get("channelId"), + "serviceUrl": inbound.get("serviceUrl"), + "text": text, + "textFormat": "markdown", + } + + async def _send_message(self, inbound: Mapping[str, Any], text: str) -> str | None: + """POST a new Activity. 
Returns the assigned activity id.""" + if self._http is None: # pragma: no cover - guarded by lifecycle + raise RuntimeError("activity channel not started") + service_url = str(inbound.get("serviceUrl") or "").rstrip("/") + conversation_id = (inbound.get("conversation") or {}).get("id") + if not service_url or not isinstance(conversation_id, str): + return None + url = f"{service_url}/v3/conversations/{conversation_id}/activities" + token = await self._get_token() + response = await self._http.post( + url, json=self._activity_payload(inbound, text), headers=self._auth_headers(token) + ) + response.raise_for_status() + payload = response.json() if response.content else {} + return payload.get("id") if isinstance(payload, dict) else None + + async def _update_activity(self, inbound: Mapping[str, Any], activity_id: str, text: str) -> None: + """PUT-edit an existing Activity (Teams updateActivity).""" + if self._http is None: # pragma: no cover - guarded by lifecycle + raise RuntimeError("activity channel not started") + service_url = str(inbound.get("serviceUrl") or "").rstrip("/") + conversation_id = (inbound.get("conversation") or {}).get("id") + if not service_url or not isinstance(conversation_id, str): + return + url = f"{service_url}/v3/conversations/{conversation_id}/activities/{activity_id}" + token = await self._get_token() + response = await self._http.put( + url, json=self._activity_payload(inbound, text), headers=self._auth_headers(token) + ) + response.raise_for_status() + + async def _send_typing(self, inbound: Mapping[str, Any]) -> None: + """Send a Teams typing indicator; failures are logged and swallowed. + + The typing activity is purely a UX nicety — if it fails (token + expired, transient network issue, channel that doesn't support + typing) we never surface that to the user or block the actual + agent run. 
+ """ + if self._http is None: # pragma: no cover - guarded by lifecycle + raise RuntimeError("activity channel not started") + service_url = str(inbound.get("serviceUrl") or "").rstrip("/") + conversation_id = (inbound.get("conversation") or {}).get("id") + if not service_url or not isinstance(conversation_id, str): + return + url = f"{service_url}/v3/conversations/{conversation_id}/activities" + token = await self._get_token() + try: + await self._http.post( + url, + json={ + "type": "typing", + "from": inbound.get("recipient") or {}, + "recipient": inbound.get("from") or {}, + "conversation": inbound.get("conversation") or {}, + "serviceUrl": inbound.get("serviceUrl"), + }, + headers=self._auth_headers(token), + ) + except Exception: # pragma: no cover - non-critical UX + logger.exception("Teams typing send failed") + + +__all__ = ["ActivityProtocolChannel", "activity_protocol_isolation_key"] diff --git a/python/packages/hosting-activity-protocol/pyproject.toml b/python/packages/hosting-activity-protocol/pyproject.toml new file mode 100644 index 0000000000..344373c938 --- /dev/null +++ b/python/packages/hosting-activity-protocol/pyproject.toml @@ -0,0 +1,89 @@ +[project] +name = "agent-framework-hosting-activity-protocol" +description = "Bot Framework Activity Protocol channel for agent-framework-hosting (Teams, Slack, etc. via Azure Bot Service)." 
+authors = [{ name = "Microsoft", email = "af-support@microsoft.com"}] +readme = "README.md" +requires-python = ">=3.10" +version = "1.0.0a260424" +license-files = ["LICENSE"] +urls.homepage = "https://aka.ms/agent-framework" +urls.source = "https://github.com/microsoft/agent-framework/tree/main/python" +urls.release_notes = "https://github.com/microsoft/agent-framework/releases?q=tag%3Apython-1&expanded=true" +urls.issues = "https://github.com/microsoft/agent-framework/issues" +classifiers = [ + "License :: OSI Approved :: MIT License", + "Development Status :: 3 - Alpha", + "Intended Audience :: Developers", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Programming Language :: Python :: 3.14", + "Typing :: Typed", +] +dependencies = [ + "agent-framework-core>=1.2.0,<2", + "agent-framework-hosting>=1.0.0a260424,<2", + "httpx>=0.27,<1", + "azure-identity>=1.20,<2", +] + +[tool.uv] +prerelease = "if-necessary-or-explicit" +environments = [ + "sys_platform == 'darwin'", + "sys_platform == 'linux'", + "sys_platform == 'win32'" +] + +[tool.uv-dynamic-versioning] +fallback-version = "0.0.0" + +[tool.pytest.ini_options] +testpaths = 'tests' +addopts = "-ra -q -r fEX" +asyncio_mode = "auto" +asyncio_default_fixture_loop_scope = "function" +filterwarnings = [] +timeout = 120 +markers = [ + "integration: marks tests as integration tests that require external services", +] + +[tool.ruff] +extend = "../../pyproject.toml" + +[tool.coverage.run] +omit = [ + "**/__init__.py" +] + +[tool.pyright] +extends = "../../pyproject.toml" +include = ["agent_framework_hosting_activity_protocol"] +exclude = ['tests'] +# Bot Framework activities arrive as loosely-typed JSON-ish maps. 
Strict +# ``Unknown`` reporting on every ``.get(...)`` adds noise without catching +# real bugs — narrowing happens via runtime isinstance checks instead. +reportUnknownArgumentType = "none" +reportUnknownMemberType = "none" +reportUnknownVariableType = "none" +reportUnknownLambdaType = "none" +reportOptionalMemberAccess = "none" + +[tool.bandit] +targets = ["agent_framework_hosting_activity_protocol"] +exclude_dirs = ["tests"] + +[tool.poe] +executor.type = "uv" +include = "../../shared_tasks.toml" + +[tool.poe.tasks.test] +help = "Run the default unit test suite for this package." +cmd = 'pytest -m "not integration" --cov=agent_framework_hosting_activity_protocol --cov-report=term-missing:skip-covered tests' + +[build-system] +requires = ["flit-core >= 3.11,<4.0"] +build-backend = "flit_core.buildapi" diff --git a/python/packages/hosting-activity-protocol/tests/hosting_activity_protocol/test_channel.py b/python/packages/hosting-activity-protocol/tests/hosting_activity_protocol/test_channel.py new file mode 100644 index 0000000000..2e73d1a802 --- /dev/null +++ b/python/packages/hosting-activity-protocol/tests/hosting_activity_protocol/test_channel.py @@ -0,0 +1,883 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Unit tests for :mod:`agent_framework_hosting_activity_protocol`. + +The Bot Framework outbound calls and azure-identity credentials are mocked +out so the suite never touches the network. Live token acquisition, +streaming edits and certificate paths are out of scope here. 
+""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any, cast +from unittest.mock import AsyncMock, MagicMock + +import pytest +from agent_framework import Content +from agent_framework_hosting import ( + AgentFrameworkHost, + ChannelCommand, + ChannelCommandContext, + ChannelRequest, + HostedRunResult, +) +from starlette.testclient import TestClient + +from agent_framework_hosting_activity_protocol import ActivityProtocolChannel, activity_protocol_isolation_key +from agent_framework_hosting_activity_protocol._channel import _command_text, _parse_activity + + +def test_activity_protocol_isolation_key_format() -> None: + assert activity_protocol_isolation_key("19:meeting_xyz@thread.v2") == "activity:19:meeting_xyz@thread.v2" + assert activity_protocol_isolation_key(123) == "activity:123" + + +class TestParseActivity: + def test_text_only(self) -> None: + msg = _parse_activity({"type": "message", "text": "hello"}) + assert msg.role == "user" + assert msg.text == "hello" + + def test_with_attachment(self) -> None: + msg = _parse_activity({ + "type": "message", + "text": "see this", + "attachments": [ + {"contentType": "image/png", "contentUrl": "https://example.com/x.png"}, + ], + }) + assert msg.text == "see this" + assert any((getattr(c, "uri", None) or "").endswith("/x.png") for c in msg.contents) + + def test_skips_invalid_attachments(self) -> None: + msg = _parse_activity({ + "type": "message", + "text": "hi", + "attachments": [ + "not-a-mapping", + {"contentType": "image/png"}, # no url + {"contentUrl": "https://example.com/y", "contentType": "no-slash"}, + ], + }) + assert msg.text == "hi" + # No URI content survived. + assert not any(getattr(c, "uri", None) for c in msg.contents) + + def test_skips_teams_text_html_inline_content(self) -> None: + # Teams attaches a text/html rendering whose inline ``content`` is raw + # HTML (not a URL). It must not be parsed as a URI. 
+ msg = _parse_activity({ + "type": "message", + "text": "hello there", + "attachments": [ + {"contentType": "text/html", "content": "

hello there

"}, + ], + }) + assert msg.text == "hello there" + assert not any(getattr(c, "uri", None) for c in msg.contents) + + def test_skips_attachment_contenturl_without_scheme(self) -> None: + msg = _parse_activity({ + "type": "message", + "text": "hi", + "attachments": [ + {"contentType": "image/png", "contentUrl": "/relative/path.png"}, + ], + }) + assert msg.text == "hi" + assert not any(getattr(c, "uri", None) for c in msg.contents) + + +class TestCommandText: + def test_plain_text_unchanged(self) -> None: + assert _command_text({"text": "/help"}) == "/help" + + def test_non_string_text_returns_empty(self) -> None: + assert _command_text({"text": None}) == "" + assert _command_text({}) == "" + + def test_strips_bot_mention(self) -> None: + activity = { + "text": "Personal Assistant /todos", + "recipient": {"id": "bot-1"}, + "entities": [ + {"type": "mention", "text": "Personal Assistant", "mentioned": {"id": "bot-1"}}, + ], + } + assert _command_text(activity) == "/todos" + + def test_strips_bot_mention_without_space(self) -> None: + activity = { + "text": "Bot/help", + "recipient": {"id": "bot-1"}, + "entities": [{"type": "mention", "text": "Bot", "mentioned": {"id": "bot-1"}}], + } + assert _command_text(activity) == "/help" + + def test_keeps_other_user_mention(self) -> None: + activity = { + "text": "/whoami Someone", + "recipient": {"id": "bot-1"}, + "entities": [{"type": "mention", "text": "Someone", "mentioned": {"id": "user-9"}}], + } + # Another user's mention must not be stripped. 
+ assert _command_text(activity) == "/whoami Someone" + + def test_malformed_entities_are_ignored(self) -> None: + activity = { + "text": "/help", + "recipient": {"id": "bot-1"}, + "entities": ["not-a-mapping", {"type": "clientInfo"}, {"type": "mention"}], + } + assert _command_text(activity) == "/help" + + def test_non_mapping_recipient_is_ignored(self) -> None: + # A truthy-but-non-mapping ``recipient`` must not raise; command + # detection should still work (falls back to stripping all mentions). + assert _command_text({"text": "/help", "recipient": "bogus"}) == "/help" + + +@dataclass +class _FakeAgentResponse: + text: str + + +@dataclass +class _FakeUpdate: + text: str + + +class _FakeStream: + def __init__(self, chunks: list[str]) -> None: + self._chunks = chunks + + def __aiter__(self) -> Any: + async def gen() -> Any: + for chunk in self._chunks: + yield _FakeUpdate(chunk) + + return gen() + + async def get_final_response(self) -> _FakeAgentResponse: + return _FakeAgentResponse(text="".join(self._chunks)) + + +class _FakeAgent: + def __init__(self, reply: str = "ok") -> None: + self.id = "fake-agent" + self.name: str | None = "Fake Agent" + self.description: str | None = "Test fake agent" + self._reply = reply + self.runs: list[Any] = [] + + def create_session(self, *, session_id: str | None = None) -> Any: + return {"session_id": session_id} + + def get_session(self, service_session_id: str, *, session_id: str | None = None) -> Any: + return {"service_session_id": service_session_id, "session_id": session_id} + + def run(self, messages: Any = None, *, stream: bool = False, **kwargs: Any) -> Any: + self.runs.append({"messages": messages, "stream": stream, "kwargs": kwargs}) + if stream: + return _FakeStream([self._reply]) + + async def _coro() -> _FakeAgentResponse: + return _FakeAgentResponse(text=self._reply) + + return _coro() + + +def _make_teams( + stream: bool = False, *, path: str = "/activity/messages" +) -> tuple[ActivityProtocolChannel, 
_FakeAgent]: + agent = _FakeAgent("hi there") + ch = ActivityProtocolChannel(path=path, stream=stream, send_typing_action=False) + fake_http = MagicMock() + response_mock = MagicMock() + response_mock.raise_for_status = MagicMock() + response_mock.json = MagicMock(return_value={"id": "act-1"}) + fake_http.post = AsyncMock(return_value=response_mock) + fake_http.put = AsyncMock(return_value=response_mock) + fake_http.aclose = AsyncMock() + ch._http = fake_http + return ch, agent + + +_VALID_ACTIVITY: dict[str, Any] = { + "type": "message", + "id": "in-1", + "text": "hello bot", + "conversation": {"id": "19:meeting_xyz@thread.v2"}, + "from": {"id": "user-1"}, + "recipient": {"id": "bot-1"}, + "channelId": "msteams", + "serviceUrl": "https://smba.trafficmanager.net/amer/", +} + +# Minimal request envelope for direct ``_stream_to_conversation`` calls. +_VALID_REQUEST = ChannelRequest(channel="activity", operation="message.create", input=[]) + + +class TestTeamsWebhook: + def test_message_activity_dispatches_to_agent(self) -> None: + ch, agent = _make_teams() + host = AgentFrameworkHost(target=agent, channels=[ch]) + with TestClient(host.app) as client: + r = client.post("/activity/messages", json=_VALID_ACTIVITY) + assert r.status_code == 200 + assert agent.runs, "expected the agent to be invoked" + # And the channel posted a reply back to the conversation URL. 
+ assert ch._http is not None + cast(Any, ch._http).post.assert_called() + url, _ = cast(Any, ch._http).post.call_args[0], cast(Any, ch._http).post.call_args[1] # noqa: F841 + assert "/v3/conversations/" in cast(Any, ch._http).post.call_args[0][0] + body = cast(Any, ch._http).post.call_args[1]["json"] + assert body["text"] == "hi there" + + def test_empty_path_mounts_at_app_root(self) -> None: + ch, agent = _make_teams(path="") + host = AgentFrameworkHost(target=agent, channels=[ch]) + with TestClient(host.app) as client: + r = client.post("/", json=_VALID_ACTIVITY) + assert r.status_code == 200 + assert agent.runs, "expected the agent to be invoked" + + def test_response_hook_can_rewrite_originating_reply(self) -> None: + seen_kwargs: list[dict[str, Any]] = [] + + def hook(result: HostedRunResult, **kwargs: Any) -> HostedRunResult: + seen_kwargs.append(dict(kwargs)) + return HostedRunResult(_FakeAgentResponse(text=result.result.text.upper()), session=result.session) + + ch, agent = _make_teams() + ch.response_hook = hook + host = AgentFrameworkHost(target=agent, channels=[ch]) + + with TestClient(host.app) as client: + r = client.post("/activity/messages", json=_VALID_ACTIVITY) + + assert r.status_code == 200 + assert ch._http is not None + body = cast(Any, ch._http).post.call_args[1]["json"] + assert body["text"] == "HI THERE" + assert seen_kwargs + assert seen_kwargs[0]["channel_name"] == "activity" + + def test_non_message_activities_are_acked(self) -> None: + ch, agent = _make_teams() + host = AgentFrameworkHost(target=agent, channels=[ch]) + with TestClient(host.app) as client: + r = client.post( + "/activity/messages", + json={"type": "conversationUpdate", "conversation": {"id": "x"}}, + ) + assert r.status_code == 202 + assert not agent.runs + + def test_invalid_json_returns_400(self) -> None: + ch, agent = _make_teams() + host = AgentFrameworkHost(target=agent, channels=[ch]) + with TestClient(host.app) as client: + r = client.post( + "/activity/messages", 
+ content=b"not-json", + headers={"content-type": "application/json"}, + ) + assert r.status_code == 400 + assert not agent.runs + + def test_message_missing_serviceurl_is_dropped(self) -> None: + ch, agent = _make_teams() + host = AgentFrameworkHost(target=agent, channels=[ch]) + bad = dict(_VALID_ACTIVITY) + bad.pop("serviceUrl") + with TestClient(host.app) as client: + r = client.post("/activity/messages", json=bad) + # No serviceUrl → fails the allow-list check (None doesn't match + # any allowed host suffix), surfaced as 400 so a misconfigured + # caller knows the activity was structurally invalid. + assert r.status_code == 400 + assert not agent.runs + + def test_non_mapping_json_body_returns_400(self) -> None: + # A valid-JSON-but-non-object body (list/number/string) must not 500. + ch, agent = _make_teams() + host = AgentFrameworkHost(target=agent, channels=[ch]) + with TestClient(host.app) as client: + r = client.post("/activity/messages", json=["not", "an", "object"]) + assert r.status_code == 400 + assert not agent.runs + + def test_non_mapping_conversation_is_dropped(self) -> None: + # A truthy-but-non-mapping ``conversation`` must not raise; the + # activity is dropped deterministically (200, no outbound POST). 
+ ch, agent = _make_teams() + host = AgentFrameworkHost(target=agent, channels=[ch]) + bad = {**_VALID_ACTIVITY, "conversation": ["not", "a", "map"]} + with TestClient(host.app) as client: + r = client.post("/activity/messages", json=bad) + assert r.status_code == 200 + assert ch._http is not None + cast(Any, ch._http).post.assert_not_called() + + +class TestCommands: + def _make_with_commands(self, commands: list[ChannelCommand]) -> tuple[ActivityProtocolChannel, _FakeAgent]: + agent = _FakeAgent("hi there") + ch = ActivityProtocolChannel(send_typing_action=False, commands=commands) + fake_http = MagicMock() + response_mock = MagicMock() + response_mock.raise_for_status = MagicMock() + response_mock.json = MagicMock(return_value={"id": "act-1"}) + fake_http.post = AsyncMock(return_value=response_mock) + fake_http.put = AsyncMock(return_value=response_mock) + fake_http.aclose = AsyncMock() + ch._http = fake_http + return ch, agent + + def test_slash_command_bypasses_agent_and_replies(self) -> None: + seen: list[ChannelCommandContext] = [] + + async def handle(ctx: ChannelCommandContext) -> None: + seen.append(ctx) + await ctx.reply("listed") + + ch, agent = self._make_with_commands([ChannelCommand("todos", "List", handle)]) + host = AgentFrameworkHost(target=agent, channels=[ch]) + activity = dict(_VALID_ACTIVITY, text="/todos") + with TestClient(host.app) as client: + r = client.post("/activity/messages", json=activity) + assert r.status_code == 200 + assert not agent.runs, "command must bypass the agent" + assert seen and seen[0].request.operation == "command.invoke" + assert seen[0].request.input == "/todos" + assert seen[0].request.session is not None + assert seen[0].request.session.isolation_key == activity_protocol_isolation_key("19:meeting_xyz@thread.v2") + assert ch._http is not None + assert cast(Any, ch._http).post.call_args[1]["json"]["text"] == "listed" + + def test_command_match_is_case_insensitive(self) -> None: + ran = False + + async def 
handle(ctx: ChannelCommandContext) -> None: + nonlocal ran + ran = True + + ch, agent = self._make_with_commands([ChannelCommand("New", "reset", handle)]) + host = AgentFrameworkHost(target=agent, channels=[ch]) + with TestClient(host.app) as client: + r = client.post("/activity/messages", json=dict(_VALID_ACTIVITY, text="/new")) + assert r.status_code == 200 + assert ran + assert not agent.runs + + def test_unknown_command_falls_through_to_agent(self) -> None: + async def handle(ctx: ChannelCommandContext) -> None: # pragma: no cover - never called + raise AssertionError("should not run") + + ch, agent = self._make_with_commands([ChannelCommand("todos", "List", handle)]) + host = AgentFrameworkHost(target=agent, channels=[ch]) + with TestClient(host.app) as client: + r = client.post("/activity/messages", json=dict(_VALID_ACTIVITY, text="/unknown")) + assert r.status_code == 200 + assert agent.runs, "unknown /command must reach the agent" + + def test_command_failure_does_not_retry(self) -> None: + async def handle(ctx: ChannelCommandContext) -> None: + raise RuntimeError("boom") + + ch, agent = self._make_with_commands([ChannelCommand("todos", "List", handle)]) + host = AgentFrameworkHost(target=agent, channels=[ch]) + with TestClient(host.app) as client: + r = client.post("/activity/messages", json=dict(_VALID_ACTIVITY, text="/todos")) + # Best-effort: a failing command is swallowed and acked with 200 so Bot + # Service does not retry (and re-run a non-idempotent command). 
+ assert r.status_code == 200 + assert not agent.runs + + def test_command_request_uses_activity_session(self) -> None: + captured: list[str | None] = [] + + async def handle(ctx: ChannelCommandContext) -> None: + assert ctx.request.session is not None + captured.append(ctx.request.session.isolation_key) + + agent = _FakeAgent("hi") + ch = ActivityProtocolChannel(send_typing_action=False, commands=[ChannelCommand("todos", "x", handle)]) + fake_http = MagicMock() + response_mock = MagicMock() + response_mock.raise_for_status = MagicMock() + response_mock.json = MagicMock(return_value={"id": "act-1"}) + fake_http.post = AsyncMock(return_value=response_mock) + fake_http.aclose = AsyncMock() + ch._http = fake_http + host = AgentFrameworkHost(target=agent, channels=[ch]) + with TestClient(host.app) as client: + r = client.post("/activity/messages", json=dict(_VALID_ACTIVITY, text="/todos")) + assert r.status_code == 200 + assert captured == [activity_protocol_isolation_key("19:meeting_xyz@thread.v2")] + + +class TestOutbound: + async def test_send_message_posts_to_conversation_url(self) -> None: + ch, _agent = _make_teams() + await ch._send_message(_VALID_ACTIVITY, "hi") + assert ch._http is not None + cast(Any, ch._http).post.assert_called() + url = cast(Any, ch._http).post.call_args[0][0] + assert "/v3/conversations/" in url + body = cast(Any, ch._http).post.call_args[1]["json"] + assert body["text"] == "hi" + + +class TestIdentityRecording: + """``_process_activity`` must stamp the inbound conversation reference + onto ``ChannelRequest.identity`` so hooks and commands can inspect it.""" + + async def test_inbound_sets_request_identity(self) -> None: + ch, agent = _make_teams() + captured: dict[str, Any] = {} + + async def hook(req: ChannelRequest, **_: Any) -> ChannelRequest: + captured["request"] = req + return req + + ch._hook = hook # type: ignore[assignment] + host = AgentFrameworkHost(target=agent, channels=[ch]) + with TestClient(host.app) as client: + r = 
client.post("/activity/messages", json=_VALID_ACTIVITY) + assert r.status_code == 200 + request = captured["request"] + assert request.identity is not None + assert request.identity.channel == "activity" + assert request.identity.native_id == "19:meeting_xyz@thread.v2" + attrs = request.identity.attributes + assert attrs["service_url"] == "https://smba.trafficmanager.net/amer/" + assert attrs["bot"] == {"id": "bot-1"} + assert attrs["user"] == {"id": "user-1"} + + +class TestConfig: + def test_rejects_both_secret_and_certificate(self) -> None: + with pytest.raises(ValueError, match="not both"): + ActivityProtocolChannel( + app_id="x", + app_password="s", + certificate_path="/tmp/does-not-exist.pem", + ) + + def test_dev_mode_no_credential(self) -> None: + ch = ActivityProtocolChannel() + assert ch._credential is None + + +class TestServiceUrlAllowList: + """``serviceUrl`` is supplied by the inbound activity and the channel + POSTs a real bearer token to it — anything outside the Bot Framework + host suffixes must be rejected so a malicious caller can't redirect + outbound replies to an attacker-controlled host.""" + + def test_default_allows_smba_trafficmanager(self) -> None: + ch = ActivityProtocolChannel() + assert ch._is_service_url_allowed("https://smba.trafficmanager.net/amer/") + assert ch._is_service_url_allowed("https://emea.smba.trafficmanager.net/") + assert ch._is_service_url_allowed("https://api.botframework.com/") + + def test_default_rejects_arbitrary_host(self) -> None: + ch = ActivityProtocolChannel() + assert not ch._is_service_url_allowed("https://attacker.example.com/") + assert not ch._is_service_url_allowed("https://botframework.com.attacker.com/") + assert not ch._is_service_url_allowed("") + assert not ch._is_service_url_allowed(None) + + def test_custom_allowlist(self) -> None: + ch = ActivityProtocolChannel(service_url_allowed_hosts=("internal.contoso.com",)) + assert ch._is_service_url_allowed("https://internal.contoso.com/v3/") + assert 
ch._is_service_url_allowed("https://eu.internal.contoso.com/") + assert not ch._is_service_url_allowed("https://smba.trafficmanager.net/") + + def test_empty_allowlist_disables_check(self) -> None: + ch = ActivityProtocolChannel(service_url_allowed_hosts=()) + assert ch._is_service_url_allowed("https://anywhere.example.org/") + + def test_rejects_non_https_when_credential_present(self) -> None: + # With a credential configured the channel POSTs a real bearer token to + # serviceUrl, so plaintext http (which would leak it) must be rejected + # even when the host is allow-listed. + ch = ActivityProtocolChannel() + ch._credential = MagicMock() + assert not ch._is_service_url_allowed("http://smba.trafficmanager.net/amer/") + assert ch._is_service_url_allowed("https://smba.trafficmanager.net/amer/") + + def test_allows_http_in_dev_mode(self) -> None: + # No credential (dev mode / Bot Framework Emulator) → http is allowed + # because no token is sent. + ch = ActivityProtocolChannel(service_url_allowed_hosts=("localhost",)) + assert ch._credential is None + assert ch._is_service_url_allowed("http://localhost:3978/") + + def test_webhook_rejects_disallowed_serviceurl(self) -> None: + ch, agent = _make_teams() + host = AgentFrameworkHost(target=agent, channels=[ch]) + bad = dict(_VALID_ACTIVITY) + bad["serviceUrl"] = "https://attacker.example.com/v3/" + with TestClient(host.app) as client: + r = client.post("/activity/messages", json=bad) + assert r.status_code == 400 + assert not agent.runs + # No outbound POST attempted with a bearer token. 
+ assert ch._http is not None + cast(Any, ch._http).post.assert_not_called() + + +class TestInboundAuthValidator: + def test_allow_passes_through(self) -> None: + async def allow(_req: Any) -> bool: + return True + + ch, agent = _make_teams() + ch._inbound_auth_validator = allow + host = AgentFrameworkHost(target=agent, channels=[ch]) + with TestClient(host.app) as client: + r = client.post("/activity/messages", json=_VALID_ACTIVITY) + assert r.status_code == 200 + assert agent.runs + + def test_reject_returns_401(self) -> None: + async def deny(_req: Any) -> bool: + return False + + ch, agent = _make_teams() + ch._inbound_auth_validator = deny + host = AgentFrameworkHost(target=agent, channels=[ch]) + with TestClient(host.app) as client: + r = client.post("/activity/messages", json=_VALID_ACTIVITY) + assert r.status_code == 401 + assert not agent.runs + + def test_validator_raises_returns_401(self) -> None: + async def boom(_req: Any) -> bool: + raise RuntimeError("validator broke") + + ch, agent = _make_teams() + ch._inbound_auth_validator = boom + host = AgentFrameworkHost(target=agent, channels=[ch]) + with TestClient(host.app) as client: + r = client.post("/activity/messages", json=_VALID_ACTIVITY) + assert r.status_code == 401 + assert not agent.runs + + +class TestOutboundAuthHeader: + async def test_no_credential_sends_no_authorization_header(self) -> None: + ch, _agent = _make_teams() + # Default _make_teams has no credential — dev mode. + await ch._send_message(_VALID_ACTIVITY, "hi") + assert ch._http is not None + headers = cast(Any, ch._http).post.call_args[1]["headers"] + assert "Authorization" not in headers + + async def test_with_credential_sends_bearer_token(self) -> None: + ch, _agent = _make_teams() + # Inject a fake credential with a fixed token. 
+ token_obj = MagicMock() + token_obj.token = "tok-abc123" + cred = MagicMock() + cred.get_token = AsyncMock(return_value=token_obj) + ch._credential = cred # type: ignore[assignment] + await ch._send_message(_VALID_ACTIVITY, "hi") + assert ch._http is not None + headers = cast(Any, ch._http).post.call_args[1]["headers"] + assert headers.get("Authorization") == "Bearer tok-abc123" + + +class TestRetrySignal: + """Distinguish transient outbound failures (network / 5xx) — which + must surface 502 so Bot Service retries — from deterministic agent + failures (which must return 200 to avoid retry loops).""" + + def test_outbound_http_error_returns_502(self) -> None: + import httpx as _httpx + + ch, agent = _make_teams() + # Make _send_message raise a transient httpx error. + assert ch._http is not None + cast(Any, ch._http).post = AsyncMock(side_effect=_httpx.ConnectError("nope")) + host = AgentFrameworkHost(target=agent, channels=[ch]) + with TestClient(host.app) as client: + r = client.post("/activity/messages", json=_VALID_ACTIVITY) + assert r.status_code == 502 + + def test_deterministic_agent_failure_returns_200(self) -> None: + ch, agent = _make_teams() + + def boom(messages: Any = None, *, stream: bool = False, **kwargs: Any) -> Any: + async def _coro() -> Any: + raise ValueError("agent crashed") + + return _coro() + + cast(Any, agent).run = boom + host = AgentFrameworkHost(target=agent, channels=[ch]) + with TestClient(host.app) as client: + r = client.post("/activity/messages", json=_VALID_ACTIVITY) + # Deterministic failure → 200 (Bot Service does not retry the same + # broken activity in a loop). + assert r.status_code == 200 + + def test_token_acquisition_failure_returns_502(self) -> None: + # azure.identity errors are not httpx errors; they must be wrapped as + # transient so the activity is retried rather than silently dropped. 
+ ch, agent = _make_teams() + cred = MagicMock() + cred.get_token = AsyncMock(side_effect=RuntimeError("token boom")) + ch._credential = cred + host = AgentFrameworkHost(target=agent, channels=[ch]) + with TestClient(host.app) as client: + r = client.post("/activity/messages", json=_VALID_ACTIVITY) + assert r.status_code == 502 + + async def test_buffered_final_send_transient_failure_propagates(self) -> None: + # On a non-edit channel the single buffered POST is the only delivery; + # a transient failure must propagate so _handle surfaces 502 (retry). + import httpx as _httpx + + ch, _agent = _make_teams(stream=True) + assert ch._http is not None + cast(Any, ch._http).post = AsyncMock(side_effect=_httpx.ConnectError("nope")) + webchat_activity = {**_VALID_ACTIVITY, "channelId": "webchat"} + + @dataclass + class _Up: + text: str + + @property + def contents(self) -> list[Any]: + return [Content.from_text(self.text)] + + class _Stream: + def __aiter__(self) -> Any: + async def gen() -> Any: + yield _Up("hi") + + return gen() + + async def get_final_response(self) -> Any: + return _FakeAgentResponse(text="hi") + + with pytest.raises(_httpx.HTTPError): + await ch._buffer_and_send(webchat_activity, _VALID_REQUEST, cast(Any, _Stream())) + + +class TestStreaming: + async def test_stream_sends_placeholder_and_edits(self) -> None: + ch, _agent = _make_teams(stream=True) + + # Build a fake stream that emits two text chunks then finalizes. + @dataclass + class _Up: + text: str + + @property + def contents(self) -> list[Any]: + return [Content.from_text(self.text)] + + class _Stream: + def __init__(self) -> None: + self._chunks = ["hel", "lo"] + + def __aiter__(self) -> Any: + async def gen() -> Any: + for c in self._chunks: + yield _Up(c) + + return gen() + + async def get_final_response(self) -> Any: + return _FakeAgentResponse(text="hello") + + # Use a tight throttle so the test doesn't sit on `wait_for`. 
+ ch._stream_edit_min_interval = 0.0 + await ch._stream_to_conversation(_VALID_ACTIVITY, _VALID_REQUEST, cast(Any, _Stream())) + assert ch._http is not None + # Placeholder POST + at least one final PUT. + cast(Any, ch._http).post.assert_called() + cast(Any, ch._http).put.assert_called() + # Final edit body carries the full accumulated text. + last_put_body = cast(Any, ch._http).put.call_args[1]["json"] + assert last_put_body["text"] == "hello" + + async def test_stream_placeholder_failure_falls_back_to_single_post(self) -> None: + # The bug: when send_initial_placeholder fails, activity_id stays + # None, the edit_worker can never reach its exit condition + # (`accumulated == last_sent` while no PUT possible) and the + # whole conversation deadlocks. After the fix we fall back to + # buffering the stream and POSTing a single final activity. + ch, _agent = _make_teams(stream=True) + # Make the FIRST POST (placeholder) raise; subsequent POST (final + # fallback) succeeds. + import httpx as _httpx + + ok_response = MagicMock() + ok_response.raise_for_status = MagicMock() + ok_response.json = MagicMock(return_value={"id": "act-final"}) + ok_response.content = b"{}" + post_mock = AsyncMock(side_effect=[_httpx.HTTPError("boom"), ok_response]) + assert ch._http is not None + cast(Any, ch._http).post = post_mock + + @dataclass + class _Up: + text: str + + @property + def contents(self) -> list[Any]: + return [Content.from_text(self.text)] + + class _Stream: + def __aiter__(self) -> Any: + async def gen() -> Any: + yield _Up("partial-1") + yield _Up("-partial-2") + + return gen() + + async def get_final_response(self) -> Any: + return _FakeAgentResponse(text="partial-1-partial-2") + + ch._stream_edit_min_interval = 0.0 + # Should NOT hang. Use asyncio.wait_for with a small timeout to + # guard the test against future regressions of the deadlock. 
+ import asyncio as _asyncio + + await _asyncio.wait_for( + ch._stream_to_conversation(_VALID_ACTIVITY, _VALID_REQUEST, cast(Any, _Stream())), + timeout=2.0, + ) + # Two POSTs total: placeholder (failed) + fallback final. + assert post_mock.await_count == 2 + # Fallback POST contains the full accumulated text. + fallback_body = post_mock.call_args[1]["json"] + assert fallback_body["text"] == "partial-1-partial-2" + + async def test_stream_with_no_text_replaces_placeholder(self) -> None: + ch, _agent = _make_teams(stream=True) + + class _EmptyStream: + def __aiter__(self) -> Any: + async def gen() -> Any: + if False: + yield None # type: ignore[unreachable] + + return gen() + + async def get_final_response(self) -> Any: + return _FakeAgentResponse(text="") + + ch._stream_edit_min_interval = 0.0 + await ch._stream_to_conversation(_VALID_ACTIVITY, _VALID_REQUEST, cast(Any, _EmptyStream())) + # The placeholder PUT-replaces with "(no response)" so the user + # isn't left staring at "…". + assert ch._http is not None + last_put_body = cast(Any, ch._http).put.call_args[1]["json"] + assert last_put_body["text"] == "(no response)" + + async def test_non_edit_channel_buffers_and_posts_single_message(self) -> None: + # Web Chat (and every non-Teams channel) does not support + # PUT /activities/{id}; the channel must buffer the stream and POST + # a single final message rather than the placeholder+edit dance. 
+ ch, _agent = _make_teams(stream=True) + webchat_activity = {**_VALID_ACTIVITY, "channelId": "webchat"} + + @dataclass + class _Up: + text: str + + @property + def contents(self) -> list[Any]: + return [Content.from_text(self.text)] + + class _Stream: + def __aiter__(self) -> Any: + async def gen() -> Any: + yield _Up("hel") + yield _Up("lo") + + return gen() + + async def get_final_response(self) -> Any: + return _FakeAgentResponse(text="hello") + + ch._stream_edit_min_interval = 0.0 + await ch._stream_to_conversation(webchat_activity, _VALID_REQUEST, cast(Any, _Stream())) + assert ch._http is not None + # No PUT (no editing); exactly one POST with the full text. + cast(Any, ch._http).put.assert_not_called() + assert cast(Any, ch._http).post.await_count == 1 + body = cast(Any, ch._http).post.call_args[1]["json"] + assert body["text"] == "hello" + + async def test_non_edit_channel_empty_stream_posts_no_response(self) -> None: + ch, _agent = _make_teams(stream=True) + webchat_activity = {**_VALID_ACTIVITY, "channelId": "directline"} + + class _EmptyStream: + def __aiter__(self) -> Any: + async def gen() -> Any: + if False: + yield None # type: ignore[unreachable] + + return gen() + + async def get_final_response(self) -> Any: + return _FakeAgentResponse(text="") + + ch._stream_edit_min_interval = 0.0 + await ch._stream_to_conversation(webchat_activity, _VALID_REQUEST, cast(Any, _EmptyStream())) + assert ch._http is not None + cast(Any, ch._http).put.assert_not_called() + body = cast(Any, ch._http).post.call_args[1]["json"] + assert body["text"] == "(no response)" + + async def test_edit_405_falls_back_to_single_post(self) -> None: + # Defensive: a channel advertised as edit-capable that nonetheless + # rejects the PUT with 405 must stop editing and POST the final + # text as a fresh message instead of silently leaving "…". 
+ import httpx as _httpx + + ch, _agent = _make_teams(stream=True) + assert ch._http is not None + + request_405 = _httpx.Request("PUT", "https://smba.trafficmanager.net/amer/v3/x") + response_405 = _httpx.Response(405, request=request_405) + cast(Any, ch._http).put = AsyncMock( + side_effect=_httpx.HTTPStatusError("405", request=request_405, response=response_405) + ) + + @dataclass + class _Up: + text: str + + @property + def contents(self) -> list[Any]: + return [Content.from_text(self.text)] + + class _Stream: + def __aiter__(self) -> Any: + async def gen() -> Any: + yield _Up("hel") + yield _Up("lo") + + return gen() + + async def get_final_response(self) -> Any: + return _FakeAgentResponse(text="hello") + + ch._stream_edit_min_interval = 0.0 + await ch._stream_to_conversation(_VALID_ACTIVITY, _VALID_REQUEST, cast(Any, _Stream())) + # Placeholder POST + fallback final POST = 2 POSTs; the final one + # carries the full text. + assert cast(Any, ch._http).post.await_count == 2 + final_body = cast(Any, ch._http).post.call_args[1]["json"] + assert final_body["text"] == "hello" diff --git a/python/pyproject.toml b/python/pyproject.toml index 242b4dbf1e..4ba0ad5044 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -92,6 +92,7 @@ agent-framework-foundry-local = { workspace = true } agent-framework-gemini = { workspace = true } agent-framework-github-copilot = { workspace = true } agent-framework-hosting = { workspace = true } +agent-framework-hosting-activity-protocol = { workspace = true } agent-framework-hosting-responses = { workspace = true } agent-framework-hosting-telegram = { workspace = true } agent-framework-hyperlight = { workspace = true } diff --git a/python/uv.lock b/python/uv.lock index 07415bdcec..cdf0347f54 100644 --- a/python/uv.lock +++ b/python/uv.lock @@ -48,6 +48,7 @@ members = [ "agent-framework-gemini", "agent-framework-github-copilot", "agent-framework-hosting", + "agent-framework-hosting-activity-protocol", 
"agent-framework-hosting-responses", "agent-framework-hosting-telegram", "agent-framework-hyperlight", @@ -665,6 +666,25 @@ provides-extras = ["serve", "disk"] [package.metadata.requires-dev] dev = [{ name = "httpx", specifier = ">=0.28.1" }] +[[package]] +name = "agent-framework-hosting-activity-protocol" +version = "1.0.0a260424" +source = { editable = "packages/hosting-activity-protocol" } +dependencies = [ + { name = "agent-framework-core", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, + { name = "agent-framework-hosting", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, + { name = "azure-identity", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, + { name = "httpx", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, +] + +[package.metadata] +requires-dist = [ + { name = "agent-framework-core", editable = "packages/core" }, + { name = "agent-framework-hosting", editable = "packages/hosting" }, + { name = "azure-identity", specifier = ">=1.20,<2" }, + { name = "httpx", specifier = ">=0.27,<1" }, +] + [[package]] name = "agent-framework-hosting-responses" version = "1.0.0a260424"