Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
"""add_task_creator_and_zedtoken

Revision ID: a1f73ada66c5
Revises: a9959ebcbe98
Create Date: 2026-05-21 15:08:51.441535

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision: str = 'a1f73ada66c5'
down_revision: Union[str, None] = 'a9959ebcbe98'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
op.add_column('tasks', sa.Column('creator_user_id', sa.String(), nullable=True))
op.add_column('tasks', sa.Column('creator_service_account_id', sa.String(), nullable=True))
op.add_column('tasks', sa.Column('spark_authz_zedtoken', sa.Text(), nullable=True))
with op.get_context().autocommit_block():
op.execute(
"CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_tasks_creator_user_id "
"ON tasks (creator_user_id)"
)
op.execute(
"CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_tasks_creator_service_account_id "
"ON tasks (creator_service_account_id)"
)
op.create_check_constraint(
'ck_tasks_one_creator',
'tasks',
'(creator_user_id IS NULL) OR (creator_service_account_id IS NULL)',
)


def downgrade() -> None:
op.drop_constraint('ck_tasks_one_creator', 'tasks', type_='check')
with op.get_context().autocommit_block():
op.execute("DROP INDEX CONCURRENTLY IF EXISTS ix_tasks_creator_service_account_id")
op.execute("DROP INDEX CONCURRENTLY IF EXISTS ix_tasks_creator_user_id")
op.drop_column('tasks', 'spark_authz_zedtoken')
op.drop_column('tasks', 'creator_service_account_id')
op.drop_column('tasks', 'creator_user_id')
4 changes: 3 additions & 1 deletion agentex/database/migrations/migration_history.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
9ff3ee32c81b -> e9c4ff9e6542 (head), add_tasks_metadata_gin_index
a9959ebcbe98 -> a1f73ada66c5 (head), add_task_creator_and_zedtoken
e9c4ff9e6542 -> a9959ebcbe98, finalize_spans_task_id
9ff3ee32c81b -> e9c4ff9e6542, add_tasks_metadata_gin_index
57c5ed4f59ae -> 9ff3ee32c81b, uppercase deployment status enum labels
4a9b7787ccd7 -> 57c5ed4f59ae, add_task_id_to_spans
d1a6cde41b3f -> 4a9b7787ccd7, deployments
Expand Down
3 changes: 3 additions & 0 deletions agentex/src/adapters/orm.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ class TaskORM(BaseORM):
)
params = Column(JSONB, nullable=True)
task_metadata = Column(JSONB, nullable=True)
creator_user_id = Column(String, nullable=True, index=True)
creator_service_account_id = Column(String, nullable=True, index=True)
spark_authz_zedtoken = Column(Text, nullable=True)
# Many-to-Many relationship with agents
agents = relationship("AgentORM", secondary="task_agents", back_populates="tasks")

Expand Down
12 changes: 12 additions & 0 deletions agentex/src/domain/entities/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,18 @@ class TaskEntity(BaseModel):
None,
title="Task metadata",
)
creator_user_id: str | None = Field(
None,
title="Identity ID of the user who created this task (granted as FGAC owner)",
)
creator_service_account_id: str | None = Field(
None,
title="Service identity ID of the service account that created this task",
)
spark_authz_zedtoken: str | None = Field(
None,
title="ZedToken from the Spark AuthZ grant for new-write isolation",
)

# allow extra fields for agents relationships
model_config = ConfigDict(extra="allow")
Expand Down
96 changes: 94 additions & 2 deletions agentex/src/domain/services/task_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from fastapi import Depends

from src.adapters.streams.adapter_redis import DRedisStreamRepository
from src.api.schemas.authorization_types import AgentexResource
from src.domain.entities.agents import ACPType, AgentEntity
from src.domain.entities.events import EventEntity
from src.domain.entities.task_message_updates import TaskMessageUpdateEntity
Expand All @@ -14,6 +15,8 @@
from src.domain.repositories.task_repository import DTaskRepository
from src.domain.repositories.task_state_repository import DTaskStateRepository
from src.domain.services.agent_acp_service import DAgentACPService
from src.domain.services.authorization_service import DAuthorizationService
from src.utils.feature_flags import DFeatureFlagProvider, FeatureFlagName
from src.utils.ids import orm_id
from src.utils.logging import make_logger
from src.utils.stream_topics import get_task_event_stream_topic
Expand All @@ -33,19 +36,24 @@ def __init__(
task_repository: DTaskRepository,
event_repository: DEventRepository,
stream_repository: DRedisStreamRepository,
authorization_service: DAuthorizationService,
feature_flags: DFeatureFlagProvider,
):
self.acp_client = acp_client
self.task_state_repository = task_state_repository
self.task_repository = task_repository
self.event_repository = event_repository
self.stream_repository = stream_repository
self.authorization_service = authorization_service
self.feature_flags = feature_flags

async def create_task(
self,
agent: AgentEntity,
task_name: str | None = None,
task_params: dict[str, Any] | None = None,
task_metadata: dict[str, Any] | None = None,
account_id: str | None = None,
) -> TaskEntity:
"""
Create a new task record in the repository with single agent (maintains existing interface).
Expand All @@ -56,28 +64,107 @@ async def create_task(
task_params: The parameters for the task
task_metadata: Caller-provided metadata to persist on the task row.
Not forwarded to the agent.
account_id: Caller-resolved account scope. When provided and the
FGAC_TASKS_DUAL_WRITE flag is enabled for it, the task is also
registered in Spark AuthZ.
Returns:
Task containing the created task info
"""
principal_context = self.authorization_service.principal_context
creator_user_id = getattr(principal_context, "user_id", None)
creator_service_account_id = getattr(
principal_context, "service_account_id", None
)

task_id = orm_id()
zedtoken: str | None = None

if self.feature_flags.is_enabled(
FeatureFlagName.FGAC_TASKS_DUAL_WRITE, account_id
):
zedtoken = await self._register_task_in_spark_authz(
task_id=task_id,
account_id=account_id,
creator_user_id=creator_user_id,
creator_service_account_id=creator_service_account_id,
)

task_entity = await self.task_repository.create(
agent_id=agent.id,
task=TaskEntity(
id=orm_id(),
id=task_id,
name=task_name,
status=TaskStatus.RUNNING,
status_reason="Task created, forwarding to ACP server",
params=task_params,
task_metadata=task_metadata,
creator_user_id=creator_user_id,
creator_service_account_id=creator_service_account_id,
spark_authz_zedtoken=zedtoken,
),
)
return task_entity

async def _register_task_in_spark_authz(
self,
*,
task_id: str,
account_id: str | None,
creator_user_id: str | None,
creator_service_account_id: str | None,
) -> str | None:
"""Register a new task in Spark AuthZ with creator as owner.

Called BEFORE the Postgres write — a failure raises and prevents the
row from being persisted, so there is no compensating action to take.
Mirrors the KB FGAC pattern at
``packages/egp-api-backend/.../knowledge_base_v2_use_case.py:374-388``.

The current ``Provider.spark`` adapter returns ``{}`` from ``grant``;
no ZedToken is surfaced today, so we always return ``None`` for the
new-write-isolation column. A follow-up will plumb the token through
once the adapter exposes it.
"""
if creator_user_id is None and creator_service_account_id is None:
logger.warning(
"Skipping Spark AuthZ task registration: no creator resolvable",
extra={"task_id": task_id, "account_id": account_id},
)
return None
await self.authorization_service.grant(
resource=AgentexResource.task(task_id),
)
return None

async def deregister_task_from_spark_authz(
self, *, task_id: str, account_id: str | None
) -> None:
"""Best-effort revocation of a task's Spark AuthZ tuples on delete.

Only invoked when the FGAC_TASKS_DUAL_WRITE flag is enabled for the
caller's account. Failures are logged but do not block the delete.
"""
if not self.feature_flags.is_enabled(
FeatureFlagName.FGAC_TASKS_DUAL_WRITE, account_id
):
return
try:
await self.authorization_service.revoke(
resource=AgentexResource.task(task_id),
)
except Exception:
logger.warning(
"Spark AuthZ revoke failed for task",
extra={"task_id": task_id, "account_id": account_id},
exc_info=True,
)

async def create_task_and_forward_to_acp(
self,
agent: AgentEntity,
task_name: str | None = None,
task_params: dict[str, Any] | None = None,
account_id: str | None = None,
) -> TaskEntity:
"""
Create a new task record in the repository with single agent (maintains existing interface).
Expand All @@ -86,12 +173,17 @@ async def create_task_and_forward_to_acp(
Args:
agent: The agent to create the task for
task_params: The parameters for the task to be sent to the ACP server
account_id: Caller-resolved account scope; threaded through to
:meth:`create_task` for FGAC dual-write gating.

Returns:
Task containing the created task info
"""
task_entity = await self.create_task(
agent=agent, task_name=task_name, task_params=task_params
agent=agent,
task_name=task_name,
task_params=task_params,
account_id=account_id,
)

if agent.acp_type == ACPType.SYNC:
Expand Down
11 changes: 11 additions & 0 deletions agentex/src/domain/use_cases/agents_acp_use_case.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ async def _get_or_create_task(
task_name: str | None = None,
task_params: dict[str, Any] | None = None,
task_metadata: dict[str, Any] | None = None,
account_id: str | None = None,
) -> TaskEntity:
"""Return the existing task if *task_id* is provided, otherwise create a new one.

Expand Down Expand Up @@ -308,6 +309,7 @@ async def _get_or_create_task(
task_name=task_name,
task_params=task_params,
task_metadata=task_metadata,
account_id=account_id,
)
logger.info(f"[agent_id={agent.id}] Created task {task.id}")
await self.grant_with_retry(task)
Expand Down Expand Up @@ -419,6 +421,9 @@ async def _handle_task_create(
task_name=params.name,
task_params=params.params,
task_metadata=params.task_metadata,
account_id=getattr(
self.authorization_service.principal_context, "account_id", None
),
)

if agent.acp_type in [ACPType.AGENTIC, ACPType.ASYNC]:
Expand Down Expand Up @@ -457,6 +462,9 @@ async def _handle_message_send_sync(
task_id=params.task_id,
task_name=params.task_name,
task_params=params.task_params,
account_id=getattr(
self.authorization_service.principal_context, "account_id", None
),
)

# Step 1: Insert the message in the messages table
Expand Down Expand Up @@ -642,6 +650,9 @@ async def flush_aggregated_deltas(task_message_index: int) -> TaskMessageEntity:
task_id=params.task_id,
task_name=params.task_name,
task_params=params.task_params,
account_id=getattr(
self.authorization_service.principal_context, "account_id", None
),
)

# Append the input client message
Expand Down
8 changes: 8 additions & 0 deletions agentex/src/domain/use_cases/tasks_use_case.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ async def delete_task(self, id: str | None = None, name: str | None = None) -> N
task.status = TaskStatus.DELETED
task.status_reason = "Task deleted successfully"
await self.task_service.update_task(task=task)
account_id = getattr(
self.task_service.authorization_service.principal_context,
"account_id",
None,
)
await self.task_service.deregister_task_from_spark_authz(
task_id=task.id, account_id=account_id
)

async def list_tasks(
self,
Expand Down
29 changes: 29 additions & 0 deletions agentex/src/utils/feature_flags.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import os
from enum import StrEnum
from typing import Annotated

from fastapi import Depends


class FeatureFlagName(StrEnum):
FGAC_TASKS = "fgac-tasks"
FGAC_TASKS_DUAL_WRITE = "fgac-tasks-dual-write"


class FeatureFlagProvider:
"""Per-account feature flag provider.

v1: env-var allowlist (per-account, comma-separated). The env var name is
derived from the flag name, e.g. ``FGAC_TASKS_DUAL_WRITE_ACCOUNTS``. A
follow-up will swap this for LaunchDarkly with an account_id context.
"""

def is_enabled(self, name: FeatureFlagName, account_id: str | None) -> bool:
if not account_id:
return False
env_key = f"{name.value.upper().replace('-', '_')}_ACCOUNTS"
allowed = os.environ.get(env_key, "")
return account_id in {a.strip() for a in allowed.split(",") if a.strip()}


DFeatureFlagProvider = Annotated[FeatureFlagProvider, Depends(FeatureFlagProvider)]
17 changes: 15 additions & 2 deletions agentex/tests/fixtures/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
Provides factory functions and specific fixtures for creating services with test repositories.
"""

from unittest.mock import MagicMock, Mock
from unittest.mock import AsyncMock, MagicMock, Mock

import pytest

Expand Down Expand Up @@ -52,16 +52,29 @@ def create_task_service(
event_repository,
agent_acp_service,
redis_stream_repository,
authorization_service=None,
feature_flags=None,
):
"""Factory function to create AgentTaskService with given repositories and services"""
"""Factory function to create AgentTaskService with given repositories and services."""
from src.domain.services.task_service import AgentTaskService
from src.utils.feature_flags import FeatureFlagProvider

if authorization_service is None:
authorization_service = Mock()
authorization_service.principal_context = None
authorization_service.grant = AsyncMock(return_value=None)
authorization_service.revoke = AsyncMock(return_value=None)
if feature_flags is None:
feature_flags = FeatureFlagProvider()

return AgentTaskService(
task_repository=task_repository,
task_state_repository=task_state_repository,
event_repository=event_repository,
acp_client=agent_acp_service,
stream_repository=redis_stream_repository,
authorization_service=authorization_service,
feature_flags=feature_flags,
)


Expand Down
Loading
Loading