Python: Refactor runner/workflow responsibilities and fix checkpoint ancestry bug#6695
Python: Refactor runner/workflow responsibilities and fix checkpoint ancestry bug#6695TaoChenOSU wants to merge 8 commits into
Conversation
…d fix checkpoint ancestry bug Move runner-state ownership out of Workflow into Runner for clearer responsibilities. Add a weakref-based concurrent-run guard in Workflow and fix the stream-drop race in run_until_convergence. Fix the checkpoint ancestry bug by tracking the previous checkpoint id as runner instance state so parent pointers persist across resumed runs. Move Runner to a deprecated lazy __getattr__ export (backward-compatible with DeprecationWarning) and export CheckpointID.
There was a problem hiding this comment.
Pull request overview
Refactors Python workflow execution internals to better separate Workflow vs Runner responsibilities, improves concurrency/run-lifecycle guarding for Workflow.run(), and fixes checkpoint ancestry so post-resume checkpoints correctly chain to the resumed checkpoint.
Changes:
- Move run/checkpoint bookkeeping to
Runnerand persistprevious_checkpoint_idacross resumed runs. - Replace the boolean concurrent-run guard with a weakref-backed guard and add tests covering stream-drop/finalizer races.
- Deprecate public
Runnerexport via lazy__getattr__while keeping backward compatibility, and exportCheckpointID.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| python/packages/core/tests/workflow/test_workflow.py | Updates concurrency-guard expectations and adds regression tests for lock release and stale-finalizer races. |
| python/packages/core/tests/workflow/test_runner.py | Updates runner lifecycle expectations and adds tests for checkpoint chaining after resume. |
| python/packages/core/tests/workflow/test_checkpoint.py | Adds an end-to-end ancestry test ensuring checkpoint chains remain intact across resume boundaries. |
| python/packages/core/agent_framework/_workflows/_workflow.py | Implements weakref-based active-run guard and moves cleanup into _run_core with stale-finalizer protection. |
| python/packages/core/agent_framework/_workflows/_runner.py | Moves checkpoint ancestry tracking into runner instance state and updates resume handling. |
| python/packages/core/agent_framework/init.py | Exports CheckpointID and lazily re-exports deprecated Runner with a DeprecationWarning. |
There was a problem hiding this comment.
Automated Code Review
Reviewers: 5 | Confidence: 88%
✓ Correctness
The PR correctly fixes the checkpoint ancestry bug by promoting
_previous_checkpoint_idto Runner instance state and properly seding it in_mark_resumed. The weakref-based concurrency guard in Workflow is well-designed: the identity check (if self._active_run is my_active_run) in the finally block correctly prevents the stale-finalizer race, and the GC-time weakref invalidation handles dropped/unconsumed streams. The removal of the Runner's_runningflag is clean since concurrency enforcement now lives at the Workflow level. No correctness issues found.
✓ Security Reliability
This PR is clean from a security and reliability standpoint. The weakref-based concurrency guard correctly handles the stale-finalizer race via identity comparison in the finally block. The checkpoint ancestry fix properly persists previous_checkpoint_id as Runner instance state so it survives across resumed runs. Resource cleanup (clearing runtime checkpoint storage) is reliably executed in _run_core's finally regardless of how the run terminates. The removal of Runner's self-locking is explicitly delegated to Workflow, and Runner is deprecated from the public API. No injection risks, resource leaks, or unhandled failure modes were identified.
✓ Test Coverage
The PR has excellent test coverage for its core changes: checkpoint ancestry preservation, the weakref-based concurrent-run guard, sequential reuse after failure, the GC-finalizer race fix, and unconsumed stream cleanup. The primary test coverage gap is the new
Runnerdeprecation warning behavior —from agent_framework import Runnernow emits aDeprecationWarningvia a module-level__getattr__, but no test verifies this user-facing behavior change. All other significant behavioral changes have corresponding tests with meaningful assertions.
✓ Failure Modes
The PR correctly implements the weakref-based concurrency guard and checkpoint ancestry fix. However, the
_run_corefinally block has a stale-finalizer race:clear_runtime_checkpoint_storage()is not guarded by the same identity check that protectsself._active_run, so a dropped stream's deferred finalizer can silently erase a successor run's checkpoint storage.
✓ Design Approach
I found one correctness issue in the new runner/workflow split: resume state is only cleared on the successful exit path, so a resumed run that fails leaves stale checkpoint ancestry state behind for the next fresh run on the same workflow instance.
Automated review by TaoChenOSU's agents
Close the stream-drop race where a dropped run's deferred async-generator finalizer could leave a runtime checkpoint storage override set (inherited by a new run) or clear a successor run's storage. run() now defensively clears any stale override before starting, and _run_core only clears the override if this run still owns it (mirroring the _active_run ownership guard). Adds regression tests for both the inheritance and clobber cases.
_runtime_storage_owner always held the same weakref as _active_run, so the two ownership conditions were equivalent. Derive ownership from a single owns_run = (_active_run is my_active_run) captured before the active-run clear, and remove the redundant field. No behavior change.
Both the active-run release and the runtime-storage clear are gated on owns_run, so fold the storage clear inside the if owns_run block. No behavior change.
_resumed_from_checkpoint was only cleared on the success path of run_until_convergence, so a failure during a resumed run (e.g. executor failure) left it True. The next fresh run then skipped the superstep-0 checkpoint and parented later checkpoints to the stale resume point. Move the reset into a finally. Add a regression test that fails a resumed run via an executor error and asserts the next fresh run creates the superstep-0 checkpoint.
Python Test Coverage Report •
Python Unit Test Overview
|
|||||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Automated Code Review
Reviewers: 5 | Confidence: 91%
✓ Correctness
This PR correctly addresses the checkpoint ancestry bug by promoting
_previous_checkpoint_idfrom a local variable to Runner instance state, implements a well-designed weakref-based concurrency guard for Workflow.run(), and properly resets the resume flag in afinallyblock. The ownership identity check in_run_core's finally block (self._active_run is my_active_run) correctly prevents stale async-generator finalizers from clobbering successor runs. All three previously-resolved review findings have been addressed in the current code. No correctness issues found.
✓ Security Reliability
This PR correctly addresses the checkpoint ancestry bug, implements a well-designed weakref-based concurrency guard, and properly moves the resume-flag reset into a
finallyblock. The three previously resolved review comments (stale resume flag, stale checkpoint storage from dead weakref, and finalizer race clearing successor's storage) are all addressed in the current code. The identity-based ownership check in_run_core'sfinally(self._active_run is my_active_run) correctly prevents a stale async-generator finalizer from clobbering a successor run's state, and the premptiveclear_runtime_checkpoint_storage()inrun()handles the dead-weakref-with-live-storage edge case. No security or reliability issues found.
✓ Test Coverage
The test coverage for the core behavioral changes is comprehensive and well-structured. Checkpoint ancestry, resume-flag-reset-on-failure, weakref-based concurrency guard, and the GC-finalizer race are all thoroughly tested at both the Runner and Workflow levels. Two gaps are worth noting: (1) the new
Runnerdeprecation warning via__getattr__has no test verifying thatfrom agent_framework import Runneremits aDeprecationWarning, and (2) there is no Workflow-level test confirming the run lock is released after a failed run (the happy-path release is tested, and the Runner-level failure test exists, but the Workflow-level failure path is not exercised).
✓ Failure Modes
This PR refactors runner/workflow responsibilities and fixes the checkpoint ancestry bug by promoting
previous_checkpoint_idfrom a per-run local to Runner instance state. The weakref-based concurrency guard, identity-scoped cleanup in_run_core's finally, and the_resumed_from_checkpointreset in a finally block all correctly address the previously identified failure modes. The stale runtime checkpoint storage clearing inrun()before constructing a new stream prevents silent inheritance. Test coverage is thorough, including the GC-finalizer race, failed-resume flag leak, and post-resume checkpoint chaining. No new failure modes found.
✓ Design Approach
I found one blocking design issue in the checkpoint-lineage refactor: the new runner-owned
previous_checkpoint_idsurvives the "failed resumed run -> fresh rerun" path, so the next fresh run's superstep-0 checkpoint is still parented to the stale resumed checkpoint instead of starting a new lineage. The new test added for that scenario checks only that a checkpoint is created, not that its parent is reset, so this regression would currently pass.
Suggestions
- Add a test verifying that accessing
Runnerviaagent_framework.Runner(orfrom agent_framework import Runner) emits aDeprecationWarning. The__getattr__hook at__init__.py:627-638andwarn_runner_deprecated()at_runner.py:28-40are new user-facing code with no test.
Automated review by TaoChenOSU's agents
Motivation & Context
This PR is the first PR for breaking down the changes in #6407 into smaller PRs.
The runner/workflow internals had grown entangled:
Workflowowned runtime checkpoint bookkeeping that conceptually belongs to theRunner, concurrentrun()calls on the same instance were not reliably guarded, and the checkpoint parent pointer (previous_checkpoint_id) was tracked as a per-run local insiderun_until_convergence, so checkpoint ancestry was lost across resumed runs.This is the first of three PRs that split a larger change. It establishes the runner/workflow foundation that the follow-up sub-workflow checkpoint PR and the workflow-reset PR build on.
Description & Review Guide
WorkflowintoRunnerfor clearer responsibilities.Workflowand fix the stream-drop race inrun_until_convergence.previous_checkpoint_idasRunnerinstance state, so checkpoint parent pointers persist across resumed runs.Runnerto a deprecated lazy__getattr__export — it remains importable fromagent_frameworkbut now emits aDeprecationWarning— and exportCheckpointID.from agent_framework import Runnerkeeps working (with a warning); no public API is removed._runner.pyand the concurrent-run guard / stream-drop handling in_workflow.py.Related Issue
Fixes #4588
Contribution Checklist
breaking changelabel (or add "[BREAKING]" to the title prefix, before or after any language prefix) — a workflow keeps the label and title prefix in sync automatically.