From b0f8ebefbcd35ddf507be86478e97ec2f25ac9c7 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Wed, 11 Mar 2026 08:31:27 +0100 Subject: [PATCH 1/2] Add erlang.sleep() with callback-based sync suspension Add erlang.sleep() function that works in both async and sync contexts: - Async: returns asyncio.sleep() which uses Erlang timer system - Sync: uses erlang.call('_py_sleep') callback with receive/after, truly releasing the dirty scheduler for cooperative yielding Remove unused _erlang_sleep NIF which only released the GIL but blocked the pthread. The callback approach properly suspends the Erlang process. Changes: - Add sleep() to _erlang_impl and export to erlang module - Add _py_sleep callback in py_event_loop.erl - Remove py_erlang_sleep NIF and dispatch_sleep_complete - Remove sync_sleep fields from event loop struct - Remove sleep handlers from py_event_worker - Update tests to use erlang.sleep() --- c_src/py_callback.c | 1 + c_src/py_event_loop.c | 160 +-------------------------------- c_src/py_event_loop.h | 14 --- c_src/py_nif.c | 1 - priv/_erlang_impl/__init__.py | 46 ++++++++++ priv/tests/test_erlang_api.py | 147 ++++++++++++++++++++++++++++++ src/py_event_loop.erl | 19 ++++ src/py_event_worker.erl | 23 +---- src/py_nif.erl | 7 -- test/py_erlang_sleep_SUITE.erl | 42 ++++----- 10 files changed, 237 insertions(+), 223 deletions(-) diff --git a/c_src/py_callback.c b/c_src/py_callback.c index ec8a152..529f413 100644 --- a/c_src/py_callback.c +++ b/c_src/py_callback.c @@ -2409,6 +2409,7 @@ static int create_erlang_module(void) { " import erlang\n" " # Primary exports (uvloop-compatible)\n" " erlang.run = _erlang_impl.run\n" + " erlang.sleep = _erlang_impl.sleep\n" " erlang.spawn_task = _erlang_impl.spawn_task\n" " erlang.new_event_loop = _erlang_impl.new_event_loop\n" " erlang.ErlangEventLoop = _erlang_impl.ErlangEventLoop\n" diff --git a/c_src/py_event_loop.c b/c_src/py_event_loop.c index 66b8377..72de04d 100644 --- a/c_src/py_event_loop.c +++ b/c_src/py_event_loop.c @@ -365,12 +365,9 @@ void event_loop_destructor(ErlNifEnv *env, void *obj) { /* Signal shutdown */ loop->shutdown = true; - /* Wake up any waiting threads (including sync sleep waiters) */ + /* Wake up any waiting threads */ pthread_mutex_lock(&loop->mutex); pthread_cond_broadcast(&loop->event_cond); - if (loop->sync_sleep_cond_initialized) { - pthread_cond_broadcast(&loop->sync_sleep_cond); - } pthread_mutex_unlock(&loop->mutex); /* Clear pending events (returns them to freelist) */ @@ -395,9 +392,6 @@ void event_loop_destructor(ErlNifEnv *env, void *obj) { /* Destroy synchronization primitives */ pthread_mutex_destroy(&loop->mutex); pthread_cond_destroy(&loop->event_cond); - if (loop->sync_sleep_cond_initialized) { - pthread_cond_destroy(&loop->sync_sleep_cond); - } } /** @@ -619,19 +613,8 @@ ERL_NIF_TERM nif_event_loop_new(ErlNifEnv *env, int argc, return make_error(env, "cond_init_failed"); } - if (pthread_cond_init(&loop->sync_sleep_cond, NULL) != 0) { - pthread_cond_destroy(&loop->event_cond); - pthread_mutex_destroy(&loop->mutex); - enif_release_resource(loop); - return make_error(env, "sleep_cond_init_failed"); - } - loop->sync_sleep_cond_initialized = true; - atomic_store(&loop->sync_sleep_id, 0); - atomic_store(&loop->sync_sleep_complete, false); - loop->msg_env = enif_alloc_env(); if (loop->msg_env == NULL) { - pthread_cond_destroy(&loop->sync_sleep_cond); pthread_cond_destroy(&loop->event_cond); pthread_mutex_destroy(&loop->mutex); enif_release_resource(loop); @@ -1325,38 +1308,6 @@ ERL_NIF_TERM nif_dispatch_timer(ErlNifEnv *env, int argc, return ATOM_OK; } -/** - * dispatch_sleep_complete(LoopRef, SleepId) -> ok - * - * Called from Erlang when a synchronous sleep timer expires. - * Signals the waiting Python thread to wake up. - */ -ERL_NIF_TERM nif_dispatch_sleep_complete(ErlNifEnv *env, int argc, - const ERL_NIF_TERM argv[]) { - (void)argc; - - erlang_event_loop_t *loop; - if (!enif_get_resource(env, argv[0], EVENT_LOOP_RESOURCE_TYPE, - (void **)&loop)) { - return make_error(env, "invalid_loop"); - } - - ErlNifUInt64 sleep_id; - if (!enif_get_uint64(env, argv[1], &sleep_id)) { - return make_error(env, "invalid_sleep_id"); - } - - /* Only signal if this is the sleep we're waiting for */ - pthread_mutex_lock(&loop->mutex); - if (atomic_load(&loop->sync_sleep_id) == sleep_id) { - atomic_store(&loop->sync_sleep_complete, true); - pthread_cond_broadcast(&loop->sync_sleep_cond); - } - pthread_mutex_unlock(&loop->mutex); - - return ATOM_OK; -} - /** * handle_fd_event(FdRes, Type) -> ok | {error, Reason} * @@ -5151,102 +5102,6 @@ static PyObject *py_get_pending_for(PyObject *self, PyObject *args) { return list; } -/** - * Python function: _erlang_sleep(delay_ms) -> None - * - * Synchronous sleep that uses Erlang's timer system instead of asyncio. - * Sends {sleep_wait, DelayMs, SleepId} to the worker, then blocks waiting - * for the sleep completion signal. - * - * This is called from the ASGI fast path when asyncio.sleep() is detected, - * avoiding the need to create a full event loop. - */ -static PyObject *py_erlang_sleep(PyObject *self, PyObject *args) { - (void)self; - int delay_ms; - - if (!PyArg_ParseTuple(args, "i", &delay_ms)) { - return NULL; - } - - /* For zero or negative delay, return immediately */ - if (delay_ms <= 0) { - Py_RETURN_NONE; - } - - erlang_event_loop_t *loop = get_interpreter_event_loop(); - if (loop == NULL || loop->shutdown) { - PyErr_SetString(PyExc_RuntimeError, "Event loop not initialized"); - return NULL; - } - - /* Check if we have a worker to send to */ - if (!event_loop_ensure_router(loop)) { - PyErr_SetString(PyExc_RuntimeError, "No worker or router configured"); - return NULL; - } - - /* Generate a unique sleep ID */ - uint64_t sleep_id = atomic_fetch_add(&loop->next_callback_id, 1); - - /* FIX: Store sleep_id BEFORE sending to prevent race condition. - * If completion arrives before storage, it would be dropped and waiter deadlocks. */ - pthread_mutex_lock(&loop->mutex); - atomic_store(&loop->sync_sleep_id, sleep_id); - atomic_store(&loop->sync_sleep_complete, false); - pthread_mutex_unlock(&loop->mutex); - - /* Send {sleep_wait, DelayMs, SleepId} to worker */ - ErlNifEnv *msg_env = enif_alloc_env(); - if (msg_env == NULL) { - /* On failure, reset sleep_id */ - pthread_mutex_lock(&loop->mutex); - atomic_store(&loop->sync_sleep_id, 0); - pthread_mutex_unlock(&loop->mutex); - PyErr_SetString(PyExc_MemoryError, "Failed to allocate message environment"); - return NULL; - } - - ERL_NIF_TERM msg = enif_make_tuple3( - msg_env, - enif_make_atom(msg_env, "sleep_wait"), - enif_make_int(msg_env, delay_ms), - enif_make_uint64(msg_env, sleep_id) - ); - - /* Use worker_pid when available, otherwise fall back to router_pid */ - ErlNifPid *target_pid = loop->has_worker ? &loop->worker_pid : &loop->router_pid; - if (!enif_send(NULL, target_pid, msg_env, msg)) { - /* On failure, reset sleep_id */ - pthread_mutex_lock(&loop->mutex); - atomic_store(&loop->sync_sleep_id, 0); - pthread_mutex_unlock(&loop->mutex); - enif_free_env(msg_env); - PyErr_SetString(PyExc_RuntimeError, "Failed to send sleep message"); - return NULL; - } - enif_free_env(msg_env); - - /* Wait for completion - sleep_id already set above */ - pthread_mutex_lock(&loop->mutex); - - /* Release GIL and wait for completion */ - Py_BEGIN_ALLOW_THREADS - while (!atomic_load(&loop->sync_sleep_complete) && !loop->shutdown) { - pthread_cond_wait(&loop->sync_sleep_cond, &loop->mutex); - } - Py_END_ALLOW_THREADS - - pthread_mutex_unlock(&loop->mutex); - - if (loop->shutdown) { - PyErr_SetString(PyExc_RuntimeError, "Event loop shutdown during sleep"); - return NULL; - } - - Py_RETURN_NONE; -} - /* Module method definitions */ static PyMethodDef PyEventLoopMethods[] = { /* Legacy API (uses global event loop) */ @@ -5282,8 +5137,6 @@ static PyMethodDef PyEventLoopMethods[] = { {"_release_fd_resource", py_release_fd_resource, METH_VARARGS, "Release fd resource"}, {"_schedule_timer_for", py_schedule_timer_for, METH_VARARGS, "Schedule timer on specific loop"}, {"_cancel_timer_for", py_cancel_timer_for, METH_VARARGS, "Cancel timer on specific loop"}, - /* Synchronous sleep (for ASGI fast path) */ - {"_erlang_sleep", py_erlang_sleep, METH_VARARGS, "Synchronous sleep using Erlang timer"}, {NULL, NULL, 0, NULL} }; @@ -5382,19 +5235,8 @@ int create_default_event_loop(ErlNifEnv *env) { return -1; } - if (pthread_cond_init(&loop->sync_sleep_cond, NULL) != 0) { - pthread_cond_destroy(&loop->event_cond); - pthread_mutex_destroy(&loop->mutex); - enif_release_resource(loop); - return -1; - } - loop->sync_sleep_cond_initialized = true; - atomic_store(&loop->sync_sleep_id, 0); - atomic_store(&loop->sync_sleep_complete, false); - loop->msg_env = enif_alloc_env(); if (loop->msg_env == NULL) { - pthread_cond_destroy(&loop->sync_sleep_cond); pthread_cond_destroy(&loop->event_cond); pthread_mutex_destroy(&loop->mutex); enif_release_resource(loop); diff --git a/c_src/py_event_loop.h b/c_src/py_event_loop.h index 3763bc8..4e26eba 100644 --- a/c_src/py_event_loop.h +++ b/c_src/py_event_loop.h @@ -246,20 +246,6 @@ typedef struct erlang_event_loop { /** @brief Flag indicating a wakeup is pending (uvloop-style coalescing) */ _Atomic bool wake_pending; - /* ========== Synchronous Sleep Support ========== */ - - /** @brief Current synchronous sleep ID being waited on */ - _Atomic uint64_t sync_sleep_id; - - /** @brief Flag indicating sleep has completed */ - _Atomic bool sync_sleep_complete; - - /** @brief Condition variable for sleep completion notification */ - pthread_cond_t sync_sleep_cond; - - /** @brief Whether sync_sleep_cond has been initialized */ - bool sync_sleep_cond_initialized; - /** @brief Interpreter ID: 0 = main interpreter, >0 = subinterpreter */ uint32_t interp_id; } erlang_event_loop_t; diff --git a/c_src/py_nif.c b/c_src/py_nif.c index 7fba13f..fc2adc1 100644 --- a/c_src/py_nif.c +++ b/c_src/py_nif.c @@ -3856,7 +3856,6 @@ static ErlNifFunc nif_funcs[] = { {"get_pending", 1, nif_get_pending, 0}, {"dispatch_callback", 3, nif_dispatch_callback, 0}, {"dispatch_timer", 2, nif_dispatch_timer, 0}, - {"dispatch_sleep_complete", 2, nif_dispatch_sleep_complete, 0}, {"get_fd_callback_id", 2, nif_get_fd_callback_id, 0}, {"reselect_reader", 2, nif_reselect_reader, 0}, {"reselect_writer", 2, nif_reselect_writer, 0}, diff --git a/priv/_erlang_impl/__init__.py b/priv/_erlang_impl/__init__.py index 0781288..251e085 100644 --- a/priv/_erlang_impl/__init__.py +++ b/priv/_erlang_impl/__init__.py @@ -46,6 +46,7 @@ import sys import asyncio +import time import warnings # Install sandbox when running inside Erlang VM @@ -66,6 +67,7 @@ __all__ = [ 'run', + 'sleep', 'spawn_task', 'new_event_loop', 'get_event_loop_policy', @@ -163,6 +165,50 @@ async def main(): loop.close() +def sleep(seconds): + """Sleep for the given duration, yielding to other tasks. + + Works in both async and sync contexts: + - Async context: Returns an awaitable (use with await) + - Sync context: Suspends via Erlang, releasing the dirty scheduler + + In async context, uses asyncio.sleep() which routes through the Erlang + timer system via erlang:send_after. + + In sync context, calls into Erlang which blocks using receive/after, + fully releasing the dirty NIF scheduler so other Erlang processes can + run. This is true cooperative yielding like gevent.sleep(). + + Args: + seconds: Duration to sleep in seconds (float or int). + + Returns: + In async context: A coroutine that should be awaited. + In sync context: None (suspends until sleep completes). + + Example: + # Async context + async def main(): + await erlang.sleep(0.5) # Uses Erlang timer system + + # Sync context (cooperative yield) + def handler(): + erlang.sleep(0.5) # Suspends, frees dirty scheduler + """ + try: + asyncio.get_running_loop() + # Async context - return awaitable that uses Erlang timers + return asyncio.sleep(seconds) + except RuntimeError: + # Sync context - use erlang.call to truly suspend and free dirty scheduler + try: + import erlang + erlang.call('_py_sleep', seconds) + except (ImportError, AttributeError): + # Fallback when not in Erlang NIF environment + time.sleep(seconds) + + def spawn_task(coro, *, name=None): """Spawn an async task, working in both async and sync contexts. diff --git a/priv/tests/test_erlang_api.py b/priv/tests/test_erlang_api.py index 7c5f440..b754f91 100644 --- a/priv/tests/test_erlang_api.py +++ b/priv/tests/test_erlang_api.py @@ -29,6 +29,7 @@ import asyncio import sys +import time import unittest import warnings @@ -573,5 +574,151 @@ async def main(): self.assertEqual(result, [2, 4, 6]) +class TestErlangSleep(tb.ErlangTestCase): + """Tests for erlang.sleep() function.""" + + def test_sleep_async_basic(self): + """Test await erlang.sleep() in async context.""" + erlang = _get_erlang_module() + + async def main(): + start = time.time() + await erlang.sleep(0.05) + elapsed = time.time() - start + return elapsed + + elapsed = self.loop.run_until_complete(main()) + # Should sleep at least 50ms (allowing some tolerance) + self.assertGreaterEqual(elapsed, 0.04) + # Should not sleep too long (sanity check) + self.assertLess(elapsed, 0.5) + + def test_sleep_async_zero(self): + """Test await erlang.sleep(0) yields but returns immediately.""" + erlang = _get_erlang_module() + + async def main(): + start = time.time() + await erlang.sleep(0) + elapsed = time.time() - start + return elapsed + + elapsed = self.loop.run_until_complete(main()) + # Should return very quickly + self.assertLess(elapsed, 0.1) + + def test_sleep_async_concurrent(self): + """Test erlang.sleep() works correctly with concurrent tasks.""" + erlang = _get_erlang_module() + + async def task(n, sleep_time): + await erlang.sleep(sleep_time) + return n + + async def main(): + start = time.time() + # Run 3 tasks concurrently, each sleeping 0.05s + results = await asyncio.gather( + task(1, 0.05), + task(2, 0.05), + task(3, 0.05), + ) + elapsed = time.time() - start + return results, elapsed + + results, elapsed = self.loop.run_until_complete(main()) + self.assertEqual(sorted(results), [1, 2, 3]) + # Concurrent: should complete in ~0.05s, not 0.15s + self.assertLess(elapsed, 0.15) + + def test_sleep_async_staggered(self): + """Test erlang.sleep() with staggered sleep times.""" + erlang = _get_erlang_module() + + async def task(n, sleep_time): + await erlang.sleep(sleep_time) + return n + + async def main(): + # Tasks should complete in order of sleep time + results = [] + tasks = [ + asyncio.create_task(task(3, 0.06)), + asyncio.create_task(task(1, 0.02)), + asyncio.create_task(task(2, 0.04)), + ] + for coro in asyncio.as_completed(tasks): + results.append(await coro) + return results + + results = self.loop.run_until_complete(main()) + # Should complete in order: 1 (0.02s), 2 (0.04s), 3 (0.06s) + self.assertEqual(results, [1, 2, 3]) + + def test_sleep_via_erlang_run(self): + """Test erlang.sleep() works with erlang.run().""" + erlang = _get_erlang_module() + + async def main(): + start = time.time() + await erlang.sleep(0.03) + return time.time() - start + + elapsed = erlang.run(main()) + self.assertGreaterEqual(elapsed, 0.02) + self.assertLess(elapsed, 0.2) + + def test_sleep_in_all_exported(self): + """Test that sleep is exported in __all__.""" + erlang = _get_erlang_module() + # Check via _erlang_impl since that's where __all__ is defined + try: + import _erlang_impl + self.assertIn('sleep', _erlang_impl.__all__) + except ImportError: + # If we can't import _erlang_impl directly, just check erlang has it + self.assertTrue(hasattr(erlang, 'sleep')) + + +class TestErlangSleepSync(unittest.TestCase): + """Tests for erlang.sleep() in sync context. + + Note: Sync sleep via Erlang callback only works when running + inside the Erlang NIF environment. These tests verify the API + exists and behaves correctly. + """ + + def test_sleep_function_exists(self): + """Test that erlang.sleep() function exists.""" + erlang = _get_erlang_module() + self.assertTrue(hasattr(erlang, 'sleep')) + self.assertTrue(callable(erlang.sleep)) + + @unittest.skipUnless(tb.INSIDE_ERLANG_NIF, "Requires Erlang NIF environment") + def test_sleep_sync_basic(self): + """Test erlang.sleep() in sync context (inside Erlang NIF).""" + erlang = _get_erlang_module() + + start = time.time() + erlang.sleep(0.05) + elapsed = time.time() - start + + # Should sleep at least 50ms + self.assertGreaterEqual(elapsed, 0.04) + self.assertLess(elapsed, 0.5) + + @unittest.skipUnless(tb.INSIDE_ERLANG_NIF, "Requires Erlang NIF environment") + def test_sleep_sync_zero(self): + """Test erlang.sleep(0) in sync context.""" + erlang = _get_erlang_module() + + start = time.time() + erlang.sleep(0) + elapsed = time.time() - start + + # Should return very quickly + self.assertLess(elapsed, 0.1) + + if __name__ == '__main__': unittest.main() diff --git a/src/py_event_loop.erl b/src/py_event_loop.erl index 1660886..b7c8138 100644 --- a/src/py_event_loop.erl +++ b/src/py_event_loop.erl @@ -82,6 +82,8 @@ register_callbacks() -> py_callback:register(py_event_loop_get_pending, fun cb_get_pending/1), py_callback:register(py_event_loop_dispatch_callback, fun cb_dispatch_callback/1), py_callback:register(py_event_loop_dispatch_timer, fun cb_dispatch_timer/1), + %% Sleep callback - suspends Erlang process, fully releasing dirty scheduler + py_callback:register(<<"_py_sleep">>, fun cb_sleep/1), ok. %% @doc Run an async coroutine on the event loop. @@ -290,3 +292,20 @@ cb_dispatch_callback([LoopRef, CallbackId, Type]) -> cb_dispatch_timer([LoopRef, CallbackId]) -> py_nif:dispatch_timer(LoopRef, CallbackId). + +%% @doc Sleep callback for Python erlang.sleep(). +%% Suspends the current Erlang process for the specified duration, +%% fully releasing the dirty NIF scheduler to handle other work. +%% This is true cooperative yielding - the dirty scheduler thread is freed. +%% Args: [Seconds] - float or integer seconds (converted to ms internally) +cb_sleep([Seconds]) when is_float(Seconds), Seconds > 0 -> + Ms = round(Seconds * 1000), + receive after Ms -> ok end; +cb_sleep([Seconds]) when is_integer(Seconds), Seconds > 0 -> + Ms = Seconds * 1000, + receive after Ms -> ok end; +cb_sleep([Seconds]) when is_number(Seconds) -> + %% Zero or negative - return immediately + ok; +cb_sleep(_Args) -> + ok. diff --git a/src/py_event_worker.erl b/src/py_event_worker.erl index 1cddb8f..f8cdcae 100644 --- a/src/py_event_worker.erl +++ b/src/py_event_worker.erl @@ -15,8 +15,7 @@ worker_id :: binary(), loop_ref :: reference(), timers = #{} :: #{reference() => {reference(), non_neg_integer()}}, - sleeps = #{} :: #{non_neg_integer() => reference()}, %% SleepId => ErlTimerRef - stats = #{select_count => 0, timer_count => 0, dispatch_count => 0, sleep_count => 0} :: map() + stats = #{select_count => 0, timer_count => 0, dispatch_count => 0} :: map() }). start_link(WorkerId, LoopRef) -> start_link(WorkerId, LoopRef, []). @@ -74,21 +73,6 @@ handle_info({cancel_timer, TimerRef}, State) -> {noreply, State#state{timers = NewTimers}} end; -%% Synchronous sleep support for ASGI fast path -handle_info({sleep_wait, DelayMs, SleepId}, State) -> - #state{sleeps = Sleeps} = State, - %% Schedule a timer that will trigger sleep_complete - ErlTimerRef = erlang:send_after(DelayMs, self(), {sleep_complete, SleepId}), - NewSleeps = maps:put(SleepId, ErlTimerRef, Sleeps), - {noreply, State#state{sleeps = NewSleeps}}; - -handle_info({sleep_complete, SleepId}, State) -> - #state{loop_ref = LoopRef, sleeps = Sleeps} = State, - %% Remove from sleeps map and signal Python that sleep is done - NewSleeps = maps:remove(SleepId, Sleeps), - py_nif:dispatch_sleep_complete(LoopRef, SleepId), - {noreply, State#state{sleeps = NewSleeps}}; - handle_info({timeout, TimerRef}, State) -> #state{loop_ref = LoopRef, timers = Timers} = State, case maps:get(TimerRef, Timers, undefined) of @@ -102,13 +86,10 @@ handle_info({timeout, TimerRef}, State) -> handle_info({select, _FdRes, _Ref, cancelled}, State) -> {noreply, State}; handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, #state{timers = Timers, sleeps = Sleeps}) -> +terminate(_Reason, #state{timers = Timers}) -> maps:foreach(fun(_TimerRef, {ErlTimerRef, _CallbackId}) -> erlang:cancel_timer(ErlTimerRef) end, Timers), - maps:foreach(fun(_SleepId, ErlTimerRef) -> - erlang:cancel_timer(ErlTimerRef) - end, Sleeps), ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. diff --git a/src/py_nif.erl b/src/py_nif.erl index b300044..9674430 100644 --- a/src/py_nif.erl +++ b/src/py_nif.erl @@ -109,7 +109,6 @@ get_pending/1, dispatch_callback/3, dispatch_timer/2, - dispatch_sleep_complete/2, get_fd_callback_id/2, reselect_reader/2, reselect_writer/2, @@ -791,12 +790,6 @@ dispatch_callback(_LoopRef, _CallbackId, _Type) -> dispatch_timer(_LoopRef, _CallbackId) -> ?NIF_STUB. -%% @doc Signal that a synchronous sleep has completed. -%% Called from Erlang when a sleep timer expires. --spec dispatch_sleep_complete(reference(), non_neg_integer()) -> ok. -dispatch_sleep_complete(_LoopRef, _SleepId) -> - ?NIF_STUB. - %% @doc Get callback ID from an fd resource. %% Type is read or write. -spec get_fd_callback_id(reference(), read | write) -> non_neg_integer() | undefined. diff --git a/test/py_erlang_sleep_SUITE.erl b/test/py_erlang_sleep_SUITE.erl index 78145dd..d876e98 100644 --- a/test/py_erlang_sleep_SUITE.erl +++ b/test/py_erlang_sleep_SUITE.erl @@ -1,6 +1,6 @@ -%% @doc Tests for Erlang sleep and asyncio integration. +%% @doc Tests for erlang.sleep() and asyncio integration. %% -%% Tests the _erlang_sleep NIF and erlang module asyncio integration. +%% Tests the erlang.sleep() function and erlang module asyncio integration. -module(py_erlang_sleep_SUITE). -include_lib("common_test/include/ct.hrl"). @@ -38,22 +38,22 @@ init_per_suite(Config) -> end_per_suite(_Config) -> ok. -%% Test that _erlang_sleep is available in py_event_loop +%% Test that erlang.sleep is available test_erlang_sleep_available(_Config) -> ok = py:exec(<<" -import py_event_loop as pel -result = hasattr(pel, '_erlang_sleep') -assert result, '_erlang_sleep not found in py_event_loop' +import erlang +result = hasattr(erlang, 'sleep') +assert result, 'erlang.sleep not found' ">>), - ct:pal("_erlang_sleep is available"), + ct:pal("erlang.sleep is available"), ok. -%% Test basic sleep functionality +%% Test basic sleep functionality (sync context via callback) test_erlang_sleep_basic(_Config) -> ok = py:exec(<<" -import py_event_loop as pel -# Test basic sleep - should not raise -pel._erlang_sleep(10) # 10ms +import erlang +# Test basic sleep in sync context - should not raise +erlang.sleep(0.01) # 10ms ">>), ct:pal("Basic sleep completed"), ok. @@ -61,14 +61,14 @@ pel._erlang_sleep(10) # 10ms %% Test zero/negative delay returns immediately test_erlang_sleep_zero(_Config) -> ok = py:exec(<<" -import py_event_loop as pel +import erlang import time start = time.time() -pel._erlang_sleep(0) +erlang.sleep(0) elapsed = (time.time() - start) * 1000 -# Should return immediately (< 5ms accounting for Python overhead) -assert elapsed < 5, f'Zero sleep was slow: {elapsed}ms' +# Should return immediately (< 10ms accounting for Python overhead) +assert elapsed < 10, f'Zero sleep was slow: {elapsed}ms' ">>), ct:pal("Zero sleep returned fast"), ok. @@ -76,17 +76,17 @@ assert elapsed < 5, f'Zero sleep was slow: {elapsed}ms' %% Test sleep accuracy test_erlang_sleep_accuracy(_Config) -> ok = py:exec(<<" -import py_event_loop as pel +import erlang import time -delays = [10, 50, 100] # ms +delays = [0.01, 0.05, 0.1] # seconds for delay in delays: start = time.time() - pel._erlang_sleep(delay) - elapsed = (time.time() - start) * 1000 + erlang.sleep(delay) + elapsed = time.time() - start # Allow wide tolerance for CI runners (can be slow/unpredictable) assert delay * 0.5 <= elapsed <= delay * 10.0, \\ - f'{delay}ms sleep took {elapsed:.1f}ms' + f'{delay}s sleep took {elapsed:.3f}s' ">>), ct:pal("Sleep accuracy within tolerance"), ok. @@ -98,7 +98,7 @@ import erlang import asyncio # Test erlang module has expected functions for event loop integration -funcs = ['run', 'new_event_loop', 'EventLoopPolicy'] +funcs = ['run', 'new_event_loop', 'EventLoopPolicy', 'sleep'] for f in funcs: assert hasattr(erlang, f), f'erlang missing {f}' From ed04d3229dba831831f12561a4868c082c198940 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Wed, 11 Mar 2026 08:35:40 +0100 Subject: [PATCH 2/2] Document that erlang.sleep() releases dirty scheduler Update docstring and asyncio.md to clarify: - Both sync and async modes release the dirty NIF scheduler - Async: yields to event loop via asyncio.sleep()/call_later() - Sync: suspends Erlang process via receive/after callback Also fix outdated architecture diagram that referenced removed sleep_wait/dispatch_sleep_complete NIF. --- docs/asyncio.md | 68 ++++++++++++++++++++++++----------- priv/_erlang_impl/__init__.py | 27 ++++++++------ 2 files changed, 65 insertions(+), 30 deletions(-) diff --git a/docs/asyncio.md b/docs/asyncio.md index 4ac18fe..c5e9fae 100644 --- a/docs/asyncio.md +++ b/docs/asyncio.md @@ -54,9 +54,9 @@ erlang.run(main()) │ ┌──────────────────┐ └────────────────────────────────────┘ │ │ │ asyncio (via │ │ │ │ erlang.run()) │ ┌────────────────────────────────────┐ │ -│ │ sleep() ──┼─{sleep_wait}──▶│ erlang:send_after() + cond_wait │ │ -│ │ gather() │ │ │ │ -│ │ wait_for() │◀──{complete}───│ pthread_cond_broadcast() │ │ +│ │ sleep() │ │ asyncio.sleep() uses call_later() │ │ +│ │ gather() │─call_later()──▶│ which triggers erlang:send_after │ │ +│ │ wait_for() │ │ │ │ │ │ create_task() │ └────────────────────────────────────┘ │ │ └──────────────────┘ │ │ │ @@ -632,7 +632,7 @@ Unlike Python's standard polling-based event loop, the Erlang event loop uses `e ``` ┌─────────────────────────────────────────────────────────────────────────┐ -│ asyncio.sleep() via ErlangEventLoop │ +│ asyncio.sleep() via ErlangEventLoop │ │ │ │ Python Erlang │ │ ────── ────── │ @@ -640,33 +640,34 @@ Unlike Python's standard polling-based event loop, the Erlang event loop uses `e │ ┌─────────────────┐ ┌─────────────────────────────────┐ │ │ │ asyncio.sleep │ │ py_event_worker │ │ │ │ (0.1) │ │ │ │ -│ └────────┬────────┘ │ handle_info({sleep_wait,...}) │ │ -│ │ │ │ │ │ -│ ▼ │ ▼ │ │ -│ ┌─────────────────┐ │ erlang:send_after(100ms) │ │ -│ │ ErlangEventLoop │──{sleep_wait,│ │ │ │ -│ │ call_later() │ 100, Id}──▶│ ▼ │ │ -│ └────────┬────────┘ │ handle_info({sleep_complete}) │ │ -│ │ │ │ │ │ -│ ┌────────▼────────┐ │ ▼ │ │ -│ │ Release GIL │ │ py_nif:dispatch_sleep_complete │ │ -│ │ pthread_cond_ │◀─────────────│ │ │ │ -│ │ wait() │ signal └─────────┼───────────────────────┘ │ +│ └────────┬────────┘ │ │ │ +│ │ │ │ │ +│ ▼ │ │ │ +│ ┌─────────────────┐ │ │ │ +│ │ ErlangEventLoop │──{timer,100, │ erlang:send_after(100ms) │ │ +│ │ call_later() │ Id}─────▶│ │ │ │ +│ └────────┬────────┘ │ ▼ │ │ +│ │ │ handle_info({timeout, ...}) │ │ +│ ┌────────▼────────┐ │ │ │ │ +│ │ Yield to event │ │ ▼ │ │ +│ │ loop (dirty │ │ py_nif:dispatch_timer() │ │ +│ │ scheduler │◀─────────────│ │ │ │ +│ │ released) │ callback └─────────┼───────────────────────┘ │ │ └────────┬────────┘ │ │ │ │ │ │ │ ▼ ▼ │ │ ┌─────────────────┐ ┌─────────────────────────────────┐ │ -│ │ Reacquire GIL │ │ pthread_cond_broadcast() │ │ -│ │ Return result │ │ (wakes Python thread) │ │ +│ │ Resume after │ │ Timer callback dispatched to │ │ +│ │ timer fires │ │ Python pending queue │ │ │ └─────────────────┘ └─────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────────────┘ ``` **Key features:** -- **GIL released during sleep** - Python thread doesn't hold the GIL while waiting +- **Dirty scheduler released during sleep** - Python yields to event loop, freeing the dirty NIF thread - **BEAM scheduler integration** - Uses Erlang's native timer system -- **Zero CPU usage** - Condition variable wait, no polling +- **Zero CPU usage** - No polling, event-driven callback - **Sub-millisecond precision** - Timers managed by BEAM scheduler ### Basic Usage @@ -688,6 +689,33 @@ result = erlang.run(my_handler()) When using `erlang.run()` or the Erlang event loop, all standard asyncio functions work seamlessly with Erlang's backend. +#### erlang.sleep(seconds) + +Sleep for the specified duration. Works in both async and sync contexts, and **always releases the dirty NIF scheduler**. + +```python +import erlang + +# Async context - releases dirty scheduler via event loop yield +async def async_handler(): + await erlang.sleep(0.1) # Uses asyncio.sleep() internally + return "done" + +# Sync context - releases dirty scheduler via Erlang process suspension +def sync_handler(): + erlang.sleep(0.1) # Uses receive/after, true cooperative yield + return "done" +``` + +**Dirty Scheduler Release:** + +| Context | Mechanism | Dirty Scheduler | +|---------|-----------|-----------------| +| Async (`await erlang.sleep()`) | `asyncio.sleep()` via `call_later()` | Released (yields to event loop) | +| Sync (`erlang.sleep()`) | `erlang.call('_py_sleep')` with `receive/after` | Released (Erlang process suspends) | + +Both modes allow other Erlang processes and Python contexts to run during the sleep. + #### asyncio.sleep(delay) Sleep for the specified delay. Uses Erlang's `erlang:send_after/3` internally. diff --git a/priv/_erlang_impl/__init__.py b/priv/_erlang_impl/__init__.py index 251e085..1f73875 100644 --- a/priv/_erlang_impl/__init__.py +++ b/priv/_erlang_impl/__init__.py @@ -166,34 +166,41 @@ async def main(): def sleep(seconds): - """Sleep for the given duration, yielding to other tasks. + """Sleep for the given duration, releasing the dirty scheduler. + + Both sync and async modes release the dirty NIF scheduler thread, + allowing other Erlang processes to run during the sleep. Works in both async and sync contexts: - Async context: Returns an awaitable (use with await) - - Sync context: Suspends via Erlang, releasing the dirty scheduler + - Sync context: Blocks synchronously via Erlang callback + + **Dirty Scheduler Release:** In async context, uses asyncio.sleep() which routes through the Erlang - timer system via erlang:send_after. + timer system via erlang:send_after. The dirty scheduler is released + because the Python code yields back to the event loop. - In sync context, calls into Erlang which blocks using receive/after, - fully releasing the dirty NIF scheduler so other Erlang processes can - run. This is true cooperative yielding like gevent.sleep(). + In sync context, calls into Erlang via erlang.call('_py_sleep', seconds) + which uses receive/after to suspend the Erlang process. This fully + releases the dirty NIF scheduler thread so other Erlang processes and + Python contexts can run. This is true cooperative yielding. Args: seconds: Duration to sleep in seconds (float or int). Returns: In async context: A coroutine that should be awaited. - In sync context: None (suspends until sleep completes). + In sync context: None (blocks until sleep completes). Example: - # Async context + # Async context - releases dirty scheduler via event loop yield async def main(): await erlang.sleep(0.5) # Uses Erlang timer system - # Sync context (cooperative yield) + # Sync context - releases dirty scheduler via Erlang suspension def handler(): - erlang.sleep(0.5) # Suspends, frees dirty scheduler + erlang.sleep(0.5) # Suspends Erlang process, frees dirty scheduler """ try: asyncio.get_running_loop()