Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions src/opencode_a2a/execution/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
UpstreamConcurrencyLimitError,
UpstreamContractError,
)
from ..output_modes import accepts_output_mode, normalize_accepted_output_modes
from ..parts.mapping import (
UnsupportedA2AInputError,
extract_text_from_a2a_parts,
Expand Down Expand Up @@ -90,6 +91,8 @@
)

logger = logging.getLogger(__name__)
_TEXT_PLAIN_MEDIA_TYPE = "text/plain"
_APPLICATION_JSON_MEDIA_TYPE = "application/json"

__all__ = [
"_build_output_metadata",
Expand Down Expand Up @@ -148,6 +151,7 @@ class _PreparedExecution:
directory: str | None
workspace_id: str | None
session_binding_context_id: str
allow_structured_output: bool


def _build_session_binding_context_id(
Expand Down Expand Up @@ -368,6 +372,7 @@ async def _bind_session(self) -> None:
directory=self._prepared.directory,
workspace_id=self._prepared.workspace_id,
terminal_signal=self._stream_terminal_signal,
allow_structured_output=self._prepared.allow_structured_output,
)
)

Expand Down Expand Up @@ -775,6 +780,7 @@ async def execute(self, context: RequestContext, event_queue: EventQueue) -> Non
identity = (call_context.state.get("identity") if call_context else None) or "anonymous"

streaming_request = self._should_stream(context)
accepted_output_modes = normalize_accepted_output_modes(context.configuration)
message_parts = (
getattr(context.message, "parts", None) if context.message is not None else None
)
Expand Down Expand Up @@ -853,6 +859,22 @@ async def execute(self, context: RequestContext, event_queue: EventQueue) -> Non
)
return

if not accepts_output_mode(accepted_output_modes, _TEXT_PLAIN_MEDIA_TYPE):
await self._emit_error(
event_queue,
task_id=task_id,
context_id=context_id,
message="acceptedOutputModes must include text/plain for OpenCode chat responses.",
state=TaskState.failed,
streaming_request=streaming_request,
)
return

allow_structured_output = accepts_output_mode(
accepted_output_modes,
_APPLICATION_JSON_MEDIA_TYPE,
)

