Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions cloud_pipelines_backend/api_server_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,12 @@ def _construct_constant_artifact_node_and_add_to_session(
# ============


@dataclasses.dataclass
class StatusHistoryEntry:
status: str
first_observed_at: datetime.datetime


@dataclasses.dataclass(kw_only=True)
class GetExecutionInfoResponse:
id: bts.IdType
Expand All @@ -512,6 +518,10 @@ class GetExecutionInfoResponse:
# ancestor_breadcrumbs: list[tuple[str, str]]
input_artifacts: dict[str, "ArtifactNodeIdResponse"] | None = None
output_artifacts: dict[str, "ArtifactNodeIdResponse"] | None = None
# Ordered history of container-execution status transitions for this node,
# sourced from `ExecutionNode.extra_data`. The last entry corresponds to the
# current status, so its `first_observed_at` is when the node entered it.
status_history: list[StatusHistoryEntry] | None = None


@dataclasses.dataclass
Expand Down Expand Up @@ -603,6 +613,19 @@ def get(self, session: orm.Session, id: bts.IdType) -> GetExecutionInfoResponse:
).where(bts.OutputArtifactLink.execution_id == id)
).tuples()
}
raw_status_history = (execution_node.extra_data or {}).get(
bts.EXECUTION_NODE_EXTRA_DATA_STATUS_HISTORY_KEY, []
)
status_history = [
StatusHistoryEntry(
status=entry["status"],
first_observed_at=datetime.datetime.fromisoformat(
entry["first_observed_at"]
),
)
for entry in raw_status_history
if entry.get("status") and entry.get("first_observed_at")
] or None
return GetExecutionInfoResponse(
id=execution_node.id,
task_spec=structures.TaskSpec.from_json_dict(execution_node.task_spec),
Expand All @@ -611,6 +634,7 @@ def get(self, session: orm.Session, id: bts.IdType) -> GetExecutionInfoResponse:
child_task_execution_ids=child_task_execution_ids,
input_artifacts=input_artifacts,
output_artifacts=output_artifacts,
status_history=status_history,
)

def get_graph_execution_state(
Expand Down
85 changes: 85 additions & 0 deletions tests/test_execution_nodes_api_service.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,31 @@
import datetime

import pytest
from sqlalchemy import orm

from cloud_pipelines_backend import backend_types_sql as bts
from cloud_pipelines_backend import component_structures as structures
from cloud_pipelines_backend import database_ops
from cloud_pipelines_backend.api_server_sql import (
ExecutionNodesApiService_Sql,
GetExecutionInfoResponse,
GetGraphExecutionStateResponse,
)


def _make_task_spec_dict(name: str = "task") -> dict:
return structures.TaskSpec(
component_ref=structures.ComponentReference(
spec=structures.ComponentSpec(
name=name,
implementation=structures.ContainerImplementation(
container=structures.ContainerSpec(image="test-image:latest"),
),
),
),
).to_json_dict()


def _initialize_db_and_get_session_factory():
db_engine = database_ops.create_db_engine_and_migrate_db(database_uri="sqlite://")
return lambda: orm.Session(bind=db_engine)
Expand Down Expand Up @@ -325,5 +342,73 @@ def test_three_level_mixed_stats(self):
assert result.child_execution_status_summary.has_ended is False


class TestGetExecutionInfo:
"""Tests for ExecutionNodesApiService_Sql.get (the /details endpoint)."""

def setup_method(self):
self.session_factory = _initialize_db_and_get_session_factory()
self.service = ExecutionNodesApiService_Sql()

def test_status_history_is_returned_in_order(self):
"""status_history mirrors ExecutionNode.extra_data, parsed and in order."""
history = [
{"status": "QUEUED", "first_observed_at": "2026-01-01T00:00:00+00:00"},
{"status": "PENDING", "first_observed_at": "2026-01-01T00:02:00+00:00"},
{"status": "RUNNING", "first_observed_at": "2026-01-01T00:05:00+00:00"},
]
with self.session_factory() as session:
node = _make_execution_node(task_spec=_make_task_spec_dict())
node.extra_data = {
bts.EXECUTION_NODE_EXTRA_DATA_STATUS_HISTORY_KEY: history
}
session.add(node)
session.flush()

result = self.service.get(session, node.id)

assert isinstance(result, GetExecutionInfoResponse)
assert result.status_history is not None
assert [e.status for e in result.status_history] == [
"QUEUED",
"PENDING",
"RUNNING",
]
# The last entry is the current status; its timestamp is parsed to a datetime.
assert result.status_history[-1].first_observed_at == datetime.datetime(
2026, 1, 1, 0, 5, tzinfo=datetime.timezone.utc
)

def test_status_history_is_none_when_absent(self):
"""No history in extra_data yields status_history=None (not an empty list)."""
with self.session_factory() as session:
node = _make_execution_node(task_spec=_make_task_spec_dict())
session.add(node)
session.flush()

result = self.service.get(session, node.id)

assert result.status_history is None

def test_status_history_skips_malformed_entries(self):
"""Entries missing status or first_observed_at are dropped."""
history = [
{"status": "QUEUED", "first_observed_at": "2026-01-01T00:00:00+00:00"},
{"status": "PENDING"}, # missing first_observed_at
{"first_observed_at": "2026-01-01T00:03:00+00:00"}, # missing status
]
with self.session_factory() as session:
node = _make_execution_node(task_spec=_make_task_spec_dict())
node.extra_data = {
bts.EXECUTION_NODE_EXTRA_DATA_STATUS_HISTORY_KEY: history
}
session.add(node)
session.flush()

result = self.service.get(session, node.id)

assert result.status_history is not None
assert [e.status for e in result.status_history] == ["QUEUED"]


if __name__ == "__main__":
pytest.main()
Loading