From acf2ae4aa13b77dde3fd69961ac44f92f0d58536 Mon Sep 17 00:00:00 2001
From: Morgan Wowk
Date: Thu, 25 Jun 2026 18:12:59 -0700
Subject: [PATCH] feat: Executions - API - Expose container status history on
 the details endpoint

Surface ExecutionNode.extra_data status-change history (recorded by the
status-transition instrumentation) as a new status_history field on
GetExecutionInfoResponse (GET /api/executions/{id}/details).

Each entry is {status, first_observed_at}; the list is ordered and the last
entry corresponds to the node's current status, so its first_observed_at is
when the node entered that status. This lets clients compute true
time-in-status (e.g. how long a task has been PENDING/QUEUED) instead of
relying on a browser-session heuristic.

Malformed entries (entries that are not mappings, entries missing status or
first_observed_at, and entries whose first_observed_at is not a valid ISO 8601
string) are skipped instead of failing the request; absence yields null rather
than an empty list.
---
Review note: the blob ids on the `index` lines below predate this revision of
the patch; they are only consulted for 3-way merges.

 cloud_pipelines_backend/api_server_sql.py |  49 ++++++++++
 tests/test_execution_nodes_api_service.py | 105 ++++++++++++++++++++++
 2 files changed, 154 insertions(+)

diff --git a/cloud_pipelines_backend/api_server_sql.py b/cloud_pipelines_backend/api_server_sql.py
index 1e018c4e..b61eff43 100644
--- a/cloud_pipelines_backend/api_server_sql.py
+++ b/cloud_pipelines_backend/api_server_sql.py
@@ -502,6 +502,45 @@ def _construct_constant_artifact_node_and_add_to_session(
 # ============
 
 
+@dataclasses.dataclass
+class StatusHistoryEntry:
+    """One observed status transition of an execution node."""
+
+    status: str
+    # When the node was first observed in `status`.
+    first_observed_at: datetime.datetime
+
+
+def _parse_status_history(raw_history: object) -> list[StatusHistoryEntry] | None:
+    """Convert stored status-history records into `StatusHistoryEntry` objects.
+
+    The records come from stored data whose shape is not validated here, so
+    anything that cannot be turned into a complete entry is skipped instead
+    of failing the whole request.
+
+    Returns None (rather than an empty list) when no usable entry exists.
+    """
+    if not isinstance(raw_history, list):
+        return None
+    entries = []
+    for raw_entry in raw_history:
+        if not isinstance(raw_entry, dict):
+            continue
+        status = raw_entry.get("status")
+        raw_timestamp = raw_entry.get("first_observed_at")
+        if not status or not raw_timestamp:
+            continue
+        try:
+            first_observed_at = datetime.datetime.fromisoformat(raw_timestamp)
+        except (TypeError, ValueError):
+            # Not an ISO 8601 string: treat the entry as malformed.
+            continue
+        entries.append(
+            StatusHistoryEntry(status=status, first_observed_at=first_observed_at)
+        )
+    return entries or None
+
+
 @dataclasses.dataclass(kw_only=True)
 class GetExecutionInfoResponse:
     id: bts.IdType
@@ -512,6 +551,10 @@ class GetExecutionInfoResponse:
     # ancestor_breadcrumbs: list[tuple[str, str]]
     input_artifacts: dict[str, "ArtifactNodeIdResponse"] | None = None
     output_artifacts: dict[str, "ArtifactNodeIdResponse"] | None = None
+    # Ordered history of container-execution status transitions for this node,
+    # sourced from `ExecutionNode.extra_data`. The last entry corresponds to the
+    # current status, so its `first_observed_at` is when the node entered it.
+    status_history: list[StatusHistoryEntry] | None = None
 
 
 @dataclasses.dataclass
@@ -603,6 +646,11 @@ def get(self, session: orm.Session, id: bts.IdType) -> GetExecutionInfoResponse:
                 ).where(bts.OutputArtifactLink.execution_id == id)
             ).tuples()
         }
