Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
499 changes: 499 additions & 0 deletions src/opencode_a2a/execution/coordinator.py

Large diffs are not rendered by default.

624 changes: 21 additions & 603 deletions src/opencode_a2a/execution/executor.py

Large diffs are not rendered by default.

20 changes: 20 additions & 0 deletions src/opencode_a2a/execution/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from __future__ import annotations

import logging

logger = logging.getLogger("opencode_a2a.execution.executor")


def emit_metric(
name: str,
value: float = 1.0,
**labels: str | int | float | bool,
) -> None:
if labels:
labels_text = ",".join(
f"{key}={str(label).lower() if isinstance(label, bool) else label}"
for key, label in sorted(labels.items())
)
logger.debug("metric=%s value=%s labels=%s", name, value, labels_text)
return
logger.debug("metric=%s value=%s", name, value)
153 changes: 153 additions & 0 deletions src/opencode_a2a/execution/tool_orchestration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
from __future__ import annotations

import logging
import uuid
from typing import Any

from .tool_error_mapping import build_tool_error, map_a2a_tool_exception

logger = logging.getLogger(__name__)


async def maybe_handle_tools(
raw_response: dict[str, Any],
*,
a2a_client_manager,
) -> list[dict[str, Any]] | None:
parts = raw_response.get("parts", [])
if not isinstance(parts, list):
return None

results: list[dict[str, Any]] = []
for part in parts:
if not isinstance(part, dict) or part.get("type") != "tool":
continue

state = part.get("state")
if not isinstance(state, dict) or state.get("status") != "calling":
continue

tool_name = part.get("tool")
if tool_name == "a2a_call":
result = await handle_a2a_call_tool(part, a2a_client_manager=a2a_client_manager)
if result:
results.append(result)

return results if results else None


async def handle_a2a_call_tool(
part: dict[str, Any],
*,
a2a_client_manager,
) -> dict[str, Any]:
call_id = part.get("callID") or str(uuid.uuid4())
tool_name = part.get("tool") or "a2a_call"
state = part.get("state", {})
inputs = state.get("input", {})

if not isinstance(inputs, dict):
return {
"call_id": call_id,
"tool": tool_name,
**build_tool_error(
error_code="a2a_invalid_input",
error="Invalid a2a_call input payload",
),
}

agent_url = inputs.get("url")
message = inputs.get("message")
if not agent_url or not message:
return {
"call_id": call_id,
"tool": tool_name,
**build_tool_error(
error_code="a2a_missing_required_input",
error="Missing required a2a_call url or message",
),
}

if a2a_client_manager is None:
return {
"call_id": call_id,
"tool": tool_name,
**build_tool_error(
error_code="a2a_client_manager_unavailable",
error="A2A client manager is not available",
),
}

try:
event = None
result_text = ""
async with a2a_client_manager.borrow_client(agent_url) as client:
async for current_event in client.send_message(message):
event = current_event
extracted = client.extract_text(current_event)
if extracted:
result_text = merge_streamed_tool_output(result_text, extracted)

from a2a.types import Task

if result_text:
return {
"call_id": call_id,
"tool": tool_name,
"output": result_text,
}

if isinstance(event, Task):
result_text = ""
if event.status and event.status.message:
for part_obj in event.status.message.parts:
root = getattr(part_obj, "root", part_obj)
text_val = getattr(root, "text", "")
if text_val:
result_text += str(text_val)
return {
"call_id": call_id,
"tool": tool_name,
"output": result_text or "Task completed.",
}

if isinstance(event, tuple) and len(event) > 0 and isinstance(event[0], Task):
return {
"call_id": call_id,
"tool": tool_name,
"output": "Task completed (streaming).",
}

return {
"call_id": call_id,
"tool": tool_name,
**build_tool_error(
error_code="a2a_unexpected_response",
error="Remote A2A peer returned an unexpected response type",
error_meta={"response_type": type(event).__name__},
),
}
except Exception as exc:
logger.exception("A2A tool call failed")
return {
"call_id": call_id,
"tool": tool_name,
**map_a2a_tool_exception(exc),
}


def merge_streamed_tool_output(current: str, incoming: str) -> str:
if not current:
return incoming
if incoming == current or incoming in current:
return current
if incoming.startswith(current):
return incoming
if current.startswith(incoming):
return current
separator = (
""
if current.endswith(("\n", " ", "\t")) or incoming.startswith(("\n", " ", "\t"))
else "\n"
)
return f"{current}{separator}{incoming}"
Loading