Skip to content

Commit 3de02fb

Browse files
authored
fix(worker): event loop should run until yield (#53)
<!-- Describe what has changed in this PR --> **What changed?** * added a new method in DeterministicEventLoop to run workflow coroutines until no callbacks available <!-- Tell your future self why have you made these changes --> **Why?** Existing EventLoop API doesn't have functionality of waiting until callback queue is empty. This is required for cadence workflows to generate all decision outcomes. <!-- How have you verified this change? Tested locally? Added a unit test? Checked in staging env? --> **How did you test it?** Unit Test <!-- Assuming the worst case, what can be broken when deploying this change to production? --> **Potential risks** <!-- Is it notable for release? e.g. schema updates, configuration or data migration required? If so, please mention it, and also update CHANGELOG.md --> **Release notes** <!-- Is there any documentation updates should be made for config, https://cadenceworkflow.io/docs/operation-guide/setup/ ? If so, please open an PR in https://github.com/cadence-workflow/cadence-docs --> **Documentation Changes** --------- Signed-off-by: Shijie Sheng <liouvetren@gmail.com>
1 parent 273f7d0 commit 3de02fb

File tree

3 files changed

+25
-4
lines changed

3 files changed

+25
-4
lines changed

cadence/_internal/workflow/deterministic_event_loop.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,16 @@ def __init__(self):
2727
self._stopping = False
2828
self._closed = False
2929

30+
def run_until_yield(self):
31+
"""Run until stop() is called."""
32+
self._run_forever_setup()
33+
try:
34+
while self._ready:
35+
self._run_once()
36+
finally:
37+
self._run_forever_cleanup()
38+
39+
# Event Loop APIs
3040
def call_soon(
3141
self,
3242
callback: Callable[[Unpack[_Ts]], object],

cadence/_internal/workflow/workflow_engine.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -225,10 +225,7 @@ def _execute_workflow_once(
225225
self._workflow_instance.run(workflow_input)
226226
)
227227

228-
# signal the loop to stop after the first run
229-
self._loop.stop()
230-
# this starts the loop and runs once then stops with cleanup
231-
self._loop.run_forever()
228+
self._loop.run_until_yield()
232229

233230
except Exception as e:
234231
logger.error(

tests/cadence/_internal/workflow/test_deterministic_event_loop.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,3 +76,17 @@ def test_create_task(self):
7676
size = 10000
7777
results = self.loop.run_until_complete(coro_await_task(size))
7878
assert results == list(range(size))
79+
80+
def test_run_once(self):
81+
# run once won't clear the read queue
82+
self.loop.create_task(coro_await_task(10))
83+
self.loop.stop()
84+
self.loop.run_forever()
85+
assert len(self.loop._ready) == 10
86+
87+
def test_run_until_yield(self):
88+
# run until yield will clear the read queue
89+
task = self.loop.create_task(coro_await_task(3))
90+
self.loop.run_until_yield()
91+
assert len(self.loop._ready) == 0
92+
assert task.done() is True

0 commit comments

Comments
 (0)