logger.debug(
(
"Received message identity=%s task_id=%s context_id=%s "
Expand All @@ -877,6 +899,7 @@ async def execute(self, context: RequestContext, event_queue: EventQueue) -> Non
directory=directory,
workspace_id=workspace_id,
session_binding_context_id=session_binding_context_id,
allow_structured_output=allow_structured_output,
)
coordinator = _ExecutionCoordinator(
self,
Expand Down Expand Up @@ -1097,6 +1120,7 @@ async def _consume_opencode_stream(
terminal_signal: asyncio.Future[_StreamTerminalSignal],
directory: str | None = None,
workspace_id: str | None = None,
allow_structured_output: bool = True,
) -> None:
await self._stream_runtime.consume(
session_id=session_id,
Expand All @@ -1110,6 +1134,7 @@ async def _consume_opencode_stream(
terminal_signal=terminal_signal,
directory=directory,
workspace_id=workspace_id,
allow_structured_output=allow_structured_output,
)


Expand Down
3 changes: 3 additions & 0 deletions src/opencode_a2a/execution/stream_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ async def consume(
terminal_signal: asyncio.Future[_StreamTerminalSignal],
directory: str | None = None,
workspace_id: str | None = None,
allow_structured_output: bool = True,
) -> None:
part_states: dict[str, _StreamPartState] = {}
pending_deltas: defaultdict[str, list[_PendingDelta]] = defaultdict(list)
Expand All @@ -85,6 +86,8 @@ async def consume(

async def _emit_chunks(chunks: list[_NormalizedStreamChunk]) -> None:
for chunk in chunks:
if not allow_structured_output and getattr(chunk.part.root, "kind", None) == "data":
continue
resolved_message_id = stream_state.resolve_message_id(chunk.message_id)
chunk_text = getattr(chunk.part.root, "text", "")
if stream_state.should_drop_initial_user_echo(
Expand Down
29 changes: 29 additions & 0 deletions src/opencode_a2a/output_modes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from __future__ import annotations

from collections.abc import Collection
from typing import Any


def normalize_accepted_output_modes(source: Any) -> tuple[str, ...] | None:
    """Extract and canonicalize ``acceptedOutputModes`` from a request configuration.

    Looks the attribute up under both the snake_case (``accepted_output_modes``)
    and camelCase (``acceptedOutputModes``) spellings; a falsy snake_case value
    falls back to the camelCase one.

    Returns a tuple of lowercased, whitespace-stripped, de-duplicated media
    types with the original order preserved, or ``None`` when the source
    declares no usable modes. Callers treat ``None`` as "no restriction".
    """
    accepted = getattr(source, "accepted_output_modes", None) or getattr(
        source, "acceptedOutputModes", None
    )
    # Accept any list or tuple of modes (previously lists only) — a
    # backward-compatible widening; anything else means "not declared".
    if not isinstance(accepted, (list, tuple)):
        return None

    normalized: list[str] = []
    for value in accepted:
        # Non-string entries are silently skipped rather than raising.
        if not isinstance(value, str):
            continue
        mode = value.strip().lower()
        if mode and mode not in normalized:
            normalized.append(mode)
    # An empty result collapses to None so it reads as "unrestricted".
    return tuple(normalized) or None


def accepts_output_mode(
    accepted_output_modes: Collection[str] | None,
    media_type: str,
) -> bool:
    """Return True when *media_type* is permitted by the client's accepted modes.

    ``None`` means the client expressed no preference, which accepts
    everything. Exact (already lowercased) matches behave as before; in
    addition, MIME media-range wildcards are honored: ``*/*`` matches any
    type and ``<major>/*`` matches any subtype of that major type. This is
    a backward-compatible widening — every previously accepted combination
    is still accepted.
    """
    if accepted_output_modes is None:
        return True
    if media_type in accepted_output_modes or "*/*" in accepted_output_modes:
        return True
    major_type = media_type.split("/", 1)[0]
    return f"{major_type}/*" in accepted_output_modes
32 changes: 30 additions & 2 deletions src/opencode_a2a/server/agent_card.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@
from ..jsonrpc.application import SESSION_CONTEXT_PREFIX
from ..profile.runtime import RuntimeProfile, build_runtime_profile

_CHAT_INPUT_MODES = ["text/plain", "application/octet-stream"]
_CHAT_OUTPUT_MODES = ["text/plain", "application/json"]
_JSON_RPC_MODES = ["application/json"]


def _select_public_extension_params(
params: dict[str, Any],
Expand Down Expand Up @@ -381,6 +385,8 @@ def _build_agent_skills(
"Handle core A2A chat turns with shared session binding and optional "
"request-scoped model selection."
),
input_modes=list(_CHAT_INPUT_MODES),
output_modes=list(_CHAT_OUTPUT_MODES),
tags=["assistant", "coding", "opencode", "core-a2a", "portable"],
),
AgentSkill(
Expand All @@ -390,6 +396,8 @@ def _build_agent_skills(
"Inspect OpenCode session status, history, and low-risk lifecycle actions "
"through provider-private JSON-RPC extensions."
),
input_modes=list(_JSON_RPC_MODES),
output_modes=list(_JSON_RPC_MODES),
tags=["opencode", "sessions", "history", "provider-private"],
),
AgentSkill(
Expand All @@ -399,6 +407,8 @@ def _build_agent_skills(
"Discover available upstream providers and models through provider-private "
"JSON-RPC extensions."
),
input_modes=list(_JSON_RPC_MODES),
output_modes=list(_JSON_RPC_MODES),
tags=["opencode", "providers", "models", "provider-private"],
),
AgentSkill(
Expand All @@ -408,6 +418,8 @@ def _build_agent_skills(
"Manage OpenCode projects, workspaces, and worktrees through "
"provider-private JSON-RPC extensions."
),
input_modes=list(_JSON_RPC_MODES),
output_modes=list(_JSON_RPC_MODES),
tags=["opencode", "project", "workspace", "worktree", "provider-private"],
),
AgentSkill(
Expand All @@ -417,6 +429,8 @@ def _build_agent_skills(
"Recover pending permission and question interrupts through "
"provider-private JSON-RPC extensions."
),
input_modes=list(_JSON_RPC_MODES),
output_modes=list(_JSON_RPC_MODES),
tags=["interrupt", "permission", "question", "provider-private"],
),
AgentSkill(
Expand All @@ -426,6 +440,8 @@ def _build_agent_skills(
"Reply to streaming permission and question interrupts through shared "
"JSON-RPC callbacks."
),
input_modes=list(_JSON_RPC_MODES),
output_modes=list(_JSON_RPC_MODES),
tags=["interrupt", "permission", "question", "shared"],
),
]
Expand All @@ -439,6 +455,8 @@ def _build_agent_skills(
"TextPart and FilePart inputs to OpenCode sessions with shared session "
"binding and optional request-scoped model selection."
),
input_modes=list(_CHAT_INPUT_MODES),
output_modes=list(_CHAT_OUTPUT_MODES),
tags=["assistant", "coding", "opencode", "core-a2a", "portable"],
examples=_build_chat_examples(settings.a2a_project),
),
Expand All @@ -449,6 +467,8 @@ def _build_agent_skills(
"provider-private OpenCode session/history and session-control surface "
"exposed through JSON-RPC extensions."
),
input_modes=list(_JSON_RPC_MODES),
output_modes=list(_JSON_RPC_MODES),
tags=["opencode", "sessions", "history", "provider-private"],
examples=_build_session_query_skill_examples(
capability_snapshot=capability_snapshot,
Expand All @@ -461,6 +481,8 @@ def _build_agent_skills(
"provider-private OpenCode provider/model discovery surface exposed "
"through JSON-RPC extensions."
),
input_modes=list(_JSON_RPC_MODES),
output_modes=list(_JSON_RPC_MODES),
tags=["opencode", "providers", "models", "provider-private"],
examples=[
"List available providers (method opencode.providers.list).",
Expand All @@ -474,6 +496,8 @@ def _build_agent_skills(
"provider-private OpenCode project/workspace/worktree control surface "
"exposed through JSON-RPC extensions."
),
input_modes=list(_JSON_RPC_MODES),
output_modes=list(_JSON_RPC_MODES),
tags=["opencode", "project", "workspace", "worktree", "provider-private"],
examples=_build_workspace_control_skill_examples(),
),
Expand All @@ -484,6 +508,8 @@ def _build_agent_skills(
"provider-private OpenCode interrupt recovery surface exposed through "
"JSON-RPC extensions."
),
input_modes=list(_JSON_RPC_MODES),
output_modes=list(_JSON_RPC_MODES),
tags=["interrupt", "permission", "question", "provider-private"],
examples=_build_interrupt_recovery_skill_examples(),
),
Expand All @@ -495,6 +521,8 @@ def _build_agent_skills(
"JSON-RPC methods a2a.interrupt.permission.reply, "
"a2a.interrupt.question.reply, and a2a.interrupt.question.reject."
),
input_modes=list(_JSON_RPC_MODES),
output_modes=list(_JSON_RPC_MODES),
tags=["interrupt", "permission", "question", "shared"],
examples=[
"Reply once/always/reject to a permission request by request_id.",
Expand Down Expand Up @@ -535,8 +563,8 @@ def _build_agent_card(
version=settings.a2a_version,
protocol_version=settings.a2a_protocol_version,
preferred_transport=TransportProtocol.http_json,
default_input_modes=["text/plain", "application/octet-stream"],
default_output_modes=["text/plain"],
default_input_modes=list(_CHAT_INPUT_MODES),
default_output_modes=list(_CHAT_OUTPUT_MODES),
capabilities=AgentCapabilities(
streaming=True,
extensions=_build_agent_extensions(
Expand Down
Loading