fix: drain poll_task after cancel to prevent RuntimeError on closed loop in ActivityWorker/NexusWorker#1560
Open
kenanzamp wants to merge 3 commits into
Open
Conversation
When a worker exception fires, _ActivityWorker.run() (and the identical
path in _NexusWorker.run()) calls poll_task.cancel() and then immediately
awaits the exception task and returns. The cancel() call schedules a
CancelledError callback on the event loop but does not wait for it to
be delivered. At this point every Python-visible Task is done, so
asyncio.run()'s _cancel_all_tasks sweep finds nothing to wait for and
calls loop.close() immediately.
The Rust bridge (pyo3-async-runtimes) delivers its result via
call_soon_threadsafe(), which places a callback in the loop's _ready
queue. If loop.close() races with that callback, the callback fires
against an already-closed loop and raises RuntimeError("Event loop is
closed"). This is a latent race that became consistently reproducible
starting in v1.13.0 after the Worker plugin-chain refactor wrapped
_ActivityWorker.run() in additional async layers, increasing the window
between poll_task.cancel() and loop teardown.
Fix: insert `await asyncio.wait([poll_task])` immediately after
poll_task.cancel(). asyncio.wait() yields to the event loop for at
least one iteration, allowing any pending Rust-side call_soon_threadsafe
callbacks to be processed while the loop is still open. asyncio.wait()
does not raise even if the task was already cancelled, so it is safe
as a pure drain primitive. The same fix is applied symmetrically to
_NexusWorker.run() which has an identical structure.
A regression test (tests/worker/test_shutdown_race.py) is included that
simulates the race by patching call_soon_threadsafe to defer callbacks
until after cancel(), confirming RuntimeError before the fix and clean
shutdown after it.
|
|
….so needed) The previous test_fixed_activity_worker_poll_exits_cleanly and test_fixed_nexus_worker_poll_exits_cleanly tests tried to import _ActivityWorker/_NexusWorker from the source tree, which fails in CI environments where the Rust bridge (.so) is not compiled. Replace the dynamic import + inspect.getsource() approach with a direct open() read of the source file, which works in any environment. All 3 tests now pass without the compiled bridge.
…ain fix (c-test-2)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
fix: drain poll_task after cancel to prevent RuntimeError("Event loop is closed") in ActivityWorker and NexusWorker
Summary
_ActivityWorker.run()and_NexusWorker.run()both follow this shutdown pattern when a worker exception fires:poll_task.cancel()schedules aCancelledErrordelivery but returns immediately — it does not wait for the loop to process it. Meanwhile, the Rust bridge (pyo3-async-runtimes) may still have a result on its way viacall_soon_threadsafe, which places a callback in the event loop's_readyqueue.At this point every Python-visible
asyncio.Taskis done.asyncio.run()'s_cancel_all_taskssweep finds nothing to cancel and callsloop.close()immediately. Ifloop.close()races with the pendingcall_soon_threadsafecallback, Python raises:This race existed before v1.13.0 but the window was tiny. The v1.13.0 Worker plugin-chain refactor (introduced in #1337 / 5e93e63) wrapped
_ActivityWorker.run()in additional async layers, consistently widening the window and making the race reliably reproducible.Root cause
The race is entirely on the Python side — it lives in the interaction between
asyncio.Task.cancel(),asyncio.run()teardown, andcall_soon_threadsafe. No changes to sdk-core (Rust) are required or proposed.Fix
Insert
await asyncio.wait([poll_task])immediately afterpoll_task.cancel():asyncio.wait()yields to the event loop for at least one iteration, flushing any queuedcall_soon_threadsafecallbacks beforeloop.close()is called. It does not raise on a cancelled task. The same fix is applied symmetrically to_NexusWorker.run().Testing
tests/worker/test_shutdown_race.py(new file) reproduces the race deterministically by patchingcall_soon_threadsafeto defer callbacks until afterpoll_task.cancel()returns. Without the fix, 10/10 iterations raiseRuntimeError("Event loop is closed"). With the fix, all iterations complete cleanly.Workaround context
This fix resolves a race first observed in production at Zamp (see workaround at https://github.com/Zampfi/pantheon/pull/5329). The workaround wrapped the worker in a retry loop; this fix removes the need for that workaround by addressing the root cause in the SDK itself.
Checklist
tests/worker/test_shutdown_race.py)_activity.pyand_nexus.pyruff check --select I— cleanruff format --check— cleanmypy— pre-existing error inworkflow_sandbox/_importer.py(unrelated, present onmain)Fixes the
RuntimeError: Event loop is closedshutdown race made consistently reproducible in v1.13.0.