+        status_history = _parse_status_history(
+            (execution_node.extra_data or {}).get(
+                bts.EXECUTION_NODE_EXTRA_DATA_STATUS_HISTORY_KEY
+            )
+        )
         return GetExecutionInfoResponse(
             id=execution_node.id,
             task_spec=structures.TaskSpec.from_json_dict(execution_node.task_spec),
@@ -611,6 +659,7 @@ def get(self, session: orm.Session, id: bts.IdType) -> GetExecutionInfoResponse:
             child_task_execution_ids=child_task_execution_ids,
             input_artifacts=input_artifacts,
             output_artifacts=output_artifacts,
+            status_history=status_history,
         )
 
     def get_graph_execution_state(
diff --git a/tests/test_execution_nodes_api_service.py b/tests/test_execution_nodes_api_service.py
index 90e6944d..50f4473a 100644
--- a/tests/test_execution_nodes_api_service.py
+++ b/tests/test_execution_nodes_api_service.py
@@ -1,14 +1,31 @@
+import datetime
+
 import pytest
 from sqlalchemy import orm
 
 from cloud_pipelines_backend import backend_types_sql as bts
+from cloud_pipelines_backend import component_structures as structures
 from cloud_pipelines_backend import database_ops
 from cloud_pipelines_backend.api_server_sql import (
     ExecutionNodesApiService_Sql,
+    GetExecutionInfoResponse,
     GetGraphExecutionStateResponse,
 )
 
 
+def _make_task_spec_dict(name: str = "task") -> dict:
+    return structures.TaskSpec(
+        component_ref=structures.ComponentReference(
+            spec=structures.ComponentSpec(
+                name=name,
+                implementation=structures.ContainerImplementation(
+                    container=structures.ContainerSpec(image="test-image:latest"),
+                ),
+            ),
+        ),
+    ).to_json_dict()
+
+
 def _initialize_db_and_get_session_factory():
     db_engine = database_ops.create_db_engine_and_migrate_db(database_uri="sqlite://")
     return lambda: orm.Session(bind=db_engine)
@@ -325,5 +342,93 @@ def test_three_level_mixed_stats(self):
         assert result.child_execution_status_summary.has_ended is False
 
 
+class TestGetExecutionInfo:
+    """Tests for ExecutionNodesApiService_Sql.get (the /details endpoint)."""
+
+    def setup_method(self):
+        self.session_factory = _initialize_db_and_get_session_factory()
+        self.service = ExecutionNodesApiService_Sql()
+
+    def test_status_history_is_returned_in_order(self):
+        """status_history mirrors ExecutionNode.extra_data, parsed and in order."""
+        history = [
+            {"status": "QUEUED", "first_observed_at": "2026-01-01T00:00:00+00:00"},
+            {"status": "PENDING", "first_observed_at": "2026-01-01T00:02:00+00:00"},
+            {"status": "RUNNING", "first_observed_at": "2026-01-01T00:05:00+00:00"},
+        ]
+        with self.session_factory() as session:
+            node = _make_execution_node(task_spec=_make_task_spec_dict())
+            node.extra_data = {
+                bts.EXECUTION_NODE_EXTRA_DATA_STATUS_HISTORY_KEY: history
+            }
+            session.add(node)
+            session.flush()
+
+            result = self.service.get(session, node.id)
+
+        assert isinstance(result, GetExecutionInfoResponse)
+        assert result.status_history is not None
+        assert [e.status for e in result.status_history] == [
+            "QUEUED",
+            "PENDING",
+            "RUNNING",
+        ]
+        # The last entry is the current status; its timestamp is parsed to a datetime.
+        assert result.status_history[-1].first_observed_at == datetime.datetime(
+            2026, 1, 1, 0, 5, tzinfo=datetime.timezone.utc
+        )
+
+    def test_status_history_is_none_when_absent(self):
+        """No history in extra_data yields status_history=None (not an empty list)."""
+        with self.session_factory() as session:
+            node = _make_execution_node(task_spec=_make_task_spec_dict())
+            session.add(node)
+            session.flush()
+
+            result = self.service.get(session, node.id)
+
+        assert result.status_history is None
+
+    def test_status_history_skips_malformed_entries(self):
+        """Entries missing status or first_observed_at are dropped."""
+        history = [
+            {"status": "QUEUED", "first_observed_at": "2026-01-01T00:00:00+00:00"},
+            {"status": "PENDING"},  # missing first_observed_at
+            {"first_observed_at": "2026-01-01T00:03:00+00:00"},  # missing status
+        ]
+        with self.session_factory() as session:
+            node = _make_execution_node(task_spec=_make_task_spec_dict())
+            node.extra_data = {
+                bts.EXECUTION_NODE_EXTRA_DATA_STATUS_HISTORY_KEY: history
+            }
+            session.add(node)
+            session.flush()
+
+            result = self.service.get(session, node.id)
+
+        assert result.status_history is not None
+        assert [e.status for e in result.status_history] == ["QUEUED"]
+
+    def test_status_history_skips_unparseable_entries(self):
+        """Non-mapping entries and invalid timestamps are dropped, not raised."""
+        history = [
+            "not-a-mapping",
+            {"status": "QUEUED", "first_observed_at": "not-a-timestamp"},
+            {"status": "PENDING", "first_observed_at": "2026-01-01T00:02:00+00:00"},
+        ]
+        with self.session_factory() as session:
+            node = _make_execution_node(task_spec=_make_task_spec_dict())
+            node.extra_data = {
+                bts.EXECUTION_NODE_EXTRA_DATA_STATUS_HISTORY_KEY: history
+            }
+            session.add(node)
+            session.flush()
+
+            result = self.service.get(session, node.id)
+
+        assert result.status_history is not None
+        assert [e.status for e in result.status_history] == ["PENDING"]
+
+
 if __name__ == "__main__":
     pytest.main()