Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions c_src/py_callback.c
Original file line number Diff line number Diff line change
Expand Up @@ -2409,6 +2409,7 @@ static int create_erlang_module(void) {
" import erlang\n"
" # Primary exports (uvloop-compatible)\n"
" erlang.run = _erlang_impl.run\n"
" erlang.sleep = _erlang_impl.sleep\n"
" erlang.spawn_task = _erlang_impl.spawn_task\n"
" erlang.new_event_loop = _erlang_impl.new_event_loop\n"
" erlang.ErlangEventLoop = _erlang_impl.ErlangEventLoop\n"
Expand Down
160 changes: 1 addition & 159 deletions c_src/py_event_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -365,12 +365,9 @@ void event_loop_destructor(ErlNifEnv *env, void *obj) {
/* Signal shutdown */
loop->shutdown = true;

/* Wake up any waiting threads (including sync sleep waiters) */
/* Wake up any waiting threads */
pthread_mutex_lock(&loop->mutex);
pthread_cond_broadcast(&loop->event_cond);
if (loop->sync_sleep_cond_initialized) {
pthread_cond_broadcast(&loop->sync_sleep_cond);
}
pthread_mutex_unlock(&loop->mutex);

/* Clear pending events (returns them to freelist) */
Expand All @@ -395,9 +392,6 @@ void event_loop_destructor(ErlNifEnv *env, void *obj) {
/* Destroy synchronization primitives */
pthread_mutex_destroy(&loop->mutex);
pthread_cond_destroy(&loop->event_cond);
if (loop->sync_sleep_cond_initialized) {
pthread_cond_destroy(&loop->sync_sleep_cond);
}
}

/**
Expand Down Expand Up @@ -619,19 +613,8 @@ ERL_NIF_TERM nif_event_loop_new(ErlNifEnv *env, int argc,
return make_error(env, "cond_init_failed");
}

if (pthread_cond_init(&loop->sync_sleep_cond, NULL) != 0) {
pthread_cond_destroy(&loop->event_cond);
pthread_mutex_destroy(&loop->mutex);
enif_release_resource(loop);
return make_error(env, "sleep_cond_init_failed");
}
loop->sync_sleep_cond_initialized = true;
atomic_store(&loop->sync_sleep_id, 0);
atomic_store(&loop->sync_sleep_complete, false);

loop->msg_env = enif_alloc_env();
if (loop->msg_env == NULL) {
pthread_cond_destroy(&loop->sync_sleep_cond);
pthread_cond_destroy(&loop->event_cond);
pthread_mutex_destroy(&loop->mutex);
enif_release_resource(loop);
Expand Down Expand Up @@ -1325,38 +1308,6 @@ ERL_NIF_TERM nif_dispatch_timer(ErlNifEnv *env, int argc,
return ATOM_OK;
}

/**
* dispatch_sleep_complete(LoopRef, SleepId) -> ok
*
* Called from Erlang when a synchronous sleep timer expires.
* Signals the waiting Python thread to wake up.
*/
ERL_NIF_TERM nif_dispatch_sleep_complete(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
(void)argc;

erlang_event_loop_t *loop;
if (!enif_get_resource(env, argv[0], EVENT_LOOP_RESOURCE_TYPE,
(void **)&loop)) {
return make_error(env, "invalid_loop");
}

ErlNifUInt64 sleep_id;
if (!enif_get_uint64(env, argv[1], &sleep_id)) {
return make_error(env, "invalid_sleep_id");
}

/* Only signal if this is the sleep we're waiting for */
pthread_mutex_lock(&loop->mutex);
if (atomic_load(&loop->sync_sleep_id) == sleep_id) {
atomic_store(&loop->sync_sleep_complete, true);
pthread_cond_broadcast(&loop->sync_sleep_cond);
}
pthread_mutex_unlock(&loop->mutex);

return ATOM_OK;
}

/**
* handle_fd_event(FdRes, Type) -> ok | {error, Reason}
*
Expand Down Expand Up @@ -5151,102 +5102,6 @@ static PyObject *py_get_pending_for(PyObject *self, PyObject *args) {
return list;
}

/**
* Python function: _erlang_sleep(delay_ms) -> None
*
* Synchronous sleep that uses Erlang's timer system instead of asyncio.
* Sends {sleep_wait, DelayMs, SleepId} to the worker, then blocks waiting
* for the sleep completion signal.
*
* This is called from the ASGI fast path when asyncio.sleep() is detected,
* avoiding the need to create a full event loop.
*/
static PyObject *py_erlang_sleep(PyObject *self, PyObject *args) {
(void)self;
int delay_ms;

if (!PyArg_ParseTuple(args, "i", &delay_ms)) {
return NULL;
}

/* For zero or negative delay, return immediately */
if (delay_ms <= 0) {
Py_RETURN_NONE;
}

erlang_event_loop_t *loop = get_interpreter_event_loop();
if (loop == NULL || loop->shutdown) {
PyErr_SetString(PyExc_RuntimeError, "Event loop not initialized");
return NULL;
}

/* Check if we have a worker to send to */
if (!event_loop_ensure_router(loop)) {
PyErr_SetString(PyExc_RuntimeError, "No worker or router configured");
return NULL;
}

/* Generate a unique sleep ID */
uint64_t sleep_id = atomic_fetch_add(&loop->next_callback_id, 1);

/* FIX: Store sleep_id BEFORE sending to prevent race condition.
* If completion arrives before storage, it would be dropped and waiter deadlocks. */
pthread_mutex_lock(&loop->mutex);
atomic_store(&loop->sync_sleep_id, sleep_id);
atomic_store(&loop->sync_sleep_complete, false);
pthread_mutex_unlock(&loop->mutex);

/* Send {sleep_wait, DelayMs, SleepId} to worker */
ErlNifEnv *msg_env = enif_alloc_env();
if (msg_env == NULL) {
/* On failure, reset sleep_id */
pthread_mutex_lock(&loop->mutex);
atomic_store(&loop->sync_sleep_id, 0);
pthread_mutex_unlock(&loop->mutex);
PyErr_SetString(PyExc_MemoryError, "Failed to allocate message environment");
return NULL;
}

ERL_NIF_TERM msg = enif_make_tuple3(
msg_env,
enif_make_atom(msg_env, "sleep_wait"),
enif_make_int(msg_env, delay_ms),
enif_make_uint64(msg_env, sleep_id)
);

/* Use worker_pid when available, otherwise fall back to router_pid */
ErlNifPid *target_pid = loop->has_worker ? &loop->worker_pid : &loop->router_pid;
if (!enif_send(NULL, target_pid, msg_env, msg)) {
/* On failure, reset sleep_id */
pthread_mutex_lock(&loop->mutex);
atomic_store(&loop->sync_sleep_id, 0);
pthread_mutex_unlock(&loop->mutex);
enif_free_env(msg_env);
PyErr_SetString(PyExc_RuntimeError, "Failed to send sleep message");
return NULL;
}
enif_free_env(msg_env);

/* Wait for completion - sleep_id already set above */
pthread_mutex_lock(&loop->mutex);

/* Release GIL and wait for completion */
Py_BEGIN_ALLOW_THREADS
while (!atomic_load(&loop->sync_sleep_complete) && !loop->shutdown) {
pthread_cond_wait(&loop->sync_sleep_cond, &loop->mutex);
}
Py_END_ALLOW_THREADS

pthread_mutex_unlock(&loop->mutex);

if (loop->shutdown) {
PyErr_SetString(PyExc_RuntimeError, "Event loop shutdown during sleep");
return NULL;
}

Py_RETURN_NONE;
}

/* Module method definitions */
static PyMethodDef PyEventLoopMethods[] = {
/* Legacy API (uses global event loop) */
Expand Down Expand Up @@ -5282,8 +5137,6 @@ static PyMethodDef PyEventLoopMethods[] = {
{"_release_fd_resource", py_release_fd_resource, METH_VARARGS, "Release fd resource"},
{"_schedule_timer_for", py_schedule_timer_for, METH_VARARGS, "Schedule timer on specific loop"},
{"_cancel_timer_for", py_cancel_timer_for, METH_VARARGS, "Cancel timer on specific loop"},
/* Synchronous sleep (for ASGI fast path) */
{"_erlang_sleep", py_erlang_sleep, METH_VARARGS, "Synchronous sleep using Erlang timer"},
{NULL, NULL, 0, NULL}
};

Expand Down Expand Up @@ -5382,19 +5235,8 @@ int create_default_event_loop(ErlNifEnv *env) {
return -1;
}

if (pthread_cond_init(&loop->sync_sleep_cond, NULL) != 0) {
pthread_cond_destroy(&loop->event_cond);
pthread_mutex_destroy(&loop->mutex);
enif_release_resource(loop);
return -1;
}
loop->sync_sleep_cond_initialized = true;
atomic_store(&loop->sync_sleep_id, 0);
atomic_store(&loop->sync_sleep_complete, false);

loop->msg_env = enif_alloc_env();
if (loop->msg_env == NULL) {
pthread_cond_destroy(&loop->sync_sleep_cond);
pthread_cond_destroy(&loop->event_cond);
pthread_mutex_destroy(&loop->mutex);
enif_release_resource(loop);
Expand Down
14 changes: 0 additions & 14 deletions c_src/py_event_loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -246,20 +246,6 @@ typedef struct erlang_event_loop {
/** @brief Flag indicating a wakeup is pending (uvloop-style coalescing) */
_Atomic bool wake_pending;

/* ========== Synchronous Sleep Support ========== */

/** @brief Current synchronous sleep ID being waited on */
_Atomic uint64_t sync_sleep_id;

/** @brief Flag indicating sleep has completed */
_Atomic bool sync_sleep_complete;

/** @brief Condition variable for sleep completion notification */
pthread_cond_t sync_sleep_cond;

/** @brief Whether sync_sleep_cond has been initialized */
bool sync_sleep_cond_initialized;

/** @brief Interpreter ID: 0 = main interpreter, >0 = subinterpreter */
uint32_t interp_id;
} erlang_event_loop_t;
Expand Down
1 change: 0 additions & 1 deletion c_src/py_nif.c
Original file line number Diff line number Diff line change
Expand Up @@ -3856,7 +3856,6 @@ static ErlNifFunc nif_funcs[] = {
{"get_pending", 1, nif_get_pending, 0},
{"dispatch_callback", 3, nif_dispatch_callback, 0},
{"dispatch_timer", 2, nif_dispatch_timer, 0},
{"dispatch_sleep_complete", 2, nif_dispatch_sleep_complete, 0},
{"get_fd_callback_id", 2, nif_get_fd_callback_id, 0},
{"reselect_reader", 2, nif_reselect_reader, 0},
{"reselect_writer", 2, nif_reselect_writer, 0},
Expand Down
68 changes: 48 additions & 20 deletions docs/asyncio.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ erlang.run(main())
│ ┌──────────────────┐ └────────────────────────────────────┘ │
│ │ asyncio (via │ │
│ │ erlang.run()) │ ┌────────────────────────────────────┐ │
│ │ sleep() ──┼─{sleep_wait}──▶erlang:send_after() + cond_wait │ │
│ │ gather() │ │ │
│ │ wait_for() │◀──{complete}───pthread_cond_broadcast() │ │
│ │ sleep() │ asyncio.sleep() uses call_later() │ │
│ │ gather() │─call_later()──▶which triggers erlang:send_after │ │
│ │ wait_for() │ │ │
│ │ create_task() │ └────────────────────────────────────┘ │
│ └──────────────────┘ │
│ │
Expand Down Expand Up @@ -632,41 +632,42 @@ Unlike Python's standard polling-based event loop, the Erlang event loop uses `e

```
┌─────────────────────────────────────────────────────────────────────────┐
asyncio.sleep() via ErlangEventLoop │
│ asyncio.sleep() via ErlangEventLoop
│ │
│ Python Erlang │
│ ────── ────── │
│ │
│ ┌─────────────────┐ ┌─────────────────────────────────┐ │
│ │ asyncio.sleep │ │ py_event_worker │ │
│ │ (0.1) │ │ │ │
│ └────────┬────────┘ │ handle_info({sleep_wait,...}) │ │
│ │ │ │ │ │
│ ▼ │ ▼ │ │
│ ┌─────────────────┐ │ erlang:send_after(100ms) │ │
│ │ ErlangEventLoop │──{sleep_wait,│ │ │ │
│ │ call_later() │ 100, Id}──▶│ ▼ │ │
│ └────────┬────────┘ │ handle_info({sleep_complete}) │ │
│ │ │ │ │ │
│ ┌────────▼────────┐ │ ▼ │ │
│ │ Release GIL │ │ py_nif:dispatch_sleep_complete │ │
│ │ pthread_cond_ │◀─────────────│ │ │ │
│ │ wait() │ signal └─────────┼───────────────────────┘ │
│ └────────┬────────┘ │ │ │
│ │ │ │ │
│ ▼ │ │ │
│ ┌─────────────────┐ │ │ │
│ │ ErlangEventLoop │──{timer,100, │ erlang:send_after(100ms) │ │
│ │ call_later() │ Id}─────▶│ │ │ │
│ └────────┬────────┘ │ ▼ │ │
│ │ │ handle_info({timeout, ...}) │ │
│ ┌────────▼────────┐ │ │ │ │
│ │ Yield to event │ │ ▼ │ │
│ │ loop (dirty │ │ py_nif:dispatch_timer() │ │
│ │ scheduler │◀─────────────│ │ │ │
│ │ released) │ callback └─────────┼───────────────────────┘ │
│ └────────┬────────┘ │ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────────┐ ┌─────────────────────────────────┐ │
│ │ Reacquire GIL │ │ pthread_cond_broadcast() │ │
│ │ Return result │ │ (wakes Python thread) │ │
│ │ Resume after │ │ Timer callback dispatched to │ │
│ │ timer fires │ │ Python pending queue │ │
│ └─────────────────┘ └─────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
```

**Key features:**
- **GIL released during sleep** - Python thread doesn't hold the GIL while waiting
- **Dirty scheduler released during sleep** - Python yields to event loop, freeing the dirty NIF thread
- **BEAM scheduler integration** - Uses Erlang's native timer system
- **Zero CPU usage** - Condition variable wait, no polling
- **Zero CPU usage** - No polling, event-driven callback
- **Sub-millisecond precision** - Timers managed by BEAM scheduler

### Basic Usage
Expand All @@ -688,6 +689,33 @@ result = erlang.run(my_handler())

When using `erlang.run()` or the Erlang event loop, all standard asyncio functions work seamlessly with Erlang's backend.

#### erlang.sleep(seconds)

Sleep for the specified duration. Works in both async and sync contexts, and **always releases the dirty NIF scheduler**.

```python
import erlang

# Async context - releases dirty scheduler via event loop yield
async def async_handler():
await erlang.sleep(0.1) # Uses asyncio.sleep() internally
return "done"

# Sync context - releases dirty scheduler via Erlang process suspension
def sync_handler():
erlang.sleep(0.1) # Uses receive/after, true cooperative yield
return "done"
```

**Dirty Scheduler Release:**

| Context | Mechanism | Dirty Scheduler |
|---------|-----------|-----------------|
| Async (`await erlang.sleep()`) | `asyncio.sleep()` via `call_later()` | Released (yields to event loop) |
| Sync (`erlang.sleep()`) | `erlang.call('_py_sleep')` with `receive/after` | Released (Erlang process suspends) |

Both modes allow other Erlang processes and Python contexts to run during the sleep.

#### asyncio.sleep(delay)

Sleep for the specified delay. Uses Erlang's `erlang:send_after/3` internally.
Expand Down
Loading
Loading