Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions docs/guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ Key variables to understand protocol behavior:
- `A2A_CLIENT_BEARER_TOKEN`: optional bearer token attached to outbound peer calls made by the embedded A2A client and `a2a_call` tool path.
- `A2A_CLIENT_BASIC_AUTH`: optional Basic auth credential attached to outbound peer calls made by the embedded A2A client and `a2a_call` tool path.
- `A2A_CLIENT_SUPPORTED_TRANSPORTS`: ordered outbound transport preference list.
- `A2A_TASK_STORE_BACKEND`: runtime state backend. Supported values: `database`, `memory`. Default: `database`.
- `A2A_TASK_STORE_DATABASE_URL`: database URL used by the default durable backend. Default: `sqlite+aiosqlite:///./opencode-a2a.db`.
- `A2A_TASK_STORE_BACKEND`: unified lightweight persistence backend for SDK task rows plus adapter-managed session / interrupt state. Supported values: `database`, `memory`. Default: `database`.
- `A2A_TASK_STORE_DATABASE_URL`: database URL used by the unified durable backend when `A2A_TASK_STORE_BACKEND=database`. Default: `sqlite+aiosqlite:///./opencode-a2a.db`.
- Runtime authentication is bearer-token only via `A2A_BEARER_TOKEN`.
- Runtime authentication also applies to `/health`; the public unauthenticated discovery surface remains `/.well-known/agent-card.json` and `/.well-known/agent.json`.
- The authenticated extended card endpoint `/agent/authenticatedExtendedCard` is bearer-token protected.
Expand Down Expand Up @@ -139,15 +139,22 @@ A2A_TASK_STORE_DATABASE_URL=sqlite+aiosqlite:///./opencode-a2a.db \
opencode-a2a
```

With the default `database` backend, the service persists:
With the default `database` backend, the unified lightweight persistence layer persists:

- task records
- session binding / ownership state
- pending preferred-session claims
- interrupt request bindings and tombstones

The runtime automatically applies lightweight schema migrations for its custom state tables and records the applied version in `a2a_schema_version`. This built-in path currently targets the local SQLite deployment profile and does not require Alembic.
This project is SQLite-first for local single-instance deployments. The runtime configures local durability-oriented SQLite connection settings (`WAL`, `busy_timeout`, `synchronous=NORMAL`) and creates missing parent directories for file-backed database paths.

The A2A SDK task table remains managed by the SDK's own `DatabaseTaskStore` initialization path. The internal migration runner only owns the additional `opencode-a2a` state tables listed above.
The runtime automatically applies lightweight schema migrations for its custom state tables and records the applied version in `a2a_schema_version`. Schema-version writes are idempotent across concurrent first-start races, pending preferred-session claims now persist absolute `expires_at` timestamps while remaining backward-compatible with legacy `updated_at` rows, and the built-in path currently targets the local SQLite deployment profile without requiring Alembic.

Database-backed task persistence also keeps the existing first-terminal-state-wins contract while tightening the SQLite path with an atomic terminal-write guard instead of relying only on process-local read-before-write checks. Any wider SQLAlchemy dialect compatibility should be treated as incidental implementation latitude rather than a documented deployment target.

At startup, the runtime logs a concise persistence summary covering the active backend, the redacted database URL when applicable, the shared persistence scope, and whether the SQLite local durability profile is active.

The A2A SDK task table remains managed by the SDK's own `DatabaseTaskStore` initialization path. The internal migration runner only owns the additional `opencode-a2a` state tables listed above, but both layers still share the same configured lightweight persistence backend.

In-flight asyncio locks, outbound A2A client caches, and stream-local aggregation buffers remain process-local runtime state.

Expand Down
4 changes: 4 additions & 0 deletions src/opencode_a2a/server/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
TaskStoreOperationError,
build_database_engine,
build_task_store,
describe_lightweight_persistence_backend,
)

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -595,13 +596,15 @@ def create_app(settings: Settings) -> FastAPI:
)
public_card_etag = build_agent_card_etag(agent_card)
extended_card_etag = build_agent_card_etag(extended_agent_card)
persistence_summary = describe_lightweight_persistence_backend(settings)
lifespan = build_lifespan(
database_engine=database_engine,
task_store=task_store,
session_state_repository=session_state_repository,
interrupt_request_repository=interrupt_request_repository,
client_manager=client_manager,
upstream_client=upstream_client,
persistence_summary=persistence_summary,
)

app = A2AFastAPI(
Expand All @@ -615,6 +618,7 @@ def create_app(settings: Settings) -> FastAPI:
app.add_api_route(route[0], callback, methods=[route[1]])
app.state._jsonrpc_app = jsonrpc_app
app.state.task_store = task_store
app.state.persistence_summary = persistence_summary
app.state.agent_executor = executor
app.state.upstream_client = upstream_client
app.state.a2a_client_manager = client_manager
Expand Down
14 changes: 14 additions & 0 deletions src/opencode_a2a/server/lifespan.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
from __future__ import annotations

import logging
from collections.abc import Mapping
from contextlib import asynccontextmanager

from .state_store import initialize_state_repository
from .task_store import initialize_task_store

logger = logging.getLogger(__name__)


def build_lifespan(
*,
Expand All @@ -14,9 +18,19 @@ def build_lifespan(
interrupt_request_repository,
client_manager,
upstream_client,
persistence_summary: Mapping[str, object] | None = None,
):
@asynccontextmanager
async def lifespan(_app):
if persistence_summary is not None:
logger.info(
"Lightweight persistence configured backend=%s scope=%s "
"database_url=%s sqlite_tuning=%s",
persistence_summary.get("backend", "unknown"),
persistence_summary.get("scope", "unknown"),
persistence_summary.get("database_url", "n/a"),
persistence_summary.get("sqlite_tuning", "not_applicable"),
)
await initialize_task_store(task_store)
await initialize_state_repository(session_state_repository)
await initialize_state_repository(interrupt_request_repository)
Expand Down
162 changes: 134 additions & 28 deletions src/opencode_a2a/server/migrations.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from __future__ import annotations

from collections.abc import Callable
from collections.abc import Callable, Mapping
from typing import TYPE_CHECKING

from sqlalchemy import (
Column,
Index,
Integer,
MetaData,
String,
Expand All @@ -15,12 +16,13 @@
text,
update,
)
from sqlalchemy.exc import IntegrityError

if TYPE_CHECKING:
from sqlalchemy.engine import Connection

STATE_STORE_SCHEMA_NAME = "state_store"
CURRENT_STATE_STORE_SCHEMA_VERSION = 1
CURRENT_STATE_STORE_SCHEMA_VERSION = 3

_SCHEMA_VERSION_METADATA = MetaData()

Expand Down Expand Up @@ -65,54 +67,158 @@ def _migration_1_add_interrupt_details_json(
)


def _read_schema_version(connection: Connection, *, name: str) -> int:
def _migration_2_add_pending_claim_expires_at(
connection: Connection,
*,
pending_session_claims_table: Table,
) -> None:
_add_missing_nullable_column(
connection,
table=pending_session_claims_table,
column_name="expires_at",
)


def _create_missing_index(
connection: Connection,
*,
index: Index,
) -> None:
table = index.table
if table is None:
raise RuntimeError("State-store index is missing table metadata")
existing_indexes = {
existing_index["name"] for existing_index in inspect(connection).get_indexes(table.name)
}
if index.name in existing_indexes:
return
index.create(connection)


def _migration_3_add_lightweight_state_indexes(
connection: Connection,
*,
pending_session_claims_table: Table,
interrupt_requests_table: Table,
) -> None:
indexes = sorted(
[
*pending_session_claims_table.indexes,
*interrupt_requests_table.indexes,
],
key=lambda index: index.name or "",
)
for index in indexes:
_create_missing_index(connection, index=index)


def _read_schema_version(
connection: Connection,
*,
version_table: Table,
scope: str,
) -> int | None:
result = connection.execute(
select(_SCHEMA_VERSIONS.c.version).where(_SCHEMA_VERSIONS.c.name == name)
select(version_table.c.version).where(version_table.c.name == scope)
)
version = result.scalar_one_or_none()
return int(version) if version is not None else 0
return int(version) if version is not None else None


def _write_schema_version(connection: Connection, *, name: str, version: int) -> None:
exists = connection.execute(
select(_SCHEMA_VERSIONS.c.name).where(_SCHEMA_VERSIONS.c.name == name)
).scalar_one_or_none()
if exists is None:
connection.execute(insert(_SCHEMA_VERSIONS).values(name=name, version=version))
def _write_schema_version(
connection: Connection,
*,
version_table: Table,
scope: str,
version: int,
) -> None:
existing_version = _read_schema_version(
connection,
version_table=version_table,
scope=scope,
)
if existing_version is not None:
connection.execute(
update(version_table).where(version_table.c.name == scope).values(version=version)
)
return
connection.execute(
update(_SCHEMA_VERSIONS).where(_SCHEMA_VERSIONS.c.name == name).values(version=version)
try:
connection.execute(insert(version_table).values(name=scope, version=version))
except IntegrityError:
connection.execute(
update(version_table).where(version_table.c.name == scope).values(version=version)
)


def _apply_schema_migrations(
connection: Connection,
*,
version_table: Table,
scope: str,
current_version: int,
migrations: Mapping[int, Callable[[Connection], None]],
) -> int:
if current_version < 0:
raise ValueError("current_version must be non-negative")

stored_version = _read_schema_version(
connection,
version_table=version_table,
scope=scope,
)
if stored_version is not None and stored_version > current_version:
raise RuntimeError(
f"Database schema scope {scope!r} is newer than this application supports"
)

starting_version = stored_version or 0
for next_version in range(starting_version + 1, current_version + 1):
migration = migrations.get(next_version)
if migration is None:
raise RuntimeError(
f"Missing migration for schema scope {scope!r} version {next_version}"
)
migration(connection)
_write_schema_version(
connection,
version_table=version_table,
scope=scope,
version=next_version,
)

return current_version


def migrate_state_store_schema(
connection: Connection,
*,
state_metadata: MetaData,
pending_session_claims_table: Table,
interrupt_requests_table: Table,
current_version: int = CURRENT_STATE_STORE_SCHEMA_VERSION,
) -> int:
_SCHEMA_VERSION_METADATA.create_all(connection)
state_metadata.create_all(connection)

stored_version = _read_schema_version(connection, name=STATE_STORE_SCHEMA_NAME)
if stored_version > current_version:
raise RuntimeError(
"Database state-store schema version is newer than this application supports"
)

migrations: dict[int, Callable[[Connection], None]] = {
1: lambda conn: _migration_1_add_interrupt_details_json(
conn,
interrupt_requests_table=interrupt_requests_table,
),
2: lambda conn: _migration_2_add_pending_claim_expires_at(
conn,
pending_session_claims_table=pending_session_claims_table,
),
3: lambda conn: _migration_3_add_lightweight_state_indexes(
conn,
pending_session_claims_table=pending_session_claims_table,
interrupt_requests_table=interrupt_requests_table,
),
}

for next_version in range(stored_version + 1, current_version + 1):
migration = migrations.get(next_version)
if migration is None:
raise RuntimeError(f"Missing state-store migration for version {next_version}")
migration(connection)
_write_schema_version(connection, name=STATE_STORE_SCHEMA_NAME, version=next_version)

return current_version
return _apply_schema_migrations(
connection,
version_table=_SCHEMA_VERSIONS,
scope=STATE_STORE_SCHEMA_NAME,
current_version=current_version,
migrations=migrations,
)
Loading