From 2c9b70e7bfbe84aed4cbcf12ddc4878bf8e1529d Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Sun, 1 Feb 2026 12:10:59 +0100 Subject: [PATCH 01/10] Move runtimes to json journal store --- CHANGELOG.md | 4 +- pyproject.toml | 1 + src/_pytask/journal.py | 41 ++++++++ src/_pytask/profile.py | 68 ++++++------- src/_pytask/runtime_store.py | 179 +++++++++++++++++++++++++++++++++++ src/pytask/__init__.py | 2 - tests/test_profile.py | 16 +--- tests/test_runtime_store.py | 38 ++++++++ uv.lock | 58 ++++++++++++ 9 files changed, 356 insertions(+), 51 deletions(-) create mode 100644 src/_pytask/journal.py create mode 100644 src/_pytask/runtime_store.py create mode 100644 tests/test_runtime_store.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 080bef0b..1a8e8749 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,7 +7,9 @@ releases are available on [PyPI](https://pypi.org/project/pytask) and ## Unreleased -- Nothing yet. +- Move runtime profiling persistence from SQLite to a JSON snapshot plus append-only + journal in `.pytask/`, keeping runtime data resilient to crashes and compacted on + normal build exits. ## 0.5.8 - 2025-12-30 diff --git a/pyproject.toml b/pyproject.toml index d3855f5d..82dde20d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,6 +24,7 @@ dependencies = [ "attrs>=21.3.0", "click>=8.1.8,!=8.2.0", "click-default-group>=1.2.4", + "msgspec>=0.18.6", "networkx>=2.4.0", "optree>=0.9.0", "packaging>=23.0.0", diff --git a/src/_pytask/journal.py b/src/_pytask/journal.py new file mode 100644 index 00000000..bc48066d --- /dev/null +++ b/src/_pytask/journal.py @@ -0,0 +1,41 @@ +"""Helpers for append-only JSONL journals.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING +from typing import TypeVar + +import msgspec + +if TYPE_CHECKING: + from pathlib import Path + +T = TypeVar("T") + + +def append_jsonl(path: Path, payload: msgspec.Struct) -> None: + """Append a JSON line to the journal.""" + with path.open("ab") as journal_file: + journal_file.write(msgspec.json.encode(payload) + b"\n") + + +def read_jsonl(path: Path, *, type_: type[T]) -> list[T]: + """Read JSONL entries from a journal, stopping at the first invalid line.""" + if not path.exists(): + return [] + + entries: list[T] = [] + for line in path.read_bytes().splitlines(): + if not line.strip(): + continue + try: + entries.append(msgspec.json.decode(line, type=type_)) + except msgspec.DecodeError: + break + return entries + + +def delete_if_exists(path: Path) -> None: + """Delete a file if it exists.""" + if path.exists(): + path.unlink() diff --git a/src/_pytask/profile.py b/src/_pytask/profile.py index 95dc82e2..dffd3edc 100644 --- a/src/_pytask/profile.py +++ b/src/_pytask/profile.py @@ -13,16 +13,12 @@ import click from rich.table import Table -from sqlalchemy.orm import Mapped -from sqlalchemy.orm import mapped_column from _pytask.click import ColoredCommand from _pytask.click import EnumChoice from _pytask.console import console from _pytask.console import format_task_name from _pytask.dag import create_dag -from _pytask.database_utils import BaseTable -from _pytask.database_utils import DatabaseSession from _pytask.exceptions import CollectionError from _pytask.exceptions import ConfigurationError from _pytask.node_protocols import PPathNode @@ -31,6 +27,7 @@ from _pytask.outcomes import TaskOutcome from _pytask.pluginmanager import hookimpl from _pytask.pluginmanager import storage +from _pytask.runtime_store import RuntimeState from _pytask.session import Session from _pytask.traceback import Traceback @@ -48,16 +45,6 @@ class _ExportFormats(enum.Enum): CSV = "csv" -class Runtime(BaseTable): - """Record of runtimes of tasks.""" - - __tablename__ = "runtime" - - task: Mapped[str] = mapped_column(primary_key=True) - date: Mapped[float] - duration: Mapped[float] - - @hookimpl(tryfirst=True) def pytask_extend_command_line_interface(cli: click.Group) -> None: """Extend the command line interface.""" @@ -67,6 +54,7 @@ def pytask_extend_command_line_interface(cli: click.Group) -> None: @hookimpl def pytask_post_parse(config: dict[str, Any]) -> None: """Register the export option.""" + config["runtime_state"] = RuntimeState.from_root(config["root"]) config["pm"].register(ExportNameSpace) config["pm"].register(DurationNameSpace) config["pm"].register(FileSizeNameSpace) @@ -83,26 +71,16 @@ def pytask_execute_task(task: PTask) -> Generator[None, None, None]: @hookimpl -def pytask_execute_task_process_report(report: ExecutionReport) -> None: - """Store runtime of successfully finishing tasks in database.""" +def pytask_execute_task_process_report( + session: Session, report: ExecutionReport +) -> None: + """Store runtime of successfully finishing tasks.""" task = report.task duration = task.attributes.get("duration") if report.outcome == TaskOutcome.SUCCESS and duration is not None: - _create_or_update_runtime(task.signature, *duration) - - -def _create_or_update_runtime(task_signature: str, start: float, end: float) -> None: - """Create or update a runtime entry.""" - with DatabaseSession() as session: - runtime = session.get(Runtime, task_signature) - - if not runtime: - session.add(Runtime(task=task_signature, date=start, duration=end - start)) - else: - for attr, val in (("date", start), ("duration", end - start)): - setattr(runtime, attr, val) - - session.commit() + runtime_state = session.config.get("runtime_state") + if runtime_state is not None: + runtime_state.update_task(task, *duration) @click.command(cls=ColoredCommand) @@ -189,20 +167,23 @@ class DurationNameSpace: @staticmethod @hookimpl def pytask_profile_add_info_on_task( - tasks: list[PTask], profile: dict[str, dict[str, Any]] + session: Session, tasks: list[PTask], profile: dict[str, dict[str, Any]] ) -> None: """Add the runtime for tasks to the profile.""" - runtimes = _collect_runtimes(tasks) + runtimes = _collect_runtimes(session, tasks) for name, duration in runtimes.items(): profile[name]["Duration (in s)"] = round(duration, 2) -def _collect_runtimes(tasks: list[PTask]) -> dict[str, float]: +def _collect_runtimes(session: Session, tasks: list[PTask]) -> dict[str, float]: """Collect runtimes.""" - with DatabaseSession() as session: - runtimes = [session.get(Runtime, task.signature) for task in tasks] + runtime_state = session.config.get("runtime_state") + if runtime_state is None: + return {} return { - task.name: r.duration for task, r in zip(tasks, runtimes, strict=False) if r + task.name: duration + for task in tasks + if (duration := runtime_state.get_duration(task)) is not None } @@ -313,3 +294,16 @@ def _get_info_names(profile: dict[str, dict[str, Any]]) -> list[str]: base: set[str] = set() info_names: list[str] = sorted(base.union(*(set(val) for val in profile.values()))) return info_names + + +@hookimpl +def pytask_unconfigure(session: Session) -> None: + """Flush runtime information on normal build exits.""" + if session.config.get("command") != "build": + return + if session.config.get("dry_run") or session.config.get("explain"): + return + runtime_state = session.config.get("runtime_state") + if runtime_state is None: + return + runtime_state.flush() diff --git a/src/_pytask/runtime_store.py b/src/_pytask/runtime_store.py new file mode 100644 index 00000000..e4f9b7ed --- /dev/null +++ b/src/_pytask/runtime_store.py @@ -0,0 +1,179 @@ +"""Runtime storage with an append-only journal.""" + +from __future__ import annotations + +from dataclasses import dataclass +from dataclasses import field +from typing import TYPE_CHECKING + +import msgspec +from packaging.version import Version + +from _pytask.journal import append_jsonl +from _pytask.journal import delete_if_exists +from _pytask.journal import read_jsonl + +if TYPE_CHECKING: + from pathlib import Path + + from _pytask.node_protocols import PTask + +CURRENT_RUNTIME_VERSION = "1" + + +class RuntimeStoreError(Exception): + """Raised when reading or writing runtime files fails.""" + + +class RuntimeStoreVersionError(RuntimeStoreError): + """Raised when a runtime file version is not supported.""" + + +class _RuntimeEntry(msgspec.Struct): + id: str + date: float + duration: float + + +class _RuntimeFile(msgspec.Struct, forbid_unknown_fields=False): + runtime_version: str = msgspec.field(name="runtime-version") + task: list[_RuntimeEntry] = msgspec.field(default_factory=list) + + +class _RuntimeJournalEntry(msgspec.Struct): + runtime_version: str = msgspec.field(name="runtime-version") + id: str + date: float + duration: float + + +def _runtimes_path(root: Path) -> Path: + return root / ".pytask" / "runtimes.json" + + +def _journal_path(path: Path) -> Path: + return path.with_suffix(".journal") + + +def _read_runtimes(path: Path) -> _RuntimeFile | None: + if not path.exists(): + return None + try: + data = msgspec.json.decode(path.read_bytes(), type=_RuntimeFile) + except msgspec.DecodeError: + msg = "Runtime file has invalid format." + raise RuntimeStoreError(msg) from None + + if Version(data.runtime_version) != Version(CURRENT_RUNTIME_VERSION): + msg = ( + f"Unsupported runtime-version {data.runtime_version!r}. " + f"Current version is {CURRENT_RUNTIME_VERSION}." + ) + raise RuntimeStoreVersionError(msg) + return data + + +def _write_runtimes(path: Path, runtimes: _RuntimeFile) -> None: + data = msgspec.json.encode(runtimes) + tmp = path.with_suffix(f"{path.suffix}.tmp") + tmp.write_bytes(data) + tmp.replace(path) + + +def _read_journal(path: Path) -> list[_RuntimeJournalEntry]: + journal_path = _journal_path(path) + entries = read_jsonl(journal_path, type_=_RuntimeJournalEntry) + for entry in entries: + if Version(entry.runtime_version) != Version(CURRENT_RUNTIME_VERSION): + msg = ( + f"Unsupported runtime-version {entry.runtime_version!r}. " + f"Current version is {CURRENT_RUNTIME_VERSION}." + ) + raise RuntimeStoreVersionError(msg) + return entries + + +def _apply_journal( + runtimes: _RuntimeFile, entries: list[_RuntimeJournalEntry] +) -> _RuntimeFile: + if not entries: + return runtimes + index = {entry.id: entry for entry in runtimes.task} + for entry in entries: + index[entry.id] = _RuntimeEntry( + id=entry.id, date=entry.date, duration=entry.duration + ) + return _RuntimeFile( + runtime_version=CURRENT_RUNTIME_VERSION, + task=list(index.values()), + ) + + +def _build_task_id(task: PTask) -> str: + return task.name + + +@dataclass +class RuntimeState: + path: Path + runtimes: _RuntimeFile + _index: dict[str, _RuntimeEntry] = field(init=False, default_factory=dict) + _dirty: bool = field(init=False, default=False) + + def __post_init__(self) -> None: + self._rebuild_index() + + @classmethod + def from_root(cls, root: Path) -> RuntimeState: + path = _runtimes_path(root) + existing = _read_runtimes(path) + journal_entries = _read_journal(path) + if existing is None: + runtimes = _RuntimeFile( + runtime_version=CURRENT_RUNTIME_VERSION, + task=[], + ) + runtimes = _apply_journal(runtimes, journal_entries) + state = cls(path=path, runtimes=runtimes) + else: + runtimes = _apply_journal(existing, journal_entries) + state = cls(path=path, runtimes=runtimes) + + if journal_entries: + state._dirty = True + return state + + def _rebuild_index(self) -> None: + self._index = {entry.id: entry for entry in self.runtimes.task} + + def update_task(self, task: PTask, start: float, end: float) -> None: + task_id = _build_task_id(task) + entry = _RuntimeEntry(id=task_id, date=start, duration=end - start) + self._index[entry.id] = entry + self.runtimes = _RuntimeFile( + runtime_version=CURRENT_RUNTIME_VERSION, + task=list(self._index.values()), + ) + self._rebuild_index() + journal_entry = _RuntimeJournalEntry( + runtime_version=CURRENT_RUNTIME_VERSION, + id=entry.id, + date=entry.date, + duration=entry.duration, + ) + append_jsonl(_journal_path(self.path), journal_entry) + self._dirty = True + + def get_duration(self, task: PTask) -> float | None: + task_id = _build_task_id(task) + entry = self._index.get(task_id) + if entry is None: + return None + return entry.duration + + def flush(self) -> None: + if not self._dirty: + return + _write_runtimes(self.path, self.runtimes) + delete_if_exists(_journal_path(self.path)) + self._dirty = False diff --git a/src/pytask/__init__.py b/src/pytask/__init__.py index d994b403..4352ccd9 100644 --- a/src/pytask/__init__.py +++ b/src/pytask/__init__.py @@ -66,7 +66,6 @@ from _pytask.pluginmanager import get_plugin_manager from _pytask.pluginmanager import hookimpl from _pytask.pluginmanager import storage -from _pytask.profile import Runtime from _pytask.reports import CollectionReport from _pytask.reports import DagReport from _pytask.reports import ExecutionReport @@ -123,7 +122,6 @@ "PytaskError", "PythonNode", "ResolvingDependenciesError", - "Runtime", "Session", "ShowCapture", "Skipped", diff --git a/tests/test_profile.py b/tests/test_profile.py index 67982cf1..7454d997 100644 --- a/tests/test_profile.py +++ b/tests/test_profile.py @@ -5,12 +5,10 @@ import pytest from _pytask.profile import _to_human_readable_size -from pytask import DatabaseSession +from _pytask.runtime_store import RuntimeState from pytask import ExitCode -from pytask import Runtime from pytask import build from pytask import cli -from pytask import create_database def test_duration_is_stored_in_task(tmp_path): @@ -28,14 +26,10 @@ def task_example(): time.sleep(2) duration = task.attributes["duration"] assert duration[1] - duration[0] > 2 - create_database( - "sqlite:///" + tmp_path.joinpath(".pytask", "pytask.sqlite3").as_posix() - ) - - with DatabaseSession() as session: - runtime = session.get(Runtime, task.signature) - assert runtime is not None - assert runtime.duration > 2 + runtime_state = RuntimeState.from_root(tmp_path) + duration = runtime_state.get_duration(task) + assert duration is not None + assert duration > 2 def test_profile_if_no_tasks_are_collected(tmp_path, runner): diff --git a/tests/test_runtime_store.py b/tests/test_runtime_store.py new file mode 100644 index 00000000..6a6df686 --- /dev/null +++ b/tests/test_runtime_store.py @@ -0,0 +1,38 @@ +from __future__ import annotations + +from types import SimpleNamespace + +import pytest + +from _pytask.runtime_store import RuntimeState + + +def test_runtime_state_recovers_from_journal(tmp_path): + tmp_path.joinpath(".pytask").mkdir() + task = SimpleNamespace(name="task_example") + + state = RuntimeState.from_root(tmp_path) + state.update_task(task, 1.0, 3.0) + + recovered = RuntimeState.from_root(tmp_path) + assert recovered.get_duration(task) == pytest.approx(2.0) + + +def test_runtime_state_flushes_journal(tmp_path): + tmp_path.joinpath(".pytask").mkdir() + task = SimpleNamespace(name="task_example") + + state = RuntimeState.from_root(tmp_path) + state.update_task(task, 2.0, 5.5) + + journal_path = tmp_path / ".pytask" / "runtimes.journal" + runtimes_path = tmp_path / ".pytask" / "runtimes.json" + assert journal_path.exists() + + state.flush() + + assert not journal_path.exists() + assert runtimes_path.exists() + + reloaded = RuntimeState.from_root(tmp_path) + assert reloaded.get_duration(task) == pytest.approx(3.5) diff --git a/uv.lock b/uv.lock index e91cc8d0..5abc076b 100644 --- a/uv.lock +++ b/uv.lock @@ -1959,6 +1959,62 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/b6/bc/8bd826dd03e022153bfa1766dcdec4976d6c818865ed54223d71f07862b3/msgpack-1.1.0-cp313-cp313-win_amd64.whl", hash = "sha256:bce7d9e614a04d0883af0b3d4d501171fbfca038f12c77fa838d9f198147a23f", size = 75140, upload-time = "2024-09-10T04:24:31.288Z" }, ] +[[package]] +name = "msgspec" +version = "0.20.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/ea/9c/bfbd12955a49180cbd234c5d29ec6f74fe641698f0cd9df154a854fc8a15/msgspec-0.20.0.tar.gz", hash = "sha256:692349e588fde322875f8d3025ac01689fead5901e7fb18d6870a44519d62a29", size = 317862, upload-time = "2025-11-24T03:56:28.934Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/e3/5e/151883ba2047cca9db8ed2f86186b054ad200bc231352df15b0c1dd75b1f/msgspec-0.20.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:23a6ec2a3b5038c233b04740a545856a068bc5cb8db184ff493a58e08c994fbf", size = 195191, upload-time = "2025-11-24T03:55:08.549Z" }, + { url = "https://files.pythonhosted.org/packages/50/88/a795647672f547c983eff0823b82aaa35db922c767e1b3693e2dcf96678d/msgspec-0.20.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:cde2c41ed3eaaef6146365cb0d69580078a19f974c6cb8165cc5dcd5734f573e", size = 188513, upload-time = "2025-11-24T03:55:10.008Z" }, + { url = "https://files.pythonhosted.org/packages/4b/91/eb0abb0e0de142066cebfe546dc9140c5972ea824aa6ff507ad0b6a126ac/msgspec-0.20.0-cp310-cp310-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:5da0daa782f95d364f0d95962faed01e218732aa1aa6cad56b25a5d2092e75a4", size = 216370, upload-time = "2025-11-24T03:55:11.566Z" }, + { url = "https://files.pythonhosted.org/packages/15/2a/48e41d9ef0a24b1c6e67cbd94a676799e0561bfbc163be1aaaff5ca853f5/msgspec-0.20.0-cp310-cp310-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:9369d5266144bef91be2940a3821e03e51a93c9080fde3ef72728c3f0a3a8bb7", size = 222653, upload-time = "2025-11-24T03:55:13.159Z" }, + { url = "https://files.pythonhosted.org/packages/90/c9/14b825df203d980f82a623450d5f39e7f7a09e6e256c52b498ea8f29d923/msgspec-0.20.0-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:90fb865b306ca92c03964a5f3d0cd9eb1adda14f7e5ac7943efd159719ea9f10", size = 222337, upload-time = "2025-11-24T03:55:14.777Z" }, + { url = "https://files.pythonhosted.org/packages/8b/d7/39a5c3ddd294f587d6fb8efccc8361b6aa5089974015054071e665c9d24b/msgspec-0.20.0-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:e8112cd48b67dfc0cfa49fc812b6ce7eb37499e1d95b9575061683f3428975d3", size = 225565, upload-time = "2025-11-24T03:55:16.4Z" }, + { url = "https://files.pythonhosted.org/packages/98/bd/5db3c14d675ee12842afb9b70c94c64f2c873f31198c46cbfcd7dffafab0/msgspec-0.20.0-cp310-cp310-win_amd64.whl", hash = "sha256:666b966d503df5dc27287675f525a56b6e66a2b8e8ccd2877b0c01328f19ae6c", size = 188412, upload-time = "2025-11-24T03:55:17.747Z" }, + { url = "https://files.pythonhosted.org/packages/76/c7/06cc218bc0c86f0c6c6f34f7eeea6cfb8b835070e8031e3b0ef00f6c7c69/msgspec-0.20.0-cp310-cp310-win_arm64.whl", hash = "sha256:099e3e85cd5b238f2669621be65f0728169b8c7cb7ab07f6137b02dc7feea781", size = 173951, upload-time = "2025-11-24T03:55:19.335Z" }, + { url = "https://files.pythonhosted.org/packages/03/59/fdcb3af72f750a8de2bcf39d62ada70b5eb17b06d7f63860e0a679cb656b/msgspec-0.20.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:09e0efbf1ac641fedb1d5496c59507c2f0dc62a052189ee62c763e0aae217520", size = 193345, upload-time = "2025-11-24T03:55:20.613Z" }, + { url = "https://files.pythonhosted.org/packages/5a/15/3c225610da9f02505d37d69a77f4a2e7daae2a125f99d638df211ba84e59/msgspec-0.20.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:23ee3787142e48f5ee746b2909ce1b76e2949fbe0f97f9f6e70879f06c218b54", size = 186867, upload-time = "2025-11-24T03:55:22.4Z" }, + { url = "https://files.pythonhosted.org/packages/81/36/13ab0c547e283bf172f45491edfdea0e2cecb26ae61e3a7b1ae6058b326d/msgspec-0.20.0-cp311-cp311-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:81f4ac6f0363407ac0465eff5c7d4d18f26870e00674f8fcb336d898a1e36854", size = 215351, upload-time = "2025-11-24T03:55:23.958Z" }, + { url = "https://files.pythonhosted.org/packages/6b/96/5c095b940de3aa6b43a71ec76275ac3537b21bd45c7499b5a17a429110fa/msgspec-0.20.0-cp311-cp311-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:bb4d873f24ae18cd1334f4e37a178ed46c9d186437733351267e0a269bdf7e53", size = 219896, upload-time = "2025-11-24T03:55:25.356Z" }, + { url = "https://files.pythonhosted.org/packages/98/7a/81a7b5f01af300761087b114dafa20fb97aed7184d33aab64d48874eb187/msgspec-0.20.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:b92b8334427b8393b520c24ff53b70f326f79acf5f74adb94fd361bcff8a1d4e", size = 220389, upload-time = "2025-11-24T03:55:26.99Z" }, + { url = "https://files.pythonhosted.org/packages/70/c0/3d0cce27db9a9912421273d49eab79ce01ecd2fed1a2f1b74af9b445f33c/msgspec-0.20.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:562c44b047c05cc0384e006fae7a5e715740215c799429e0d7e3e5adf324285a", size = 223348, upload-time = "2025-11-24T03:55:28.311Z" }, + { url = "https://files.pythonhosted.org/packages/89/5e/406b7d578926b68790e390d83a1165a9bfc2d95612a1a9c1c4d5c72ea815/msgspec-0.20.0-cp311-cp311-win_amd64.whl", hash = "sha256:d1dcc93a3ce3d3195985bfff18a48274d0b5ffbc96fa1c5b89da6f0d9af81b29", size = 188713, upload-time = "2025-11-24T03:55:29.553Z" }, + { url = "https://files.pythonhosted.org/packages/47/87/14fe2316624ceedf76a9e94d714d194cbcb699720b210ff189f89ca4efd7/msgspec-0.20.0-cp311-cp311-win_arm64.whl", hash = "sha256:aa387aa330d2e4bd69995f66ea8fdc87099ddeedf6fdb232993c6a67711e7520", size = 174229, upload-time = "2025-11-24T03:55:31.107Z" }, + { url = "https://files.pythonhosted.org/packages/d9/6f/1e25eee957e58e3afb2a44b94fa95e06cebc4c236193ed0de3012fff1e19/msgspec-0.20.0-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:2aba22e2e302e9231e85edc24f27ba1f524d43c223ef5765bd8624c7df9ec0a5", size = 196391, upload-time = "2025-11-24T03:55:32.677Z" }, + { url = "https://files.pythonhosted.org/packages/7f/ee/af51d090ada641d4b264992a486435ba3ef5b5634bc27e6eb002f71cef7d/msgspec-0.20.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:716284f898ab2547fedd72a93bb940375de9fbfe77538f05779632dc34afdfde", size = 188644, upload-time = "2025-11-24T03:55:33.934Z" }, + { url = "https://files.pythonhosted.org/packages/49/d6/9709ee093b7742362c2934bfb1bbe791a1e09bed3ea5d8a18ce552fbfd73/msgspec-0.20.0-cp312-cp312-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:558ed73315efa51b1538fa8f1d3b22c8c5ff6d9a2a62eff87d25829b94fc5054", size = 218852, upload-time = "2025-11-24T03:55:35.575Z" }, + { url = "https://files.pythonhosted.org/packages/5c/a2/488517a43ccf5a4b6b6eca6dd4ede0bd82b043d1539dd6bb908a19f8efd3/msgspec-0.20.0-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:509ac1362a1d53aa66798c9b9fd76872d7faa30fcf89b2fba3bcbfd559d56eb0", size = 224937, upload-time = "2025-11-24T03:55:36.859Z" }, + { url = "https://files.pythonhosted.org/packages/d5/e8/49b832808aa23b85d4f090d1d2e48a4e3834871415031ed7c5fe48723156/msgspec-0.20.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:1353c2c93423602e7dea1aa4c92f3391fdfc25ff40e0bacf81d34dbc68adb870", size = 222858, upload-time = "2025-11-24T03:55:38.187Z" }, + { url = "https://files.pythonhosted.org/packages/9f/56/1dc2fa53685dca9c3f243a6cbecd34e856858354e455b77f47ebd76cf5bf/msgspec-0.20.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:cb33b5eb5adb3c33d749684471c6a165468395d7aa02d8867c15103b81e1da3e", size = 227248, upload-time = "2025-11-24T03:55:39.496Z" }, + { url = "https://files.pythonhosted.org/packages/5a/51/aba940212c23b32eedce752896205912c2668472ed5b205fc33da28a6509/msgspec-0.20.0-cp312-cp312-win_amd64.whl", hash = "sha256:fb1d934e435dd3a2b8cf4bbf47a8757100b4a1cfdc2afdf227541199885cdacb", size = 190024, upload-time = "2025-11-24T03:55:40.829Z" }, + { url = "https://files.pythonhosted.org/packages/41/ad/3b9f259d94f183daa9764fef33fdc7010f7ecffc29af977044fa47440a83/msgspec-0.20.0-cp312-cp312-win_arm64.whl", hash = "sha256:00648b1e19cf01b2be45444ba9dc961bd4c056ffb15706651e64e5d6ec6197b7", size = 175390, upload-time = "2025-11-24T03:55:42.05Z" }, + { url = "https://files.pythonhosted.org/packages/8a/d1/b902d38b6e5ba3bdddbec469bba388d647f960aeed7b5b3623a8debe8a76/msgspec-0.20.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:9c1ff8db03be7598b50dd4b4a478d6fe93faae3bd54f4f17aa004d0e46c14c46", size = 196463, upload-time = "2025-11-24T03:55:43.405Z" }, + { url = "https://files.pythonhosted.org/packages/57/b6/eff0305961a1d9447ec2b02f8c73c8946f22564d302a504185b730c9a761/msgspec-0.20.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:f6532369ece217fd37c5ebcfd7e981f2615628c21121b7b2df9d3adcf2fd69b8", size = 188650, upload-time = "2025-11-24T03:55:44.761Z" }, + { url = "https://files.pythonhosted.org/packages/99/93/f2ec1ae1de51d3fdee998a1ede6b2c089453a2ee82b5c1b361ed9095064a/msgspec-0.20.0-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:f9a1697da2f85a751ac3cc6a97fceb8e937fc670947183fb2268edaf4016d1ee", size = 218834, upload-time = "2025-11-24T03:55:46.441Z" }, + { url = "https://files.pythonhosted.org/packages/28/83/36557b04cfdc317ed8a525c4993b23e43a8fbcddaddd78619112ca07138c/msgspec-0.20.0-cp313-cp313-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:7fac7e9c92eddcd24c19d9e5f6249760941485dff97802461ae7c995a2450111", size = 224917, upload-time = "2025-11-24T03:55:48.06Z" }, + { url = "https://files.pythonhosted.org/packages/8f/56/362037a1ed5be0b88aced59272442c4b40065c659700f4b195a7f4d0ac88/msgspec-0.20.0-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:f953a66f2a3eb8d5ea64768445e2bb301d97609db052628c3e1bcb7d87192a9f", size = 222821, upload-time = "2025-11-24T03:55:49.388Z" }, + { url = "https://files.pythonhosted.org/packages/92/75/fa2370ec341cedf663731ab7042e177b3742645c5dd4f64dc96bd9f18a6b/msgspec-0.20.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:247af0313ae64a066d3aea7ba98840f6681ccbf5c90ba9c7d17f3e39dbba679c", size = 227227, upload-time = "2025-11-24T03:55:51.125Z" }, + { url = "https://files.pythonhosted.org/packages/f1/25/5e8080fe0117f799b1b68008dc29a65862077296b92550632de015128579/msgspec-0.20.0-cp313-cp313-win_amd64.whl", hash = "sha256:67d5e4dfad52832017018d30a462604c80561aa62a9d548fc2bd4e430b66a352", size = 189966, upload-time = "2025-11-24T03:55:52.458Z" }, + { url = "https://files.pythonhosted.org/packages/79/b6/63363422153937d40e1cb349c5081338401f8529a5a4e216865decd981bf/msgspec-0.20.0-cp313-cp313-win_arm64.whl", hash = "sha256:91a52578226708b63a9a13de287b1ec3ed1123e4a088b198143860c087770458", size = 175378, upload-time = "2025-11-24T03:55:53.721Z" }, + { url = "https://files.pythonhosted.org/packages/bb/18/62dc13ab0260c7d741dda8dc7f481495b93ac9168cd887dda5929880eef8/msgspec-0.20.0-cp314-cp314-macosx_10_15_x86_64.whl", hash = "sha256:eead16538db1b3f7ec6e3ed1f6f7c5dec67e90f76e76b610e1ffb5671815633a", size = 196407, upload-time = "2025-11-24T03:55:55.001Z" }, + { url = "https://files.pythonhosted.org/packages/dd/1d/b9949e4ad6953e9f9a142c7997b2f7390c81e03e93570c7c33caf65d27e1/msgspec-0.20.0-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:703c3bb47bf47801627fb1438f106adbfa2998fe586696d1324586a375fca238", size = 188889, upload-time = "2025-11-24T03:55:56.311Z" }, + { url = "https://files.pythonhosted.org/packages/1e/19/f8bb2dc0f1bfe46cc7d2b6b61c5e9b5a46c62298e8f4d03bbe499c926180/msgspec-0.20.0-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:6cdb227dc585fb109305cee0fd304c2896f02af93ecf50a9c84ee54ee67dbb42", size = 219691, upload-time = "2025-11-24T03:55:57.908Z" }, + { url = "https://files.pythonhosted.org/packages/b8/8e/6b17e43f6eb9369d9858ee32c97959fcd515628a1df376af96c11606cf70/msgspec-0.20.0-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:27d35044dd8818ac1bd0fedb2feb4fbdff4e3508dd7c5d14316a12a2d96a0de0", size = 224918, upload-time = "2025-11-24T03:55:59.322Z" }, + { url = "https://files.pythonhosted.org/packages/1c/db/0e833a177db1a4484797adba7f429d4242585980b90882cc38709e1b62df/msgspec-0.20.0-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:b4296393a29ee42dd25947981c65506fd4ad39beaf816f614146fa0c5a6c91ae", size = 223436, upload-time = "2025-11-24T03:56:00.716Z" }, + { url = "https://files.pythonhosted.org/packages/c3/30/d2ee787f4c918fd2b123441d49a7707ae9015e0e8e1ab51aa7967a97b90e/msgspec-0.20.0-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:205fbdadd0d8d861d71c8f3399fe1a82a2caf4467bc8ff9a626df34c12176980", size = 227190, upload-time = "2025-11-24T03:56:02.371Z" }, + { url = "https://files.pythonhosted.org/packages/ff/37/9c4b58ff11d890d788e700b827db2366f4d11b3313bf136780da7017278b/msgspec-0.20.0-cp314-cp314-win_amd64.whl", hash = "sha256:7dfebc94fe7d3feec6bc6c9df4f7e9eccc1160bb5b811fbf3e3a56899e398a6b", size = 193950, upload-time = "2025-11-24T03:56:03.668Z" }, + { url = "https://files.pythonhosted.org/packages/e9/4e/cab707bf2fa57408e2934e5197fc3560079db34a1e3cd2675ff2e47e07de/msgspec-0.20.0-cp314-cp314-win_arm64.whl", hash = "sha256:2ad6ae36e4a602b24b4bf4eaf8ab5a441fec03e1f1b5931beca8ebda68f53fc0", size = 179018, upload-time = "2025-11-24T03:56:05.038Z" }, + { url = "https://files.pythonhosted.org/packages/4c/06/3da3fc9aaa55618a8f43eb9052453cfe01f82930bca3af8cea63a89f3a11/msgspec-0.20.0-cp314-cp314t-macosx_10_15_x86_64.whl", hash = "sha256:f84703e0e6ef025663dd1de828ca028774797b8155e070e795c548f76dde65d5", size = 200389, upload-time = "2025-11-24T03:56:06.375Z" }, + { url = "https://files.pythonhosted.org/packages/83/3b/cc4270a5ceab40dfe1d1745856951b0a24fd16ac8539a66ed3004a60c91e/msgspec-0.20.0-cp314-cp314t-macosx_11_0_arm64.whl", hash = "sha256:7c83fc24dd09cf1275934ff300e3951b3adc5573f0657a643515cc16c7dee131", size = 193198, upload-time = "2025-11-24T03:56:07.742Z" }, + { url = "https://files.pythonhosted.org/packages/cd/ae/4c7905ac53830c8e3c06fdd60e3cdcfedc0bbc993872d1549b84ea21a1bd/msgspec-0.20.0-cp314-cp314t-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:5f13ccb1c335a124e80c4562573b9b90f01ea9521a1a87f7576c2e281d547f56", size = 225973, upload-time = "2025-11-24T03:56:09.18Z" }, + { url = "https://files.pythonhosted.org/packages/d9/da/032abac1de4d0678d99eaeadb1323bd9d247f4711c012404ba77ed6f15ca/msgspec-0.20.0-cp314-cp314t-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:17c2b5ca19f19306fc83c96d85e606d2cc107e0caeea85066b5389f664e04846", size = 229509, upload-time = "2025-11-24T03:56:10.898Z" }, + { url = "https://files.pythonhosted.org/packages/69/52/fdc7bdb7057a166f309e0b44929e584319e625aaba4771b60912a9321ccd/msgspec-0.20.0-cp314-cp314t-musllinux_1_2_aarch64.whl", hash = "sha256:d931709355edabf66c2dd1a756b2d658593e79882bc81aae5964969d5a291b63", size = 230434, upload-time = "2025-11-24T03:56:12.48Z" }, + { url = "https://files.pythonhosted.org/packages/cb/fe/1dfd5f512b26b53043884e4f34710c73e294e7cc54278c3fe28380e42c37/msgspec-0.20.0-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:565f915d2e540e8a0c93a01ff67f50aebe1f7e22798c6a25873f9fda8d1325f8", size = 231758, upload-time = "2025-11-24T03:56:13.765Z" }, + { url = "https://files.pythonhosted.org/packages/97/f6/9ba7121b8e0c4e0beee49575d1dbc804e2e72467692f0428cf39ceba1ea5/msgspec-0.20.0-cp314-cp314t-win_amd64.whl", hash = "sha256:726f3e6c3c323f283f6021ebb6c8ccf58d7cd7baa67b93d73bfbe9a15c34ab8d", size = 206540, upload-time = "2025-11-24T03:56:15.029Z" }, + { url = "https://files.pythonhosted.org/packages/c8/3e/c5187de84bb2c2ca334ab163fcacf19a23ebb1d876c837f81a1b324a15bf/msgspec-0.20.0-cp314-cp314t-win_arm64.whl", hash = "sha256:93f23528edc51d9f686808a361728e903d6f2be55c901d6f5c92e44c6d546bfc", size = 183011, upload-time = "2025-11-24T03:56:16.442Z" }, +] + [[package]] name = "multidict" version = "6.4.4" @@ -2742,6 +2798,7 @@ dependencies = [ { name = "attrs" }, { name = "click" }, { name = "click-default-group" }, + { name = "msgspec" }, { name = "networkx", version = "3.4.2", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.11'" }, { name = "networkx", version = "3.5", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.11'" }, { name = "optree" }, @@ -2801,6 +2858,7 @@ requires-dist = [ { name = "attrs", specifier = ">=21.3.0" }, { name = "click", specifier = ">=8.1.8,!=8.2.0" }, { name = "click-default-group", specifier = ">=1.2.4" }, + { name = "msgspec", specifier = ">=0.18.6" }, { name = "networkx", specifier = ">=2.4.0" }, { name = "optree", specifier = ">=0.9.0" }, { name = "packaging", specifier = ">=23.0.0" }, From 40630c6dd126836a9955942f87b46047166cbee7 Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Sun, 1 Feb 2026 12:33:22 +0100 Subject: [PATCH 02/10] Refactor runtime journal helper --- src/_pytask/journal.py | 59 ++++++++++++++++++++---------------- src/_pytask/runtime_store.py | 39 +++++++++++++----------- 2 files changed, 55 insertions(+), 43 deletions(-) diff --git a/src/_pytask/journal.py b/src/_pytask/journal.py index bc48066d..5fcf61c8 100644 --- a/src/_pytask/journal.py +++ b/src/_pytask/journal.py @@ -2,7 +2,9 @@ from __future__ import annotations +from dataclasses import dataclass from typing import TYPE_CHECKING +from typing import Generic from typing import TypeVar import msgspec @@ -13,29 +15,34 @@ T = TypeVar("T") -def append_jsonl(path: Path, payload: msgspec.Struct) -> None: - """Append a JSON line to the journal.""" - with path.open("ab") as journal_file: - journal_file.write(msgspec.json.encode(payload) + b"\n") - - -def read_jsonl(path: Path, *, type_: type[T]) -> list[T]: - """Read JSONL entries from a journal, stopping at the first invalid line.""" - if not path.exists(): - return [] - - entries: list[T] = [] - for line in path.read_bytes().splitlines(): - if not line.strip(): - continue - try: - entries.append(msgspec.json.decode(line, type=type_)) - except msgspec.DecodeError: - break - return entries - - -def delete_if_exists(path: Path) -> None: - """Delete a file if it exists.""" - if path.exists(): - path.unlink() +@dataclass(frozen=True) +class JsonlJournal(Generic[T]): + """Append-only JSONL journal with best-effort recovery.""" + + path: Path + type_: type[T] + + def append(self, payload: msgspec.Struct) -> None: + """Append a JSON line to the journal.""" + with self.path.open("ab") as journal_file: + journal_file.write(msgspec.json.encode(payload) + b"\n") + + def read(self) -> list[T]: + """Read entries, stopping at the first invalid line.""" + if not self.path.exists(): + return [] + + entries: list[T] = [] + for line in self.path.read_bytes().splitlines(): + if not line.strip(): + continue + try: + entries.append(msgspec.json.decode(line, type=self.type_)) + except msgspec.DecodeError: + break + return entries + + def delete(self) -> None: + """Delete the journal if it exists.""" + if self.path.exists(): + self.path.unlink() diff --git a/src/_pytask/runtime_store.py b/src/_pytask/runtime_store.py index e4f9b7ed..646ac1a1 100644 --- a/src/_pytask/runtime_store.py +++ b/src/_pytask/runtime_store.py @@ -9,9 +9,7 @@ import msgspec from packaging.version import Version -from _pytask.journal import append_jsonl -from _pytask.journal import delete_if_exists -from _pytask.journal import read_jsonl +from _pytask.journal import JsonlJournal if TYPE_CHECKING: from pathlib import Path @@ -55,6 +53,10 @@ def _journal_path(path: Path) -> Path: return path.with_suffix(".journal") +def _journal(path: Path) -> JsonlJournal[_RuntimeJournalEntry]: + return JsonlJournal(path=_journal_path(path), type_=_RuntimeJournalEntry) + + def _read_runtimes(path: Path) -> _RuntimeFile | None: if not path.exists(): return None @@ -80,9 +82,10 @@ def _write_runtimes(path: Path, runtimes: _RuntimeFile) -> None: tmp.replace(path) -def _read_journal(path: Path) -> list[_RuntimeJournalEntry]: - journal_path = _journal_path(path) - entries = read_jsonl(journal_path, type_=_RuntimeJournalEntry) +def _read_journal( + journal: JsonlJournal[_RuntimeJournalEntry], +) -> list[_RuntimeJournalEntry]: + entries = journal.read() for entry in entries: if Version(entry.runtime_version) != Version(CURRENT_RUNTIME_VERSION): msg = ( @@ -109,14 +112,11 @@ def _apply_journal( ) -def _build_task_id(task: PTask) -> str: - return task.name - - @dataclass class RuntimeState: path: Path runtimes: _RuntimeFile + journal: JsonlJournal[_RuntimeJournalEntry] _index: dict[str, _RuntimeEntry] = field(init=False, default_factory=dict) _dirty: bool = field(init=False, default=False) @@ -126,18 +126,19 @@ def __post_init__(self) -> None: @classmethod def from_root(cls, root: Path) -> RuntimeState: path = _runtimes_path(root) + journal = _journal(path) existing = _read_runtimes(path) - journal_entries = _read_journal(path) + journal_entries = _read_journal(journal) if existing is None: runtimes = _RuntimeFile( runtime_version=CURRENT_RUNTIME_VERSION, task=[], ) runtimes = _apply_journal(runtimes, journal_entries) - state = cls(path=path, runtimes=runtimes) + state = cls(path=path, runtimes=runtimes, journal=journal) else: runtimes = _apply_journal(existing, journal_entries) - state = cls(path=path, runtimes=runtimes) + state = cls(path=path, runtimes=runtimes, journal=journal) if journal_entries: state._dirty = True @@ -146,8 +147,12 @@ def from_root(cls, root: Path) -> RuntimeState: def _rebuild_index(self) -> None: self._index = {entry.id: entry for entry in self.runtimes.task} + @staticmethod + def _task_id(task: PTask) -> str: + return task.name + def update_task(self, task: PTask, start: float, end: float) -> None: - task_id = _build_task_id(task) + task_id = self._task_id(task) entry = _RuntimeEntry(id=task_id, date=start, duration=end - start) self._index[entry.id] = entry self.runtimes = _RuntimeFile( @@ -161,11 +166,11 @@ def update_task(self, task: PTask, start: float, end: float) -> None: date=entry.date, duration=entry.duration, ) - append_jsonl(_journal_path(self.path), journal_entry) + self.journal.append(journal_entry) self._dirty = True def get_duration(self, task: PTask) -> float | None: - task_id = _build_task_id(task) + task_id = self._task_id(task) entry = self._index.get(task_id) if entry is None: return None @@ -175,5 +180,5 @@ def flush(self) -> None: if not self._dirty: return _write_runtimes(self.path, self.runtimes) - delete_if_exists(_journal_path(self.path)) + self.journal.delete() self._dirty = False From bbf88f23a95c35067cb3a4c7af56a48a8f1ca482 Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Sun, 1 Feb 2026 12:52:12 +0100 Subject: [PATCH 03/10] Refactor profile runtime plugin --- src/_pytask/profile.py | 95 +++++++++++++++++++----------------------- 1 file changed, 43 insertions(+), 52 deletions(-) diff --git a/src/_pytask/profile.py b/src/_pytask/profile.py index dffd3edc..e0e7c5f7 100644 --- a/src/_pytask/profile.py +++ b/src/_pytask/profile.py @@ -54,9 +54,8 @@ def pytask_extend_command_line_interface(cli: click.Group) -> None: @hookimpl def pytask_post_parse(config: dict[str, Any]) -> None: """Register the export option.""" - config["runtime_state"] = RuntimeState.from_root(config["root"]) + config["pm"].register(ProfilePlugin(RuntimeState.from_root(config["root"]))) config["pm"].register(ExportNameSpace) - config["pm"].register(DurationNameSpace) config["pm"].register(FileSizeNameSpace) @@ -70,17 +69,48 @@ def pytask_execute_task(task: PTask) -> Generator[None, None, None]: return result -@hookimpl -def pytask_execute_task_process_report( - session: Session, report: ExecutionReport -) -> None: - """Store runtime of successfully finishing tasks.""" - task = report.task - duration = task.attributes.get("duration") - if report.outcome == TaskOutcome.SUCCESS and duration is not None: - runtime_state = session.config.get("runtime_state") - if runtime_state is not None: - runtime_state.update_task(task, *duration) +class ProfilePlugin: + """Collect and persist runtime profiling data.""" + + def __init__(self, runtime_state: RuntimeState) -> None: + self.runtime_state = runtime_state + + @hookimpl + def pytask_execute_task_process_report( + self, session: Session, report: ExecutionReport + ) -> None: + """Store runtime of successfully finishing tasks.""" + _ = session + task = report.task + duration = task.attributes.get("duration") + if report.outcome == TaskOutcome.SUCCESS and duration is not None: + self.runtime_state.update_task(task, *duration) + + @hookimpl + def pytask_profile_add_info_on_task( + self, session: Session, tasks: list[PTask], profile: dict[str, dict[str, Any]] + ) -> None: + """Add the runtime for tasks to the profile.""" + _ = session + for name, duration in self._collect_runtimes(tasks).items(): + profile[name]["Duration (in s)"] = round(duration, 2) + + @hookimpl + def pytask_unconfigure(self, session: Session) -> None: + """Flush runtime information on normal build exits.""" + if session.config.get("command") != "build": + return + if session.config.get("dry_run") or session.config.get("explain"): + return + self.runtime_state.flush() + + def _collect_runtimes(self, tasks: list[PTask]) -> dict[str, float]: + """Collect runtimes.""" + return { + task.name: duration + for task in tasks + if (duration := self.runtime_state.get_duration(task)) is not None + } @click.command(cls=ColoredCommand) @@ -161,32 +191,6 @@ def _print_profile_table( console.print("No information is stored on the collected tasks.") -class DurationNameSpace: - """A namespace for adding durations to the profile.""" - - @staticmethod - @hookimpl - def pytask_profile_add_info_on_task( - session: Session, tasks: list[PTask], profile: dict[str, dict[str, Any]] - ) -> None: - """Add the runtime for tasks to the profile.""" - runtimes = _collect_runtimes(session, tasks) - for name, duration in runtimes.items(): - profile[name]["Duration (in s)"] = round(duration, 2) - - -def _collect_runtimes(session: Session, tasks: list[PTask]) -> dict[str, float]: - """Collect runtimes.""" - runtime_state = session.config.get("runtime_state") - if runtime_state is None: - return {} - return { - task.name: duration - for task in tasks - if (duration := runtime_state.get_duration(task)) is not None - } - - class FileSizeNameSpace: """A namespace for adding the total file size of products to a task.""" @@ -294,16 +298,3 @@ def _get_info_names(profile: dict[str, dict[str, Any]]) -> list[str]: base: set[str] = set() info_names: list[str] = sorted(base.union(*(set(val) for val in profile.values()))) return info_names - - -@hookimpl -def pytask_unconfigure(session: Session) -> None: - """Flush runtime information on normal build exits.""" - if session.config.get("command") != "build": - return - if session.config.get("dry_run") or session.config.get("explain"): - return - runtime_state = session.config.get("runtime_state") - if runtime_state is None: - return - runtime_state.flush() From ae329df9272fb96ff10990f79f788651cbfcc5e9 Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Sun, 1 Feb 2026 13:07:11 +0100 Subject: [PATCH 04/10] Restore duration plugin and ty config --- justfile | 2 +- pyproject.toml | 1 + src/_pytask/profile.py | 37 ++++++++++++++++++++----------------- tests/test_runtime_store.py | 28 +++++++++++++++++++++++++--- 4 files changed, 47 insertions(+), 21 deletions(-) diff --git a/justfile b/justfile index 4cfdf4f5..89eb0aa0 100644 --- a/justfile +++ b/justfile @@ -12,7 +12,7 @@ test-cov *FLAGS: # Run type checking typing: - uv run --group typing --group test --isolated ty check src/ tests/ + uv run --group typing --group test --isolated ty check # Run linting lint: diff --git a/pyproject.toml b/pyproject.toml index 82dde20d..b192306a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -179,6 +179,7 @@ include = [ unused-ignore-comment = "ignore" [tool.ty.src] +include = ["src", "tests"] exclude = ["src/_pytask/_hashlib.py"] [tool.ty.terminal] diff --git a/src/_pytask/profile.py b/src/_pytask/profile.py index e0e7c5f7..6a39b88f 100644 --- a/src/_pytask/profile.py +++ b/src/_pytask/profile.py @@ -54,7 +54,9 @@ def pytask_extend_command_line_interface(cli: click.Group) -> None: @hookimpl def pytask_post_parse(config: dict[str, Any]) -> None: """Register the export option.""" - config["pm"].register(ProfilePlugin(RuntimeState.from_root(config["root"]))) + runtime_state = RuntimeState.from_root(config["root"]) + config["pm"].register(ProfilePlugin(runtime_state)) + config["pm"].register(DurationNameSpace(runtime_state)) config["pm"].register(ExportNameSpace) config["pm"].register(FileSizeNameSpace) @@ -86,15 +88,6 @@ def pytask_execute_task_process_report( if report.outcome == TaskOutcome.SUCCESS and duration is not None: self.runtime_state.update_task(task, *duration) - @hookimpl - def pytask_profile_add_info_on_task( - self, session: Session, tasks: list[PTask], profile: dict[str, dict[str, Any]] - ) -> None: - """Add the runtime for tasks to the profile.""" - _ = session - for name, duration in self._collect_runtimes(tasks).items(): - profile[name]["Duration (in s)"] = round(duration, 2) - @hookimpl def pytask_unconfigure(self, session: Session) -> None: """Flush runtime information on normal build exits.""" @@ -104,13 +97,23 @@ def pytask_unconfigure(self, session: Session) -> None: return self.runtime_state.flush() - def _collect_runtimes(self, tasks: list[PTask]) -> dict[str, float]: - """Collect runtimes.""" - return { - task.name: duration - for task in tasks - if (duration := self.runtime_state.get_duration(task)) is not None - } + +class DurationNameSpace: + """A namespace for adding durations to the profile.""" + + def __init__(self, runtime_state: RuntimeState) -> None: + self.runtime_state = runtime_state + + @hookimpl + def pytask_profile_add_info_on_task( + self, session: Session, tasks: list[PTask], profile: dict[str, dict[str, Any]] + ) -> None: + """Add the runtime for tasks to the profile.""" + _ = session + for task in tasks: + duration = self.runtime_state.get_duration(task) + if duration is not None: + profile[task.name]["Duration (in s)"] = round(duration, 2) @click.command(cls=ColoredCommand) diff --git a/tests/test_runtime_store.py b/tests/test_runtime_store.py index 6a6df686..8fc77521 100644 --- a/tests/test_runtime_store.py +++ b/tests/test_runtime_store.py @@ -1,15 +1,37 @@ from __future__ import annotations -from types import SimpleNamespace +from typing import Any import pytest from _pytask.runtime_store import RuntimeState +class DummyTask: + def __init__(self, name: str) -> None: + self.name = name + self.depends_on = {} + self.produces = {} + self.function = lambda: None + self.markers = [] + self.report_sections = [] + self.attributes = {} + + @property + def signature(self) -> str: + return self.name + + def state(self) -> str | None: + return None + + def execute(self, **kwargs: Any) -> Any: + _ = kwargs + return None + + def test_runtime_state_recovers_from_journal(tmp_path): tmp_path.joinpath(".pytask").mkdir() - task = SimpleNamespace(name="task_example") + task = DummyTask(name="task_example") state = RuntimeState.from_root(tmp_path) state.update_task(task, 1.0, 3.0) @@ -20,7 +42,7 @@ def test_runtime_state_recovers_from_journal(tmp_path): def test_runtime_state_flushes_journal(tmp_path): tmp_path.joinpath(".pytask").mkdir() - task = SimpleNamespace(name="task_example") + task = DummyTask(name="task_example") state = RuntimeState.from_root(tmp_path) state.update_task(task, 2.0, 5.5) From efae90710597edf2ceccf76b4b4fe5f0d2ee7909 Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Sun, 1 Feb 2026 13:14:23 +0100 Subject: [PATCH 05/10] Register profile plugin instance --- src/_pytask/pluginmanager.py | 3 +- src/_pytask/profile.py | 64 ++++++++++++++++++++---------------- 2 files changed, 37 insertions(+), 30 deletions(-) diff --git a/src/_pytask/pluginmanager.py b/src/_pytask/pluginmanager.py index 2a7ef4a6..9aa36ce2 100644 --- a/src/_pytask/pluginmanager.py +++ b/src/_pytask/pluginmanager.py @@ -11,6 +11,7 @@ from pluggy import PluginManager from _pytask import hookspecs +from _pytask.profile import ProfilePlugin if TYPE_CHECKING: from collections.abc import Iterable @@ -58,12 +59,12 @@ def pytask_add_hooks(pm: PluginManager) -> None: "_pytask.nodes", "_pytask.parameters", "_pytask.persist", - "_pytask.profile", "_pytask.skipping", "_pytask.task", "_pytask.warnings", ) register_hook_impls_from_modules(pm, builtin_hook_impl_modules) + pm.register(ProfilePlugin()) def get_plugin_manager() -> PluginManager: diff --git a/src/_pytask/profile.py b/src/_pytask/profile.py index 6a39b88f..45beb7f4 100644 --- a/src/_pytask/profile.py +++ b/src/_pytask/profile.py @@ -8,6 +8,7 @@ import sys import time from contextlib import suppress +from dataclasses import dataclass from typing import TYPE_CHECKING from typing import Any @@ -32,6 +33,7 @@ from _pytask.traceback import Traceback if TYPE_CHECKING: + from collections.abc import Callable from collections.abc import Generator from pathlib import Path from typing import NoReturn @@ -45,37 +47,35 @@ class _ExportFormats(enum.Enum): CSV = "csv" -@hookimpl(tryfirst=True) -def pytask_extend_command_line_interface(cli: click.Group) -> None: - """Extend the command line interface.""" - cli.add_command(profile) - - -@hookimpl -def pytask_post_parse(config: dict[str, Any]) -> None: - """Register the export option.""" - runtime_state = RuntimeState.from_root(config["root"]) - config["pm"].register(ProfilePlugin(runtime_state)) - config["pm"].register(DurationNameSpace(runtime_state)) - config["pm"].register(ExportNameSpace) - config["pm"].register(FileSizeNameSpace) - - -@hookimpl(wrapper=True) -def pytask_execute_task(task: PTask) -> Generator[None, None, None]: - """Attach the duration of the execution to the task.""" - start = time.time() - result = yield - end = time.time() - task.attributes["duration"] = (start, end) - return result - - +@dataclass class ProfilePlugin: """Collect and persist runtime profiling data.""" - def __init__(self, runtime_state: RuntimeState) -> None: + runtime_state: RuntimeState | None = None + runtime_state_factory: Callable[[Path], RuntimeState] = RuntimeState.from_root + + @hookimpl(tryfirst=True) + def pytask_extend_command_line_interface(self, cli: click.Group) -> None: + """Extend the command line interface.""" + cli.add_command(profile) + + @hookimpl + def pytask_post_parse(self, config: dict[str, Any]) -> None: + """Register the export option.""" + runtime_state = self.runtime_state_factory(config["root"]) self.runtime_state = runtime_state + config["pm"].register(DurationNameSpace(runtime_state)) + config["pm"].register(ExportNameSpace) + config["pm"].register(FileSizeNameSpace) + + @hookimpl(wrapper=True) + def pytask_execute_task(self, task: PTask) -> Generator[None, None, None]: + """Attach the duration of the execution to the task.""" + start = time.time() + result = yield + end = time.time() + task.attributes["duration"] = (start, end) + return result @hookimpl def pytask_execute_task_process_report( @@ -83,19 +83,25 @@ def pytask_execute_task_process_report( ) -> None: """Store runtime of successfully finishing tasks.""" _ = session + runtime_state = self.runtime_state + if runtime_state is None: + return task = report.task duration = task.attributes.get("duration") if report.outcome == TaskOutcome.SUCCESS and duration is not None: - self.runtime_state.update_task(task, *duration) + runtime_state.update_task(task, *duration) @hookimpl def pytask_unconfigure(self, session: Session) -> None: """Flush runtime information on normal build exits.""" + runtime_state = self.runtime_state + if runtime_state is None: + return if session.config.get("command") != "build": return if session.config.get("dry_run") or session.config.get("explain"): return - self.runtime_state.flush() + runtime_state.flush() class DurationNameSpace: From a43c2e26394f908903f09de81fdb42dd6d946020 Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Sun, 1 Feb 2026 13:22:36 +0100 Subject: [PATCH 06/10] Restore module profile hooks --- src/_pytask/pluginmanager.py | 3 +- src/_pytask/profile.py | 63 +++++++++++++++++------------------- 2 files changed, 30 insertions(+), 36 deletions(-) diff --git a/src/_pytask/pluginmanager.py b/src/_pytask/pluginmanager.py index 9aa36ce2..2a7ef4a6 100644 --- a/src/_pytask/pluginmanager.py +++ b/src/_pytask/pluginmanager.py @@ -11,7 +11,6 @@ from pluggy import PluginManager from _pytask import hookspecs -from _pytask.profile import ProfilePlugin if TYPE_CHECKING: from collections.abc import Iterable @@ -59,12 +58,12 @@ def pytask_add_hooks(pm: PluginManager) -> None: "_pytask.nodes", "_pytask.parameters", "_pytask.persist", + "_pytask.profile", "_pytask.skipping", "_pytask.task", "_pytask.warnings", ) register_hook_impls_from_modules(pm, builtin_hook_impl_modules) - pm.register(ProfilePlugin()) def get_plugin_manager() -> PluginManager: diff --git a/src/_pytask/profile.py b/src/_pytask/profile.py index 45beb7f4..95360deb 100644 --- a/src/_pytask/profile.py +++ b/src/_pytask/profile.py @@ -33,7 +33,6 @@ from _pytask.traceback import Traceback if TYPE_CHECKING: - from collections.abc import Callable from collections.abc import Generator from pathlib import Path from typing import NoReturn @@ -47,35 +46,37 @@ class _ExportFormats(enum.Enum): CSV = "csv" +@hookimpl(tryfirst=True) +def pytask_extend_command_line_interface(cli: click.Group) -> None: + """Extend the command line interface.""" + cli.add_command(profile) + + +@hookimpl +def pytask_post_parse(config: dict[str, Any]) -> None: + """Register the export option.""" + runtime_state = RuntimeState.from_root(config["root"]) + config["pm"].register(ProfilePlugin(runtime_state)) + config["pm"].register(DurationNameSpace(runtime_state)) + config["pm"].register(ExportNameSpace) + config["pm"].register(FileSizeNameSpace) + + +@hookimpl(wrapper=True) +def pytask_execute_task(task: PTask) -> Generator[None, None, None]: + """Attach the duration of the execution to the task.""" + start = time.time() + result = yield + end = time.time() + task.attributes["duration"] = (start, end) + return result + + @dataclass class ProfilePlugin: """Collect and persist runtime profiling data.""" - runtime_state: RuntimeState | None = None - runtime_state_factory: Callable[[Path], RuntimeState] = RuntimeState.from_root - - @hookimpl(tryfirst=True) - def pytask_extend_command_line_interface(self, cli: click.Group) -> None: - """Extend the command line interface.""" - cli.add_command(profile) - - @hookimpl - def pytask_post_parse(self, config: dict[str, Any]) -> None: - """Register the export option.""" - runtime_state = self.runtime_state_factory(config["root"]) - self.runtime_state = runtime_state - config["pm"].register(DurationNameSpace(runtime_state)) - config["pm"].register(ExportNameSpace) - config["pm"].register(FileSizeNameSpace) - - @hookimpl(wrapper=True) - def pytask_execute_task(self, task: PTask) -> Generator[None, None, None]: - """Attach the duration of the execution to the task.""" - start = time.time() - result = yield - end = time.time() - task.attributes["duration"] = (start, end) - return result + runtime_state: RuntimeState @hookimpl def pytask_execute_task_process_report( @@ -83,25 +84,19 @@ def pytask_execute_task_process_report( ) -> None: """Store runtime of successfully finishing tasks.""" _ = session - runtime_state = self.runtime_state - if runtime_state is None: - return task = report.task duration = task.attributes.get("duration") if report.outcome == TaskOutcome.SUCCESS and duration is not None: - runtime_state.update_task(task, *duration) + self.runtime_state.update_task(task, *duration) @hookimpl def pytask_unconfigure(self, session: Session) -> None: """Flush runtime information on normal build exits.""" - runtime_state = self.runtime_state - if runtime_state is None: - return if session.config.get("command") != "build": return if session.config.get("dry_run") or session.config.get("explain"): return - runtime_state.flush() + self.runtime_state.flush() class DurationNameSpace: From 546613292abe6ab287bffee17225c6c96038c530 Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Sun, 1 Feb 2026 13:36:45 +0100 Subject: [PATCH 07/10] Simplify runtime journal recovery --- src/_pytask/journal.py | 5 +++-- src/_pytask/runtime_store.py | 39 +++++++++--------------------------- 2 files changed, 13 insertions(+), 31 deletions(-) diff --git a/src/_pytask/journal.py b/src/_pytask/journal.py index 5fcf61c8..13c5776e 100644 --- a/src/_pytask/journal.py +++ b/src/_pytask/journal.py @@ -28,7 +28,7 @@ def append(self, payload: msgspec.Struct) -> None: journal_file.write(msgspec.json.encode(payload) + b"\n") def read(self) -> list[T]: - """Read entries, stopping at the first invalid line.""" + """Read entries, clearing the journal on decode errors.""" if not self.path.exists(): return [] @@ -39,7 +39,8 @@ def read(self) -> list[T]: try: entries.append(msgspec.json.decode(line, type=self.type_)) except msgspec.DecodeError: - break + self.delete() + return [] return entries def delete(self) -> None: diff --git a/src/_pytask/runtime_store.py b/src/_pytask/runtime_store.py index 646ac1a1..6a3e103c 100644 --- a/src/_pytask/runtime_store.py +++ b/src/_pytask/runtime_store.py @@ -7,7 +7,6 @@ from typing import TYPE_CHECKING import msgspec -from packaging.version import Version from _pytask.journal import JsonlJournal @@ -19,14 +18,6 @@ CURRENT_RUNTIME_VERSION = "1" -class RuntimeStoreError(Exception): - """Raised when reading or writing runtime files fails.""" - - -class RuntimeStoreVersionError(RuntimeStoreError): - """Raised when a runtime file version is not supported.""" - - class _RuntimeEntry(msgspec.Struct): id: str date: float @@ -63,15 +54,12 @@ def _read_runtimes(path: Path) -> _RuntimeFile | None: try: data = msgspec.json.decode(path.read_bytes(), type=_RuntimeFile) except msgspec.DecodeError: - msg = "Runtime file has invalid format." - raise RuntimeStoreError(msg) from None + path.unlink() + return None - if Version(data.runtime_version) != Version(CURRENT_RUNTIME_VERSION): - msg = ( - f"Unsupported runtime-version {data.runtime_version!r}. " - f"Current version is {CURRENT_RUNTIME_VERSION}." - ) - raise RuntimeStoreVersionError(msg) + if data.runtime_version != CURRENT_RUNTIME_VERSION: + path.unlink() + return None return data @@ -87,12 +75,9 @@ def _read_journal( ) -> list[_RuntimeJournalEntry]: entries = journal.read() for entry in entries: - if Version(entry.runtime_version) != Version(CURRENT_RUNTIME_VERSION): - msg = ( - f"Unsupported runtime-version {entry.runtime_version!r}. " - f"Current version is {CURRENT_RUNTIME_VERSION}." - ) - raise RuntimeStoreVersionError(msg) + if entry.runtime_version != CURRENT_RUNTIME_VERSION: + journal.delete() + return [] return entries @@ -147,12 +132,8 @@ def from_root(cls, root: Path) -> RuntimeState: def _rebuild_index(self) -> None: self._index = {entry.id: entry for entry in self.runtimes.task} - @staticmethod - def _task_id(task: PTask) -> str: - return task.name - def update_task(self, task: PTask, start: float, end: float) -> None: - task_id = self._task_id(task) + task_id = task.name entry = _RuntimeEntry(id=task_id, date=start, duration=end - start) self._index[entry.id] = entry self.runtimes = _RuntimeFile( @@ -170,7 +151,7 @@ def update_task(self, task: PTask, start: float, end: float) -> None: self._dirty = True def get_duration(self, task: PTask) -> float | None: - task_id = self._task_id(task) + task_id = task.name entry = self._index.get(task_id) if entry is None: return None From 95331b5244713b9ccabfabf04a2ed64f6c619adc Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Sun, 1 Feb 2026 14:05:48 +0100 Subject: [PATCH 08/10] Improve runtime profiling storage resilience --- src/_pytask/journal.py | 18 ++++++++++++------ src/_pytask/profile.py | 1 + src/_pytask/runtime_store.py | 21 ++------------------- tests/test_runtime_store.py | 19 +++++++++++++++++++ 4 files changed, 34 insertions(+), 25 deletions(-) diff --git a/src/_pytask/journal.py b/src/_pytask/journal.py index 13c5776e..36ebbb14 100644 --- a/src/_pytask/journal.py +++ b/src/_pytask/journal.py @@ -28,19 +28,25 @@ def append(self, payload: msgspec.Struct) -> None: journal_file.write(msgspec.json.encode(payload) + b"\n") def read(self) -> list[T]: - """Read entries, clearing the journal on decode errors.""" + """Read entries, keeping valid entries on decode errors.""" if not self.path.exists(): return [] entries: list[T] = [] - for line in self.path.read_bytes().splitlines(): - if not line.strip(): + data = self.path.read_bytes() + offset = 0 + for line in data.splitlines(keepends=True): + stripped = line.strip() + if not stripped: + offset += len(line) continue try: - entries.append(msgspec.json.decode(line, type=self.type_)) + entries.append(msgspec.json.decode(stripped, type=self.type_)) except msgspec.DecodeError: - self.delete() - return [] + with self.path.open("rb+") as journal_file: + journal_file.truncate(offset) + return entries + offset += len(line) return entries def delete(self) -> None: diff --git a/src/_pytask/profile.py b/src/_pytask/profile.py index 95360deb..74b50e15 100644 --- a/src/_pytask/profile.py +++ b/src/_pytask/profile.py @@ -99,6 +99,7 @@ def pytask_unconfigure(self, session: Session) -> None: self.runtime_state.flush() +@dataclass class DurationNameSpace: """A namespace for adding durations to the profile.""" diff --git a/src/_pytask/runtime_store.py b/src/_pytask/runtime_store.py index 6a3e103c..94cc9cf0 100644 --- a/src/_pytask/runtime_store.py +++ b/src/_pytask/runtime_store.py @@ -15,8 +15,6 @@ from _pytask.node_protocols import PTask -CURRENT_RUNTIME_VERSION = "1" - class _RuntimeEntry(msgspec.Struct): id: str @@ -25,12 +23,10 @@ class _RuntimeEntry(msgspec.Struct): class _RuntimeFile(msgspec.Struct, forbid_unknown_fields=False): - runtime_version: str = msgspec.field(name="runtime-version") task: list[_RuntimeEntry] = msgspec.field(default_factory=list) -class _RuntimeJournalEntry(msgspec.Struct): - runtime_version: str = msgspec.field(name="runtime-version") +class _RuntimeJournalEntry(msgspec.Struct, forbid_unknown_fields=False): id: str date: float duration: float @@ -56,10 +52,6 @@ def _read_runtimes(path: Path) -> _RuntimeFile | None: except msgspec.DecodeError: path.unlink() return None - - if data.runtime_version != CURRENT_RUNTIME_VERSION: - path.unlink() - return None return data @@ -73,12 +65,7 @@ def _write_runtimes(path: Path, runtimes: _RuntimeFile) -> None: def _read_journal( journal: JsonlJournal[_RuntimeJournalEntry], ) -> list[_RuntimeJournalEntry]: - entries = journal.read() - for entry in entries: - if entry.runtime_version != CURRENT_RUNTIME_VERSION: - journal.delete() - return [] - return entries + return journal.read() def _apply_journal( @@ -92,7 +79,6 @@ def _apply_journal( id=entry.id, date=entry.date, duration=entry.duration ) return _RuntimeFile( - runtime_version=CURRENT_RUNTIME_VERSION, task=list(index.values()), ) @@ -116,7 +102,6 @@ def from_root(cls, root: Path) -> RuntimeState: journal_entries = _read_journal(journal) if existing is None: runtimes = _RuntimeFile( - runtime_version=CURRENT_RUNTIME_VERSION, task=[], ) runtimes = _apply_journal(runtimes, journal_entries) @@ -137,12 +122,10 @@ def update_task(self, task: PTask, start: float, end: float) -> None: entry = _RuntimeEntry(id=task_id, date=start, duration=end - start) self._index[entry.id] = entry self.runtimes = _RuntimeFile( - runtime_version=CURRENT_RUNTIME_VERSION, task=list(self._index.values()), ) self._rebuild_index() journal_entry = _RuntimeJournalEntry( - runtime_version=CURRENT_RUNTIME_VERSION, id=entry.id, date=entry.date, duration=entry.duration, diff --git a/tests/test_runtime_store.py b/tests/test_runtime_store.py index 8fc77521..4cacca99 100644 --- a/tests/test_runtime_store.py +++ b/tests/test_runtime_store.py @@ -58,3 +58,22 @@ def test_runtime_state_flushes_journal(tmp_path): reloaded = RuntimeState.from_root(tmp_path) assert reloaded.get_duration(task) == pytest.approx(3.5) + + +def test_runtime_state_recovers_from_corrupt_journal(tmp_path): + tmp_path.joinpath(".pytask").mkdir() + task_a = DummyTask(name="task_a") + task_b = DummyTask(name="task_b") + + state = RuntimeState.from_root(tmp_path) + state.update_task(task_a, 1.0, 3.0) + state.update_task(task_b, 2.0, 6.0) + + journal_path = tmp_path / ".pytask" / "runtimes.journal" + with journal_path.open("ab") as journal_file: + journal_file.write(b'{"id": "corrupt"') + + recovered = RuntimeState.from_root(tmp_path) + assert recovered.get_duration(task_a) == pytest.approx(2.0) + assert recovered.get_duration(task_b) == pytest.approx(4.0) + assert b'"corrupt"' not in journal_path.read_bytes() From 7b0d848a57d0323faec0e30052723a4ff1afc997 Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Sun, 1 Feb 2026 14:07:44 +0100 Subject: [PATCH 09/10] Reference PR 766 in changelog --- CHANGELOG.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1a8e8749..6a860c82 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,9 +7,9 @@ releases are available on [PyPI](https://pypi.org/project/pytask) and ## Unreleased -- Move runtime profiling persistence from SQLite to a JSON snapshot plus append-only - journal in `.pytask/`, keeping runtime data resilient to crashes and compacted on - normal build exits. +- {pull}`766` moves runtime profiling persistence from SQLite to a JSON snapshot plus + append-only journal in `.pytask/`, keeping runtime data resilient to crashes and + compacted on normal build exits. ## 0.5.8 - 2025-12-30 From d3a90de1de190eb94309dc2b77ab930533bf870a Mon Sep 17 00:00:00 2001 From: Tobias Raabe Date: Sun, 1 Feb 2026 14:13:11 +0100 Subject: [PATCH 10/10] Inline runtime journal helpers --- src/_pytask/runtime_store.py | 18 ++++-------------- 1 file changed, 4 insertions(+), 14 deletions(-) diff --git a/src/_pytask/runtime_store.py b/src/_pytask/runtime_store.py index 94cc9cf0..ab4429ee 100644 --- a/src/_pytask/runtime_store.py +++ b/src/_pytask/runtime_store.py @@ -32,18 +32,6 @@ class _RuntimeJournalEntry(msgspec.Struct, forbid_unknown_fields=False): duration: float -def _runtimes_path(root: Path) -> Path: - return root / ".pytask" / "runtimes.json" - - -def _journal_path(path: Path) -> Path: - return path.with_suffix(".journal") - - -def _journal(path: Path) -> JsonlJournal[_RuntimeJournalEntry]: - return JsonlJournal(path=_journal_path(path), type_=_RuntimeJournalEntry) - - def _read_runtimes(path: Path) -> _RuntimeFile | None: if not path.exists(): return None @@ -96,8 +84,10 @@ def __post_init__(self) -> None: @classmethod def from_root(cls, root: Path) -> RuntimeState: - path = _runtimes_path(root) - journal = _journal(path) + path = root / ".pytask" / "runtimes.json" + journal = JsonlJournal( + path=path.with_suffix(".journal"), type_=_RuntimeJournalEntry + ) existing = _read_runtimes(path) journal_entries = _read_journal(journal) if existing is None: