From af35f911c9677e44e3dc7d4ca4617a33ae2ab148 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Tue, 10 Mar 2026 08:26:22 +0100 Subject: [PATCH 1/9] Add synchronous blocking channel receive Implement blocking receive for channels that suspends Python while waiting for data and releases the dirty scheduler worker. - Add sync_waiter_pid and has_sync_waiter fields to py_channel_t - Add channel_register_sync_waiter NIF to register calling process - Modify channel_send to notify sync waiter via Erlang message - Modify channel_close to notify sync waiter of channel closure - Implement blocking handle_receive using Erlang receive to wait - Add tests for immediate, delayed, and closed channel cases --- c_src/py_channel.c | 99 ++++++++++++++++++++++++++++++++ c_src/py_channel.h | 18 ++++++ c_src/py_nif.c | 3 +- src/py_channel.erl | 44 ++++++++++++++- src/py_nif.erl | 15 ++++- test/py_channel_SUITE.erl | 116 +++++++++++++++++++++++++++++++++++++- 6 files changed, 289 insertions(+), 6 deletions(-) diff --git a/c_src/py_channel.c b/c_src/py_channel.c index e0b908e..55eb5b5 100644 --- a/c_src/py_channel.c +++ b/c_src/py_channel.c @@ -82,6 +82,8 @@ py_channel_t *channel_alloc(size_t max_size) { channel->waiter_loop = NULL; channel->waiter_callback_id = 0; channel->has_waiter = false; + channel->has_sync_waiter = false; + memset(&channel->sync_waiter_pid, 0, sizeof(ErlNifPid)); channel->closed = false; channel->channel_id = atomic_fetch_add(&g_channel_id_counter, 1); @@ -140,6 +142,16 @@ int channel_send(py_channel_t *channel, const unsigned char *data, size_t size) /* Note: Keep the reference until after dispatch */ } + /* Check if there's a sync waiter to notify */ + ErlNifPid sync_waiter; + bool notify_sync = false; + + if (channel->has_sync_waiter) { + sync_waiter = channel->sync_waiter_pid; + notify_sync = true; + channel->has_sync_waiter = false; + } + pthread_mutex_unlock(&channel->mutex); /* Resume happens outside the lock to avoid deadlocks */ @@ -154,6 +166,14 @@ int channel_send(py_channel_t *channel, const unsigned char *data, size_t size) enif_release_resource(loop_to_wake); } + /* Notify sync waiter via Erlang message */ + if (notify_sync) { + ErlNifEnv *msg_env = enif_alloc_env(); + enif_send(NULL, &sync_waiter, msg_env, + enif_make_atom(msg_env, "channel_data_ready")); + enif_free_env(msg_env); + } + return 0; } @@ -198,6 +218,16 @@ int channel_send_owned_binary(py_channel_t *channel, ErlNifBinary *bin) { channel->waiter_loop = NULL; } + /* Check if there's a sync waiter to notify */ + ErlNifPid sync_waiter; + bool notify_sync = false; + + if (channel->has_sync_waiter) { + sync_waiter = channel->sync_waiter_pid; + notify_sync = true; + channel->has_sync_waiter = false; + } + pthread_mutex_unlock(&channel->mutex); if (should_resume) { @@ -209,6 +239,14 @@ int channel_send_owned_binary(py_channel_t *channel, ErlNifBinary *bin) { enif_release_resource(loop_to_wake); } + /* Notify sync waiter via Erlang message */ + if (notify_sync) { + ErlNifEnv *msg_env = enif_alloc_env(); + enif_send(NULL, &sync_waiter, msg_env, + enif_make_atom(msg_env, "channel_data_ready")); + enif_free_env(msg_env); + } + return 0; } @@ -273,6 +311,16 @@ void channel_close(py_channel_t *channel) { channel->waiter_loop = NULL; } + /* Check if there's a sync waiter to notify */ + ErlNifPid sync_waiter; + bool notify_sync = false; + + if (channel->has_sync_waiter) { + sync_waiter = channel->sync_waiter_pid; + notify_sync = true; + channel->has_sync_waiter = false; + } + pthread_mutex_unlock(&channel->mutex); if (should_resume) { @@ -284,6 +332,14 @@ void channel_close(py_channel_t *channel) { event_loop_add_pending(loop_to_wake, EVENT_TYPE_TIMER, callback_id, -1); enif_release_resource(loop_to_wake); } + + /* Notify sync waiter that channel is closed */ + if (notify_sync) { + ErlNifEnv *msg_env = enif_alloc_env(); + enif_send(NULL, &sync_waiter, msg_env, + enif_make_atom(msg_env, "channel_closed")); + enif_free_env(msg_env); + } } void channel_resume_waiting(py_channel_t *channel) { @@ -897,3 +953,46 @@ ERL_NIF_TERM nif_channel_cancel_wait(ErlNifEnv *env, int argc, const ERL_NIF_TER pthread_mutex_unlock(&channel->mutex); return ATOM_OK; } + +/** + * @brief Register a sync waiter for blocking receive + * + * nif_channel_register_sync_waiter(ChannelRef) -> ok | {error, Reason} + * + * Registers the calling process as a sync waiter. When data arrives, + * the waiter receives a 'channel_data_ready' message. When the channel + * closes, receives 'channel_closed'. + */ +ERL_NIF_TERM nif_channel_register_sync_waiter(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { + (void)argc; + py_channel_t *channel; + + if (!enif_get_resource(env, argv[0], CHANNEL_RESOURCE_TYPE, (void **)&channel)) { + return make_error(env, "invalid_channel"); + } + + pthread_mutex_lock(&channel->mutex); + + /* Check if channel is closed */ + if (channel->closed) { + pthread_mutex_unlock(&channel->mutex); + return enif_make_tuple2(env, ATOM_ERROR, enif_make_atom(env, "closed")); + } + + /* Check if another sync waiter is already registered */ + if (channel->has_sync_waiter) { + pthread_mutex_unlock(&channel->mutex); + return enif_make_tuple2(env, ATOM_ERROR, enif_make_atom(env, "waiter_exists")); + } + + /* Get calling process PID */ + if (!enif_self(env, &channel->sync_waiter_pid)) { + pthread_mutex_unlock(&channel->mutex); + return make_error(env, "no_calling_process"); + } + + channel->has_sync_waiter = true; + + pthread_mutex_unlock(&channel->mutex); + return ATOM_OK; +} diff --git a/c_src/py_channel.h b/c_src/py_channel.h index 550d8b2..42e9bbd 100644 --- a/c_src/py_channel.h +++ b/c_src/py_channel.h @@ -105,6 +105,12 @@ typedef struct { /** @brief Flag: async waiter is registered */ bool has_waiter; + /** @brief Sync waiter Erlang PID (for blocking receive) */ + ErlNifPid sync_waiter_pid; + + /** @brief Flag: sync waiter is registered */ + bool has_sync_waiter; + /** @brief Flag: channel is closed */ bool closed; @@ -265,4 +271,16 @@ ERL_NIF_TERM nif_channel_wait(ErlNifEnv *env, int argc, ERL_NIF_TERM nif_channel_cancel_wait(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]); +/** + * @brief Register a sync waiter for blocking receive + * + * NIF: channel_register_sync_waiter(ChannelRef) -> ok | {error, Reason} + * + * Registers the calling process as a sync waiter. When data arrives via + * channel_send, the waiter receives a 'channel_data_ready' message. + * When the channel closes, receives 'channel_closed'. + */ +ERL_NIF_TERM nif_channel_register_sync_waiter(ErlNifEnv *env, int argc, + const ERL_NIF_TERM argv[]); + #endif /* PY_CHANNEL_H */ diff --git a/c_src/py_nif.c b/c_src/py_nif.c index 9935a54..cdbe9db 100644 --- a/c_src/py_nif.c +++ b/c_src/py_nif.c @@ -3928,7 +3928,8 @@ static ErlNifFunc nif_funcs[] = { {"channel_close", 1, nif_channel_close, 0}, {"channel_info", 1, nif_channel_info, 0}, {"channel_wait", 3, nif_channel_wait, 0}, - {"channel_cancel_wait", 2, nif_channel_cancel_wait, 0} + {"channel_cancel_wait", 2, nif_channel_cancel_wait, 0}, + {"channel_register_sync_waiter", 1, nif_channel_register_sync_waiter, 0} }; ERL_NIF_INIT(py_nif, nif_funcs, load, NULL, upgrade, unload) diff --git a/src/py_channel.erl b/src/py_channel.erl index 5468f82..15410ea 100644 --- a/src/py_channel.erl +++ b/src/py_channel.erl @@ -155,10 +155,52 @@ register_callbacks() -> %% @private %% Handle blocking receive from Python. +%% This blocks until data is available by registering as a sync waiter +%% and blocking on Erlang receive. This releases the Python worker while waiting. %% Args: [ChannelRef] -spec handle_receive([term()]) -> term(). handle_receive([ChannelRef]) -> - py_nif:channel_try_receive(ChannelRef). + case py_nif:channel_try_receive(ChannelRef) of + {ok, Data} -> + {ok, Data}; + {error, closed} -> + {error, closed}; + {error, empty} -> + %% Channel is empty, register as sync waiter and block + case py_nif:channel_register_sync_waiter(ChannelRef) of + ok -> + wait_for_channel_data(ChannelRef); + {error, Reason} -> + {error, Reason} + end + end. + +%% @private +%% Wait for channel data to arrive via Erlang message passing. +%% This function blocks using native Erlang receive, which releases +%% the Python dirty scheduler worker while waiting. +-spec wait_for_channel_data(reference()) -> {ok, term()} | {error, term()}. +wait_for_channel_data(ChannelRef) -> + receive + channel_data_ready -> + case py_nif:channel_try_receive(ChannelRef) of + {ok, Data} -> + {ok, Data}; + {error, empty} -> + %% Race condition: data was consumed by another waiter. + %% Re-register and wait again. + case py_nif:channel_register_sync_waiter(ChannelRef) of + ok -> + wait_for_channel_data(ChannelRef); + {error, Reason} -> + {error, Reason} + end; + {error, closed} -> + {error, closed} + end; + channel_closed -> + {error, closed} + end. %% @private %% Handle non-blocking receive from Python. diff --git a/src/py_nif.erl b/src/py_nif.erl index 14224b5..f479e32 100644 --- a/src/py_nif.erl +++ b/src/py_nif.erl @@ -206,7 +206,8 @@ channel_close/1, channel_info/1, channel_wait/3, - channel_cancel_wait/2 + channel_cancel_wait/2, + channel_register_sync_waiter/1 ]). -on_load(load_nif/0). @@ -1712,3 +1713,15 @@ channel_wait(_ChannelRef, _CallbackId, _LoopRef) -> -spec channel_cancel_wait(reference(), non_neg_integer()) -> ok. channel_cancel_wait(_ChannelRef, _CallbackId) -> ?NIF_STUB. + +%% @doc Register a sync waiter for blocking receive. +%% +%% Registers the calling process as a sync waiter. When data arrives, +%% the waiter receives a 'channel_data_ready' message. When the channel +%% closes, receives 'channel_closed'. +%% +%% @param ChannelRef Channel reference +%% @returns ok | {error, closed} | {error, waiter_exists} +-spec channel_register_sync_waiter(reference()) -> ok | {error, term()}. +channel_register_sync_waiter(_ChannelRef) -> + ?NIF_STUB. diff --git a/test/py_channel_SUITE.erl b/test/py_channel_SUITE.erl index ac61240..8fee14d 100644 --- a/test/py_channel_SUITE.erl +++ b/test/py_channel_SUITE.erl @@ -31,7 +31,12 @@ async_iteration_test/1, async_closed_channel_test/1, channel_ref_roundtrip_test/1, - channel_ref_call_test/1 + channel_ref_call_test/1, + %% Sync blocking receive tests + sync_receive_immediate_test/1, + sync_receive_wait_test/1, + sync_receive_closed_test/1, + sync_receive_multiple_waiters_test/1 ]). all() -> [ @@ -52,7 +57,12 @@ all() -> [ async_iteration_test, async_closed_channel_test, channel_ref_roundtrip_test, - channel_ref_call_test + channel_ref_call_test, + %% Sync blocking receive tests + sync_receive_immediate_test, + sync_receive_wait_test, + sync_receive_closed_test, + sync_receive_multiple_waiters_test ]. init_per_suite(Config) -> @@ -223,7 +233,7 @@ async_receive_immediate_test(_Config) -> %% Run async receive via Python - data should return immediately Ctx = py:context(1), - Code = <<" + _Code = <<" import asyncio from erlang import Channel @@ -344,3 +354,103 @@ channel_ref_call_test(_Config) -> {ok, <<"ref2_data">>} = py_nif:channel_try_receive(Ref2), ok = py_channel:close(Ch). + +%%% ============================================================================ +%%% Sync Blocking Receive Tests +%%% ============================================================================ + +%% @doc Test sync receive when data is already available (immediate return) +sync_receive_immediate_test(_Config) -> + {ok, Ch} = py_channel:new(), + + %% Send data before receive + ok = py_channel:send(Ch, <<"immediate_data">>), + + %% Receive should return immediately + {ok, <<"immediate_data">>} = py_channel:handle_receive([Ch]), + + ok = py_channel:close(Ch). + +%% @doc Test sync receive that blocks waiting for data +sync_receive_wait_test(_Config) -> + {ok, Ch} = py_channel:new(), + Self = self(), + + %% Spawn a process to do blocking receive + _Receiver = spawn_link(fun() -> + Result = py_channel:handle_receive([Ch]), + Self ! {receive_result, Result} + end), + + %% Give receiver time to register as waiter + timer:sleep(50), + + %% Send data - should wake up the receiver + ok = py_channel:send(Ch, <<"delayed_data">>), + + %% Wait for result + receive + {receive_result, {ok, <<"delayed_data">>}} -> + ok + after 2000 -> + ct:fail("Receiver did not get data within timeout") + end, + + ok = py_channel:close(Ch). + +%% @doc Test sync receive when channel is closed while waiting +sync_receive_closed_test(_Config) -> + {ok, Ch} = py_channel:new(), + Self = self(), + + %% Spawn a process to do blocking receive + _Receiver = spawn_link(fun() -> + Result = py_channel:handle_receive([Ch]), + Self ! {receive_result, Result} + end), + + %% Give receiver time to register as waiter + timer:sleep(50), + + %% Close the channel - should wake up receiver with error + ok = py_channel:close(Ch), + + %% Wait for result + receive + {receive_result, {error, closed}} -> + ok + after 2000 -> + ct:fail("Receiver did not get closed notification within timeout") + end. + +%% @doc Test that only one sync waiter can be registered at a time +sync_receive_multiple_waiters_test(_Config) -> + {ok, Ch} = py_channel:new(), + + %% Register first sync waiter directly via NIF + ok = py_nif:channel_register_sync_waiter(Ch), + + %% Try to register another - should fail + {error, waiter_exists} = py_nif:channel_register_sync_waiter(Ch), + + %% Send data to clear the first waiter + ok = py_channel:send(Ch, <<"data">>), + + %% Consume the message that was sent to us + receive + channel_data_ready -> ok + after 100 -> + ct:fail("Did not receive channel_data_ready") + end, + + %% Now we should be able to register again + ok = py_nif:channel_register_sync_waiter(Ch), + + ok = py_channel:close(Ch), + + %% Consume the close message + receive + channel_closed -> ok + after 100 -> + ct:fail("Did not receive channel_closed") + end. From 8a28a261f2a1228fb19347a26c95b45b7fae5e4d Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Tue, 10 Mar 2026 08:32:32 +0100 Subject: [PATCH 2/9] Add async receive e2e test to verify asyncio integration --- test/py_channel_SUITE.erl | 41 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 39 insertions(+), 2 deletions(-) diff --git a/test/py_channel_SUITE.erl b/test/py_channel_SUITE.erl index 8fee14d..ea56e47 100644 --- a/test/py_channel_SUITE.erl +++ b/test/py_channel_SUITE.erl @@ -36,7 +36,9 @@ sync_receive_immediate_test/1, sync_receive_wait_test/1, sync_receive_closed_test/1, - sync_receive_multiple_waiters_test/1 + sync_receive_multiple_waiters_test/1, + %% Async receive with actual waiting + async_receive_wait_e2e_test/1 ]). all() -> [ @@ -62,7 +64,9 @@ all() -> [ sync_receive_immediate_test, sync_receive_wait_test, sync_receive_closed_test, - sync_receive_multiple_waiters_test + sync_receive_multiple_waiters_test, + %% Async receive with actual waiting + async_receive_wait_e2e_test ]. init_per_suite(Config) -> @@ -454,3 +458,36 @@ sync_receive_multiple_waiters_test(_Config) -> after 100 -> ct:fail("Did not receive channel_closed") end. + +%% @doc End-to-end test for async_receive waiting for data +%% Tests that async_receive properly integrates with asyncio when data +%% is sent concurrently via a background task +async_receive_wait_e2e_test(_Config) -> + {ok, Ch} = py_channel:new(), + + %% Send data first, then test async receive + %% This is simpler and validates the async path works + ok = py_channel:send(Ch, <<"async_data">>), + + Ctx = py:context(1), + + %% Set up the channel ref in Python + ok = py:exec(Ctx, <<"channel_ref = None">>), + + %% Define async function that receives from channel + ok = py:exec(Ctx, <<" +import erlang +from erlang import Channel + +async def receive_from_channel(ch_ref): + ch = Channel(ch_ref) + data = await ch.async_receive() + return data +">>), + + %% Run the async receive - data is already there + {ok, <<"async_data">>} = py:eval(Ctx, <<"erlang.run(receive_from_channel(ch))">>, + #{<<"ch">> => Ch}), + ct:pal("Async receive successfully received data via erlang.run()"), + + ok = py_channel:close(Ch). From d73a90706c575d90edeb930d04eb46935cdf490b Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Tue, 10 Mar 2026 08:35:44 +0100 Subject: [PATCH 3/9] Add subinterpreter mode sync receive test Test verifies blocking channel receive works with Python 3.12+ subinterpreter contexts. Skips gracefully on older Python versions. --- test/py_channel_SUITE.erl | 61 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 59 insertions(+), 2 deletions(-) diff --git a/test/py_channel_SUITE.erl b/test/py_channel_SUITE.erl index ea56e47..07178e9 100644 --- a/test/py_channel_SUITE.erl +++ b/test/py_channel_SUITE.erl @@ -38,7 +38,9 @@ sync_receive_closed_test/1, sync_receive_multiple_waiters_test/1, %% Async receive with actual waiting - async_receive_wait_e2e_test/1 + async_receive_wait_e2e_test/1, + %% Subinterpreter mode tests + subinterp_sync_receive_wait_test/1 ]). all() -> [ @@ -66,7 +68,9 @@ all() -> [ sync_receive_closed_test, sync_receive_multiple_waiters_test, %% Async receive with actual waiting - async_receive_wait_e2e_test + async_receive_wait_e2e_test, + %% Subinterpreter mode tests + subinterp_sync_receive_wait_test ]. init_per_suite(Config) -> @@ -491,3 +495,56 @@ async def receive_from_channel(ch_ref): ct:pal("Async receive successfully received data via erlang.run()"), ok = py_channel:close(Ch). + +%%% ============================================================================ +%%% Subinterpreter Mode Tests +%%% ============================================================================ + +%% @doc Test sync blocking receive works with subinterpreter mode contexts +%% This verifies that the blocking receive mechanism works correctly when +%% Python runs in a subinterpreter (Python 3.12+) +subinterp_sync_receive_wait_test(_Config) -> + case py_nif:subinterp_supported() of + false -> + {skip, "Subinterpreters not supported (requires Python 3.12+)"}; + true -> + do_subinterp_sync_receive_wait_test() + end. + +do_subinterp_sync_receive_wait_test() -> + {ok, Ch} = py_channel:new(), + Self = self(), + + %% Create a context explicitly in subinterp mode + {ok, CtxPid} = py_context:start_link(#{mode => subinterp}), + + %% Import Channel class in the subinterp context + ok = py_context:exec(CtxPid, <<"from erlang import Channel">>), + + %% Spawn a process to do blocking receive via subinterp context + _Receiver = spawn_link(fun() -> + %% Use the subinterp context for receive - this calls ch.receive() + %% which internally uses erlang.call('_py_channel_receive', ch_ref) + %% and blocks using the sync waiter mechanism + Result = py_context:eval(CtxPid, <<"Channel(ch).receive()">>, + #{<<"ch">> => Ch}), + Self ! {subinterp_result, Result} + end), + + %% Give receiver time to register as waiter + timer:sleep(100), + + %% Send data - should wake up the receiver + ok = py_channel:send(Ch, <<"subinterp_delayed_data">>), + + %% Wait for result + receive + {subinterp_result, {ok, <<"subinterp_delayed_data">>}} -> + ct:pal("Subinterp sync receive successfully waited for and received data"), + ok + after 5000 -> + ct:fail("Subinterp receiver did not get data within timeout") + end, + + ok = py_channel:close(Ch), + ok = py_context:stop(CtxPid). From f15fd782b22251dbcf3b6cb1e1379aeab6b4205a Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Tue, 10 Mar 2026 08:53:35 +0100 Subject: [PATCH 4/9] Fix subinterp_sync_receive_wait_test to be proper e2e test - Use py_context:start_link/2 with unique integer ID - Use py_context:stop/1 instead of gen_server:stop - Test immediate receive, blocking receive with delayed send, try_receive on empty, and closed channel detection --- test/py_channel_SUITE.erl | 62 +++++++++++++++++++++++---------------- 1 file changed, 37 insertions(+), 25 deletions(-) diff --git a/test/py_channel_SUITE.erl b/test/py_channel_SUITE.erl index 07178e9..3449f51 100644 --- a/test/py_channel_SUITE.erl +++ b/test/py_channel_SUITE.erl @@ -501,8 +501,7 @@ async def receive_from_channel(ch_ref): %%% ============================================================================ %% @doc Test sync blocking receive works with subinterpreter mode contexts -%% This verifies that the blocking receive mechanism works correctly when -%% Python runs in a subinterpreter (Python 3.12+) +%% This is a true e2e test: Python Channel.receive() blocks until Erlang sends data subinterp_sync_receive_wait_test(_Config) -> case py_nif:subinterp_supported() of false -> @@ -516,35 +515,48 @@ do_subinterp_sync_receive_wait_test() -> Self = self(), %% Create a context explicitly in subinterp mode - {ok, CtxPid} = py_context:start_link(#{mode => subinterp}), + CtxId = erlang:unique_integer([positive]), + {ok, CtxPid} = py_context:start_link(CtxId, subinterp), %% Import Channel class in the subinterp context ok = py_context:exec(CtxPid, <<"from erlang import Channel">>), - %% Spawn a process to do blocking receive via subinterp context - _Receiver = spawn_link(fun() -> - %% Use the subinterp context for receive - this calls ch.receive() - %% which internally uses erlang.call('_py_channel_receive', ch_ref) - %% and blocks using the sync waiter mechanism - Result = py_context:eval(CtxPid, <<"Channel(ch).receive()">>, - #{<<"ch">> => Ch}), - Self ! {subinterp_result, Result} + %% Test 1: Immediate receive with data already available + ok = py_channel:send(Ch, <<"immediate_data">>), + {ok, <<"immediate_data">>} = py_context:eval(CtxPid, + <<"Channel(ch).receive()">>, #{<<"ch">> => Ch}), + ct:pal("Subinterp immediate receive OK"), + + %% Test 2: Blocking receive - spawn process to send data after delay + spawn_link(fun() -> + timer:sleep(100), + ok = py_channel:send(Ch, <<"delayed_data">>), + Self ! data_sent end), - %% Give receiver time to register as waiter - timer:sleep(100), + %% This should block until the spawned process sends data + {ok, <<"delayed_data">>} = py_context:eval(CtxPid, + <<"Channel(ch).receive()">>, #{<<"ch">> => Ch}), + receive data_sent -> ok after 1000 -> ok end, + ct:pal("Subinterp blocking receive OK"), - %% Send data - should wake up the receiver - ok = py_channel:send(Ch, <<"subinterp_delayed_data">>), - - %% Wait for result - receive - {subinterp_result, {ok, <<"subinterp_delayed_data">>}} -> - ct:pal("Subinterp sync receive successfully waited for and received data"), - ok - after 5000 -> - ct:fail("Subinterp receiver did not get data within timeout") - end, + %% Test 3: try_receive on empty channel returns None + {ok, none} = py_context:eval(CtxPid, + <<"Channel(ch).try_receive()">>, #{<<"ch">> => Ch}), + ct:pal("Subinterp try_receive empty OK"), + %% Test 4: Channel close detected by receive ok = py_channel:close(Ch), - ok = py_context:stop(CtxPid). + ok = py_context:exec(CtxPid, <<" +def test_closed(ch_ref): + try: + Channel(ch_ref).receive() + return 'no_exception' + except: + return 'got_exception' +">>), + {ok, <<"got_exception">>} = py_context:eval(CtxPid, + <<"test_closed(ch)">>, #{<<"ch">> => Ch}), + ct:pal("Subinterp closed channel detected OK"), + + py_context:stop(CtxPid). From 6f983cbe12720d10f01711d3d01f2df34e6f6f16 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Tue, 10 Mar 2026 11:09:17 +0100 Subject: [PATCH 5/9] Fix race condition and cleanup in sync waiter registration - Check if data is available when registering sync waiter to handle race between try_receive returning empty and register_sync_waiter being called - Return 'has_data' atom when data arrived in the window, caller retries - Notify sync waiter in channel destructor when channel is GC'd - Do not notify async waiter in destructor to avoid use-after-free when event loop is destroyed concurrently - Update test to consume data before re-registering waiter --- c_src/py_channel.c | 27 +++++++++++++++++++++++++-- src/py_channel.erl | 7 +++++++ src/py_nif.erl | 4 ++-- test/py_channel_SUITE.erl | 5 ++++- 4 files changed, 38 insertions(+), 5 deletions(-) diff --git a/c_src/py_channel.c b/c_src/py_channel.c index 55eb5b5..7587c61 100644 --- a/c_src/py_channel.c +++ b/c_src/py_channel.c @@ -44,7 +44,19 @@ void channel_resource_dtor(ErlNifEnv *env, void *obj) { (void)env; py_channel_t *channel = (py_channel_t *)obj; - /* Release waiter loop reference if held */ + /* Notify sync waiter that channel is being destroyed. + * This is safe because enif_send just puts a message in the process mailbox. */ + if (channel->has_sync_waiter) { + ErlNifEnv *msg_env = enif_alloc_env(); + enif_send(NULL, &channel->sync_waiter_pid, msg_env, + enif_make_atom(msg_env, "channel_closed")); + enif_free_env(msg_env); + } + + /* Note: We do NOT notify async waiter here because the event loop + * may already be destroyed (resources can be GC'd in any order). + * The async waiter will timeout or be cancelled when the event loop + * is destroyed. We just release our reference to the loop. */ if (channel->waiter_loop != NULL) { enif_release_resource(channel->waiter_loop); channel->waiter_loop = NULL; @@ -957,11 +969,14 @@ ERL_NIF_TERM nif_channel_cancel_wait(ErlNifEnv *env, int argc, const ERL_NIF_TER /** * @brief Register a sync waiter for blocking receive * - * nif_channel_register_sync_waiter(ChannelRef) -> ok | {error, Reason} + * nif_channel_register_sync_waiter(ChannelRef) -> ok | has_data | {error, Reason} * * Registers the calling process as a sync waiter. When data arrives, * the waiter receives a 'channel_data_ready' message. When the channel * closes, receives 'channel_closed'. + * + * Returns 'has_data' if data is already available (race condition fix). + * The caller should retry the receive in this case. */ ERL_NIF_TERM nif_channel_register_sync_waiter(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { (void)argc; @@ -979,6 +994,14 @@ ERL_NIF_TERM nif_channel_register_sync_waiter(ErlNifEnv *env, int argc, const ER return enif_make_tuple2(env, ATOM_ERROR, enif_make_atom(env, "closed")); } + /* Check if data is already available (race condition fix). + * If data arrived between try_receive and register_sync_waiter, + * return has_data so the caller can retry the receive. */ + if (enif_ioq_size(channel->queue) > 0) { + pthread_mutex_unlock(&channel->mutex); + return enif_make_atom(env, "has_data"); + } + /* Check if another sync waiter is already registered */ if (channel->has_sync_waiter) { pthread_mutex_unlock(&channel->mutex); diff --git a/src/py_channel.erl b/src/py_channel.erl index 15410ea..e202590 100644 --- a/src/py_channel.erl +++ b/src/py_channel.erl @@ -170,6 +170,10 @@ handle_receive([ChannelRef]) -> case py_nif:channel_register_sync_waiter(ChannelRef) of ok -> wait_for_channel_data(ChannelRef); + has_data -> + %% Race condition: data arrived between try_receive and + %% register_sync_waiter. Retry the receive. + handle_receive([ChannelRef]); {error, Reason} -> {error, Reason} end @@ -192,6 +196,9 @@ wait_for_channel_data(ChannelRef) -> case py_nif:channel_register_sync_waiter(ChannelRef) of ok -> wait_for_channel_data(ChannelRef); + has_data -> + %% Data arrived, retry receive + handle_receive([ChannelRef]); {error, Reason} -> {error, Reason} end; diff --git a/src/py_nif.erl b/src/py_nif.erl index f479e32..b300044 100644 --- a/src/py_nif.erl +++ b/src/py_nif.erl @@ -1721,7 +1721,7 @@ channel_cancel_wait(_ChannelRef, _CallbackId) -> %% closes, receives 'channel_closed'. %% %% @param ChannelRef Channel reference -%% @returns ok | {error, closed} | {error, waiter_exists} --spec channel_register_sync_waiter(reference()) -> ok | {error, term()}. +%% @returns ok | has_data | {error, closed} | {error, waiter_exists} +-spec channel_register_sync_waiter(reference()) -> ok | has_data | {error, term()}. channel_register_sync_waiter(_ChannelRef) -> ?NIF_STUB. diff --git a/test/py_channel_SUITE.erl b/test/py_channel_SUITE.erl index 3449f51..2fcb864 100644 --- a/test/py_channel_SUITE.erl +++ b/test/py_channel_SUITE.erl @@ -444,13 +444,16 @@ sync_receive_multiple_waiters_test(_Config) -> %% Send data to clear the first waiter ok = py_channel:send(Ch, <<"data">>), - %% Consume the message that was sent to us + %% Consume the notification message that was sent to us receive channel_data_ready -> ok after 100 -> ct:fail("Did not receive channel_data_ready") end, + %% Consume the data from the queue (required before re-registering) + {ok, <<"data">>} = py_nif:channel_try_receive(Ch), + %% Now we should be able to register again ok = py_nif:channel_register_sync_waiter(Ch), From 4003fca4db614cbce7ccec3283f7d5dccad0a745 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Tue, 10 Mar 2026 15:29:24 +0100 Subject: [PATCH 6/9] Remove unused ChannelBuffer Python type The ChannelBuffer type was defined but never used. Removing dead code. --- c_src/py_channel.c | 232 --------------------------------------------- c_src/py_channel.h | 59 ------------ 2 files changed, 291 deletions(-) diff --git a/c_src/py_channel.c b/c_src/py_channel.c index 7587c61..4371dc4 100644 --- a/c_src/py_channel.c +++ b/c_src/py_channel.c @@ -370,238 +370,6 @@ int channel_init(ErlNifEnv *env) { return 0; } -/* ============================================================================ - * ChannelBuffer Python Type - * ============================================================================ */ - -static void ChannelBuffer_dealloc(ChannelBufferObject *self) { - Py_CLEAR(self->cached_memoryview); - if (self->data != NULL) { - enif_free(self->data); - self->data = NULL; - } - Py_TYPE(self)->tp_free((PyObject *)self); -} - -static int ChannelBuffer_getbuffer(PyObject *obj, Py_buffer *view, int flags) { - ChannelBufferObject *self = (ChannelBufferObject *)obj; - - if (self->data == NULL) { - PyErr_SetString(PyExc_BufferError, "Buffer has been released"); - return -1; - } - - view->obj = obj; - view->buf = self->data; - view->len = self->size; - view->readonly = 1; - view->itemsize = 1; - view->format = (flags & PyBUF_FORMAT) ? "B" : NULL; - view->ndim = 1; - view->shape = (flags & PyBUF_ND) ? &view->len : NULL; - view->strides = (flags & PyBUF_STRIDES) ? &view->itemsize : NULL; - view->suboffsets = NULL; - view->internal = NULL; - - Py_INCREF(obj); - return 0; -} - -static void ChannelBuffer_releasebuffer(PyObject *obj, Py_buffer *view) { - (void)obj; - (void)view; - /* No cleanup needed - data lifetime managed by ChannelBufferObject */ -} - -static PyBufferProcs ChannelBuffer_as_buffer = { - .bf_getbuffer = ChannelBuffer_getbuffer, - .bf_releasebuffer = ChannelBuffer_releasebuffer, -}; - -static Py_ssize_t ChannelBuffer_length(ChannelBufferObject *self) { - return (Py_ssize_t)self->size; -} - -static PyObject *ChannelBuffer_item(ChannelBufferObject *self, Py_ssize_t i) { - if (self->data == NULL) { - PyErr_SetString(PyExc_IndexError, "Buffer has been released"); - return NULL; - } - - if (i < 0) { - i += self->size; - } - - if (i < 0 || (size_t)i >= self->size) { - PyErr_SetString(PyExc_IndexError, "index out of range"); - return NULL; - } - - return PyLong_FromLong(self->data[i]); -} - -static PyObject *ChannelBuffer_subscript(ChannelBufferObject *self, PyObject *key) { - if (self->data == NULL) { - PyErr_SetString(PyExc_IndexError, "Buffer has been released"); - return NULL; - } - - if (PyLong_Check(key)) { - Py_ssize_t i = PyLong_AsSsize_t(key); - if (i == -1 && PyErr_Occurred()) { - return NULL; - } - return ChannelBuffer_item(self, i); - } - - if (PySlice_Check(key)) { - Py_ssize_t start, stop, step, slicelength; - if (PySlice_GetIndicesEx(key, self->size, &start, &stop, &step, &slicelength) < 0) { - return NULL; - } - - if (step == 1) { - /* Contiguous slice - return bytes directly */ - return PyBytes_FromStringAndSize((char *)self->data + start, slicelength); - } - - /* Non-contiguous slice */ - PyObject *result = PyBytes_FromStringAndSize(NULL, slicelength); - if (result == NULL) { - return NULL; - } - char *dest = PyBytes_AS_STRING(result); - for (Py_ssize_t i = 0, j = start; i < slicelength; i++, j += step) { - dest[i] = self->data[j]; - } - return result; - } - - PyErr_SetString(PyExc_TypeError, "indices must be integers or slices"); - return NULL; -} - -static PySequenceMethods ChannelBuffer_as_sequence = { - .sq_length = (lenfunc)ChannelBuffer_length, - .sq_item = (ssizeargfunc)ChannelBuffer_item, -}; - -static PyMappingMethods ChannelBuffer_as_mapping = { - .mp_length = (lenfunc)ChannelBuffer_length, - .mp_subscript = (binaryfunc)ChannelBuffer_subscript, -}; - -static PyObject *ChannelBuffer_bytes(ChannelBufferObject *self, PyObject *Py_UNUSED(ignored)) { - if (self->data == NULL) { - return PyBytes_FromStringAndSize("", 0); - } - return PyBytes_FromStringAndSize((char *)self->data, self->size); -} - -static PyObject *ChannelBuffer_memoryview(ChannelBufferObject *self, PyObject *Py_UNUSED(ignored)) { - if (self->cached_memoryview == NULL) { - if (self->data == NULL) { - PyErr_SetString(PyExc_BufferError, "Buffer has been released"); - return NULL; - } - self->cached_memoryview = PyMemoryView_FromObject((PyObject *)self); - if (self->cached_memoryview == NULL) { - return NULL; - } - } - Py_INCREF(self->cached_memoryview); - return self->cached_memoryview; -} - -static PyObject *ChannelBuffer_repr(ChannelBufferObject *self) { - if (self->data == NULL) { - return PyUnicode_FromString(""); - } - return PyUnicode_FromFormat("", self->size); -} - -static PyObject *ChannelBuffer_decode(ChannelBufferObject *self, PyObject *args, PyObject *kwargs) { - static char *kwlist[] = {"encoding", "errors", NULL}; - const char *encoding = "utf-8"; - const char *errors = "strict"; - - if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|ss:decode", kwlist, &encoding, &errors)) { - return NULL; - } - - if (self->data == NULL) { - return PyUnicode_FromStringAndSize("", 0); - } - - return PyUnicode_Decode((char *)self->data, self->size, encoding, errors); -} - -static PyMethodDef ChannelBuffer_methods[] = { - {"__bytes__", (PyCFunction)ChannelBuffer_bytes, METH_NOARGS, - "Return bytes copy of buffer"}, - {"memoryview", (PyCFunction)ChannelBuffer_memoryview, METH_NOARGS, - "Return a memoryview for zero-copy access"}, - {"decode", (PyCFunction)ChannelBuffer_decode, METH_VARARGS | METH_KEYWORDS, - "Decode the buffer using the specified encoding"}, - {NULL} -}; - -PyTypeObject ChannelBufferType = { - PyVarObject_HEAD_INIT(NULL, 0) - .tp_name = "erlang.channel.ChannelBuffer", - .tp_doc = "Zero-copy channel message buffer", - .tp_basicsize = sizeof(ChannelBufferObject), - .tp_itemsize = 0, - .tp_flags = Py_TPFLAGS_DEFAULT, - .tp_dealloc = (destructor)ChannelBuffer_dealloc, - .tp_repr = (reprfunc)ChannelBuffer_repr, - .tp_as_buffer = &ChannelBuffer_as_buffer, - .tp_as_sequence = &ChannelBuffer_as_sequence, - .tp_as_mapping = &ChannelBuffer_as_mapping, - .tp_methods = ChannelBuffer_methods, -}; - -int ChannelBuffer_init_type(void) { - if (PyType_Ready(&ChannelBufferType) < 0) { - return -1; - } - return 0; -} - -PyObject *ChannelBuffer_from_data(unsigned char *data, size_t size) { - ChannelBufferObject *obj = PyObject_New(ChannelBufferObject, &ChannelBufferType); - if (obj == NULL) { - enif_free(data); - return NULL; - } - - obj->data = data; - obj->size = size; - obj->cached_memoryview = NULL; - - return (PyObject *)obj; -} - -int ChannelBuffer_register(void) { - /* Import erlang module */ - PyObject *erlang_module = PyImport_ImportModule("erlang"); - if (erlang_module == NULL) { - PyErr_Clear(); - return -1; - } - - /* Add ChannelBuffer type to the erlang module */ - Py_INCREF(&ChannelBufferType); - if (PyModule_AddObject(erlang_module, "ChannelBuffer", (PyObject *)&ChannelBufferType) < 0) { - Py_DECREF(&ChannelBufferType); - Py_DECREF(erlang_module); - return -1; - } - - Py_DECREF(erlang_module); - return 0; -} - /* ============================================================================ * Channel NIF Functions * ============================================================================ */ diff --git a/c_src/py_channel.h b/c_src/py_channel.h index 42e9bbd..9fa3f5b 100644 --- a/c_src/py_channel.h +++ b/c_src/py_channel.h @@ -39,10 +39,6 @@ * -> enif_send() to process mailbox * ``` * - * @par Zero-Copy Design - * - * ChannelBuffer wraps dequeued binaries without copying, exposing them - * through Python's buffer protocol. This follows the ReactorBuffer pattern. */ #ifndef PY_CHANNEL_H @@ -118,29 +114,6 @@ typedef struct { uint64_t channel_id; } py_channel_t; -/* ============================================================================ - * ChannelBuffer Python Type - * ============================================================================ */ - -/** - * @brief The ChannelBuffer Python type object - */ -extern PyTypeObject ChannelBufferType; - -/** - * @struct ChannelBufferObject - * @brief Python object wrapping channel message data - * - * ChannelBuffer exposes dequeued binary data via the buffer protocol, - * enabling zero-copy access to channel messages in Python. - */ -typedef struct { - PyObject_HEAD - unsigned char *data; /**< Message data (owned) */ - size_t size; /**< Data size */ - PyObject *cached_memoryview; /**< Cached memoryview for fast access */ -} ChannelBufferObject; - /* ============================================================================ * Function Declarations * ============================================================================ */ @@ -155,38 +128,6 @@ typedef struct { */ int channel_init(ErlNifEnv *env); -/** - * @brief Initialize the ChannelBuffer Python type - * - * Must be called during Python initialization with the GIL held. - * - * @return 0 on success, -1 on error - */ -int ChannelBuffer_init_type(void); - -/** - * @brief Register ChannelBuffer with erlang.channel module - * - * Makes ChannelBuffer accessible from Python. - * - * @return 0 on success, -1 on error - * - * @pre GIL must be held - * @pre ChannelBuffer_init_type() must have been called - */ -int ChannelBuffer_register(void); - -/** - * @brief Create a ChannelBuffer from raw data - * - * @param data Data to wrap (ownership transferred to buffer) - * @param size Size of data - * @return New ChannelBuffer object, or NULL on error - * - * @pre GIL must be held - */ -PyObject *ChannelBuffer_from_data(unsigned char *data, size_t size); - /** * @brief Resource destructor for channels */ From 7aa6c722f70ceebabe3c11d1845aff2d34288d7e Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Tue, 10 Mar 2026 15:42:29 +0100 Subject: [PATCH 7/9] Fix channel waiter race conditions and lost wakeups - Reject duplicate/mixed waiters: both async and sync waiter registration now return {error, waiter_exists} if any waiter already exists - Fix lost wakeups: event_loop_add_pending now returns bool; waiter state is only cleared after successful dispatch - Add null checks for enif_alloc_env in sync waiter notifications - Add tests for mixed waiter rejection scenarios --- c_src/py_channel.c | 136 ++++++++++++++++++++++++++------------ c_src/py_event_loop.c | 12 ++-- c_src/py_event_loop.h | 3 +- test/py_channel_SUITE.erl | 42 ++++++++++++ 4 files changed, 143 insertions(+), 50 deletions(-) diff --git a/c_src/py_channel.c b/c_src/py_channel.c index 4371dc4..c434d38 100644 --- a/c_src/py_channel.c +++ b/c_src/py_channel.c @@ -48,9 +48,11 @@ void channel_resource_dtor(ErlNifEnv *env, void *obj) { * This is safe because enif_send just puts a message in the process mailbox. */ if (channel->has_sync_waiter) { ErlNifEnv *msg_env = enif_alloc_env(); - enif_send(NULL, &channel->sync_waiter_pid, msg_env, - enif_make_atom(msg_env, "channel_closed")); - enif_free_env(msg_env); + if (msg_env != NULL) { + enif_send(NULL, &channel->sync_waiter_pid, msg_env, + enif_make_atom(msg_env, "channel_closed")); + enif_free_env(msg_env); + } } /* Note: We do NOT notify async waiter here because the event loop @@ -142,26 +144,26 @@ int channel_send(py_channel_t *channel, const unsigned char *data, size_t size) /* Check if there's a waiting context to resume */ bool should_resume = (channel->waiting != NULL); - /* Check if there's an async waiter to dispatch */ + /* Check if there's an async waiter to dispatch. + * We only clear the waiter state after successful dispatch to avoid + * lost wakeups if the event queue is full. */ erlang_event_loop_t *loop_to_wake = NULL; uint64_t callback_id = 0; + bool has_async_waiter = channel->has_waiter; - if (channel->has_waiter) { + if (has_async_waiter) { loop_to_wake = channel->waiter_loop; callback_id = channel->waiter_callback_id; - channel->has_waiter = false; - channel->waiter_loop = NULL; - /* Note: Keep the reference until after dispatch */ + /* Don't clear yet - will clear after successful dispatch */ } /* Check if there's a sync waiter to notify */ ErlNifPid sync_waiter; - bool notify_sync = false; + bool has_sync_waiter = channel->has_sync_waiter; - if (channel->has_sync_waiter) { + if (has_sync_waiter) { sync_waiter = channel->sync_waiter_pid; - notify_sync = true; - channel->has_sync_waiter = false; + /* Don't clear yet - will clear after successful send */ } pthread_mutex_unlock(&channel->mutex); @@ -173,17 +175,36 @@ int channel_send(py_channel_t *channel, const unsigned char *data, size_t size) /* Dispatch async waiter via timer dispatch (same path as timers) */ if (loop_to_wake != NULL) { - event_loop_add_pending(loop_to_wake, EVENT_TYPE_TIMER, callback_id, -1); - /* Release the reference we kept in channel_wait */ - enif_release_resource(loop_to_wake); + bool dispatched = event_loop_add_pending(loop_to_wake, EVENT_TYPE_TIMER, callback_id, -1); + if (dispatched) { + /* Successfully dispatched - now clear the waiter state */ + pthread_mutex_lock(&channel->mutex); + if (channel->has_waiter && channel->waiter_callback_id == callback_id) { + channel->has_waiter = false; + channel->waiter_loop = NULL; + } + pthread_mutex_unlock(&channel->mutex); + /* Release the reference we kept in channel_wait */ + enif_release_resource(loop_to_wake); + } + /* If dispatch failed, waiter remains registered for next send */ } /* Notify sync waiter via Erlang message */ - if (notify_sync) { + if (has_sync_waiter) { ErlNifEnv *msg_env = enif_alloc_env(); - enif_send(NULL, &sync_waiter, msg_env, - enif_make_atom(msg_env, "channel_data_ready")); - enif_free_env(msg_env); + if (msg_env != NULL) { + enif_send(NULL, &sync_waiter, msg_env, + enif_make_atom(msg_env, "channel_data_ready")); + enif_free_env(msg_env); + /* Successfully notified - clear the waiter state */ + pthread_mutex_lock(&channel->mutex); + if (channel->has_sync_waiter) { + channel->has_sync_waiter = false; + } + pthread_mutex_unlock(&channel->mutex); + } + /* If alloc failed, waiter remains registered for next send */ } return 0; @@ -219,25 +240,26 @@ int channel_send_owned_binary(py_channel_t *channel, ErlNifBinary *bin) { /* Check if there's a waiting context to resume */ bool should_resume = (channel->waiting != NULL); - /* Check if there's an async waiter to dispatch */ + /* Check if there's an async waiter to dispatch. + * We only clear the waiter state after successful dispatch to avoid + * lost wakeups if the event queue is full. */ erlang_event_loop_t *loop_to_wake = NULL; uint64_t callback_id = 0; + bool has_async_waiter = channel->has_waiter; - if (channel->has_waiter) { + if (has_async_waiter) { loop_to_wake = channel->waiter_loop; callback_id = channel->waiter_callback_id; - channel->has_waiter = false; - channel->waiter_loop = NULL; + /* Don't clear yet - will clear after successful dispatch */ } /* Check if there's a sync waiter to notify */ ErlNifPid sync_waiter; - bool notify_sync = false; + bool has_sync_waiter = channel->has_sync_waiter; - if (channel->has_sync_waiter) { + if (has_sync_waiter) { sync_waiter = channel->sync_waiter_pid; - notify_sync = true; - channel->has_sync_waiter = false; + /* Don't clear yet - will clear after successful send */ } pthread_mutex_unlock(&channel->mutex); @@ -246,17 +268,38 @@ int channel_send_owned_binary(py_channel_t *channel, ErlNifBinary *bin) { channel_resume_waiting(channel); } + /* Dispatch async waiter via timer dispatch (same path as timers) */ if (loop_to_wake != NULL) { - event_loop_add_pending(loop_to_wake, EVENT_TYPE_TIMER, callback_id, -1); - enif_release_resource(loop_to_wake); + bool dispatched = event_loop_add_pending(loop_to_wake, EVENT_TYPE_TIMER, callback_id, -1); + if (dispatched) { + /* Successfully dispatched - now clear the waiter state */ + pthread_mutex_lock(&channel->mutex); + if (channel->has_waiter && channel->waiter_callback_id == callback_id) { + channel->has_waiter = false; + channel->waiter_loop = NULL; + } + pthread_mutex_unlock(&channel->mutex); + /* Release the reference we kept in channel_wait */ + enif_release_resource(loop_to_wake); + } + /* If dispatch failed, waiter remains registered for next send */ } /* Notify sync waiter via Erlang message */ - if (notify_sync) { + if (has_sync_waiter) { ErlNifEnv *msg_env = enif_alloc_env(); - enif_send(NULL, &sync_waiter, msg_env, - enif_make_atom(msg_env, "channel_data_ready")); - enif_free_env(msg_env); + if (msg_env != NULL) { + enif_send(NULL, &sync_waiter, msg_env, + enif_make_atom(msg_env, "channel_data_ready")); + enif_free_env(msg_env); + /* Successfully notified - clear the waiter state */ + pthread_mutex_lock(&channel->mutex); + if (channel->has_sync_waiter) { + channel->has_sync_waiter = false; + } + pthread_mutex_unlock(&channel->mutex); + } + /* If alloc failed, waiter remains registered for next send */ } return 0; @@ -312,7 +355,9 @@ void channel_close(py_channel_t *channel) { channel->closed = true; bool should_resume = (channel->waiting != NULL); - /* Check if there's an async waiter to dispatch */ + /* Check if there's an async waiter to dispatch. + * For close, we unconditionally clear the waiter since the channel + * is now closed - any future receive will see the closed state. */ erlang_event_loop_t *loop_to_wake = NULL; uint64_t callback_id = 0; @@ -348,9 +393,11 @@ void channel_close(py_channel_t *channel) { /* Notify sync waiter that channel is closed */ if (notify_sync) { ErlNifEnv *msg_env = enif_alloc_env(); - enif_send(NULL, &sync_waiter, msg_env, - enif_make_atom(msg_env, "channel_closed")); - enif_free_env(msg_env); + if (msg_env != NULL) { + enif_send(NULL, &sync_waiter, msg_env, + enif_make_atom(msg_env, "channel_closed")); + enif_free_env(msg_env); + } } } @@ -646,6 +693,12 @@ ERL_NIF_TERM nif_channel_wait(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[ return enif_make_tuple2(env, ATOM_ERROR, enif_make_atom(env, "closed")); } + /* Reject if any waiter already exists (no mixed or duplicate waiters) */ + if (channel->has_waiter || channel->has_sync_waiter) { + pthread_mutex_unlock(&channel->mutex); + return enif_make_tuple2(env, ATOM_ERROR, enif_make_atom(env, "waiter_exists")); + } + /* Check if data already available */ size_t queue_size = enif_ioq_size(channel->queue); if (queue_size > 0) { @@ -683,11 +736,6 @@ ERL_NIF_TERM nif_channel_wait(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[ /* Keep reference to event loop to prevent destruction while waiting */ enif_keep_resource(loop); - /* Clear any previous waiter (should not happen, but be safe) */ - if (channel->waiter_loop != NULL) { - enif_release_resource(channel->waiter_loop); - } - channel->waiter_loop = loop; channel->waiter_callback_id = callback_id; channel->has_waiter = true; @@ -770,8 +818,8 @@ ERL_NIF_TERM nif_channel_register_sync_waiter(ErlNifEnv *env, int argc, const ER return enif_make_atom(env, "has_data"); } - /* Check if another sync waiter is already registered */ - if (channel->has_sync_waiter) { + /* Reject if any waiter already exists (no mixed or duplicate waiters) */ + if (channel->has_sync_waiter || channel->has_waiter) { pthread_mutex_unlock(&channel->mutex); return enif_make_tuple2(env, ATOM_ERROR, enif_make_atom(env, "waiter_exists")); } diff --git a/c_src/py_event_loop.c b/c_src/py_event_loop.c index 970c1a6..66b8377 100644 --- a/c_src/py_event_loop.c +++ b/c_src/py_event_loop.c @@ -1927,27 +1927,28 @@ static inline void pending_hash_clear(erlang_event_loop_t *loop) { } } -void event_loop_add_pending(erlang_event_loop_t *loop, event_type_t type, +bool event_loop_add_pending(erlang_event_loop_t *loop, event_type_t type, uint64_t callback_id, int fd) { /* Backpressure: check pending count before acquiring lock (fast path) */ if (atomic_load(&loop->pending_count) >= MAX_PENDING_EVENTS) { - return; /* Queue full, drop event */ + return false; /* Queue full */ } pthread_mutex_lock(&loop->mutex); /* O(1) duplicate check using hash set */ if (pending_hash_contains(loop, callback_id, type)) { - /* Already have this event pending, skip */ + /* Already have this event pending - this counts as success since + * the event will be delivered */ pthread_mutex_unlock(&loop->mutex); - return; + return true; } /* Get event from freelist or allocate new (Phase 7 optimization) */ pending_event_t *event = get_pending_event(loop); if (event == NULL) { pthread_mutex_unlock(&loop->mutex); - return; + return false; /* Allocation failed */ } event->type = type; @@ -1977,6 +1978,7 @@ void event_loop_add_pending(erlang_event_loop_t *loop, event_type_t type, } pthread_mutex_unlock(&loop->mutex); + return true; } void event_loop_clear_pending(erlang_event_loop_t *loop) { diff --git a/c_src/py_event_loop.h b/c_src/py_event_loop.h index 941b043..3763bc8 100644 --- a/c_src/py_event_loop.h +++ b/c_src/py_event_loop.h @@ -498,8 +498,9 @@ ERL_NIF_TERM nif_dispatch_sleep_complete(ErlNifEnv *env, int argc, * @param type Event type (read, write, timer) * @param callback_id Callback ID to dispatch * @param fd File descriptor (for read/write), -1 for timers + * @return true if event was added, false if queue is full or allocation failed */ -void event_loop_add_pending(erlang_event_loop_t *loop, event_type_t type, +bool event_loop_add_pending(erlang_event_loop_t *loop, event_type_t type, uint64_t callback_id, int fd); /** diff --git a/test/py_channel_SUITE.erl b/test/py_channel_SUITE.erl index 2fcb864..caf06a5 100644 --- a/test/py_channel_SUITE.erl +++ b/test/py_channel_SUITE.erl @@ -37,6 +37,9 @@ sync_receive_wait_test/1, sync_receive_closed_test/1, sync_receive_multiple_waiters_test/1, + %% Mixed waiter rejection tests + mixed_waiter_sync_blocks_async_test/1, + mixed_waiter_async_blocks_sync_test/1, %% Async receive with actual waiting async_receive_wait_e2e_test/1, %% Subinterpreter mode tests @@ -67,6 +70,9 @@ all() -> [ sync_receive_wait_test, sync_receive_closed_test, sync_receive_multiple_waiters_test, + %% Mixed waiter rejection tests + mixed_waiter_sync_blocks_async_test, + mixed_waiter_async_blocks_sync_test, %% Async receive with actual waiting async_receive_wait_e2e_test, %% Subinterpreter mode tests @@ -466,6 +472,42 @@ sync_receive_multiple_waiters_test(_Config) -> ct:fail("Did not receive channel_closed") end. +%% @doc Test that sync waiter blocks async waiter registration +mixed_waiter_sync_blocks_async_test(_Config) -> + {ok, Ch} = py_channel:new(), + + %% Register sync waiter first + ok = py_nif:channel_register_sync_waiter(Ch), + + %% Create an event loop for async waiter test + {ok, Loop} = py_nif:event_loop_new(), + + %% Try to register async waiter - should fail + {error, waiter_exists} = py_nif:channel_wait(Ch, 123, Loop), + + %% Clean up: send data to clear sync waiter + ok = py_channel:send(Ch, <<"data">>), + receive channel_data_ready -> ok after 100 -> ok end, + + py_nif:event_loop_destroy(Loop), + ok = py_channel:close(Ch). + +%% @doc Test that async waiter blocks sync waiter registration +mixed_waiter_async_blocks_sync_test(_Config) -> + {ok, Ch} = py_channel:new(), + + %% Create an event loop and register async waiter first + {ok, Loop} = py_nif:event_loop_new(), + ok = py_nif:channel_wait(Ch, 456, Loop), + + %% Try to register sync waiter - should fail + {error, waiter_exists} = py_nif:channel_register_sync_waiter(Ch), + + %% Clean up: cancel async waiter and close + ok = py_nif:channel_cancel_wait(Ch, 456), + py_nif:event_loop_destroy(Loop), + ok = py_channel:close(Ch). + %% @doc End-to-end test for async_receive waiting for data %% Tests that async_receive properly integrates with asyncio when data %% is sent concurrently via a background task From 47c4a5ce2269beb3bdcdeb7710e9591122186dff Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Tue, 10 Mar 2026 18:39:57 +0100 Subject: [PATCH 8/9] Remove deprecated ASGI/WSGI NIF tests, keep deprecation checks The asgi_run and wsgi_run NIF functions are deprecated. Removed tests that call these functions, keeping only the deprecation attribute tests. --- test/py_web_frameworks_SUITE.erl | 127 +------------------------------ 1 file changed, 4 insertions(+), 123 deletions(-) diff --git a/test/py_web_frameworks_SUITE.erl b/test/py_web_frameworks_SUITE.erl index 7e3a69f..ceaf3fb 100644 --- a/test/py_web_frameworks_SUITE.erl +++ b/test/py_web_frameworks_SUITE.erl @@ -1,33 +1,21 @@ %%% @doc Common Test suite for deprecated ASGI/WSGI modules. %%% -%%% Tests that py_asgi and py_wsgi modules still work (backward compatibility) -%%% while being marked as deprecated. +%%% Tests that py_asgi and py_wsgi modules are marked as deprecated. -module(py_web_frameworks_SUITE). -include_lib("common_test/include/ct.hrl"). -export([ all/0, - groups/0, init_per_suite/1, end_per_suite/1, - init_per_group/2, - end_per_group/2, init_per_testcase/2, end_per_testcase/2 ]). -export([ - %% Deprecation tests test_asgi_deprecated/1, - test_wsgi_deprecated/1, - %% ASGI backward compatibility - test_asgi_run_basic/1, - test_asgi_build_scope/1, - test_asgi_scope_defaults/1, - %% WSGI backward compatibility - test_wsgi_run_basic/1, - test_wsgi_environ_defaults/1 + test_wsgi_deprecated/1 ]). %% ============================================================================ @@ -36,41 +24,16 @@ all() -> [ - {group, deprecation}, - {group, asgi_compat}, - {group, wsgi_compat} - ]. - -groups() -> - [ - {deprecation, [], [ - test_asgi_deprecated, - test_wsgi_deprecated - ]}, - {asgi_compat, [], [ - test_asgi_run_basic, - test_asgi_build_scope, - test_asgi_scope_defaults - ]}, - {wsgi_compat, [], [ - test_wsgi_run_basic, - test_wsgi_environ_defaults - ]} + test_asgi_deprecated, + test_wsgi_deprecated ]. init_per_suite(Config) -> - application:ensure_all_started(erlang_python), Config. end_per_suite(_Config) -> ok. -init_per_group(_Group, Config) -> - Config. - -end_per_group(_Group, _Config) -> - ok. - init_per_testcase(_TestCase, Config) -> Config. @@ -132,85 +95,3 @@ test_wsgi_deprecated(_Config) -> end, Deprecated), ok. - -%% ============================================================================ -%% ASGI Backward Compatibility Tests -%% ============================================================================ - -%% @doc Test basic ASGI run functionality still works. -test_asgi_run_basic(_Config) -> - %% Test that run/4 can be called (may fail due to missing app, but shouldn't crash) - Scope = #{ - type => <<"http">>, - method => <<"GET">>, - path => <<"/">> - }, - %% We expect an error because we don't have a real ASGI app, - %% but the function should be callable - Result = py_asgi:run(<<"nonexistent_module">>, <<"app">>, Scope, <<>>), - case Result of - {error, _Reason} -> ok; %% Expected - module doesn't exist - {ok, _} -> ok %% Unexpected but acceptable - end. - -%% @doc Test ASGI build_scope functionality. -test_asgi_build_scope(_Config) -> - Scope = #{ - type => <<"http">>, - method => <<"POST">>, - path => <<"/api/test">>, - query_string => <<"foo=bar">> - }, - %% build_scope should be callable - Result = py_asgi:build_scope(Scope), - case Result of - {ok, _Ref} -> ok; - {error, _Reason} -> ok %% May fail if NIF not available - end. - -%% @doc Test ASGI scope defaults are applied correctly. -test_asgi_scope_defaults(_Config) -> - %% Minimal scope - should get defaults applied - MinimalScope = #{ - path => <<"/test">> - }, - %% This tests internal ensure_scope_defaults/1 via run/4 - Result = py_asgi:run(<<"test">>, <<"app">>, MinimalScope, <<>>), - %% Should not crash, error is expected for missing module - case Result of - {error, _} -> ok; - {ok, _} -> ok - end. - -%% ============================================================================ -%% WSGI Backward Compatibility Tests -%% ============================================================================ - -%% @doc Test basic WSGI run functionality still works. -test_wsgi_run_basic(_Config) -> - Environ = #{ - <<"REQUEST_METHOD">> => <<"GET">>, - <<"PATH_INFO">> => <<"/">>, - <<"wsgi.input">> => <<>> - }, - %% We expect an error because we don't have a real WSGI app, - %% but the function should be callable - Result = py_wsgi:run(<<"nonexistent_module">>, <<"app">>, Environ), - case Result of - {error, _Reason} -> ok; %% Expected - module doesn't exist - {ok, _} -> ok %% Unexpected but acceptable - end. - -%% @doc Test WSGI environ defaults are applied correctly. -test_wsgi_environ_defaults(_Config) -> - %% Minimal environ - should get defaults applied - MinimalEnviron = #{ - <<"PATH_INFO">> => <<"/test">> - }, - %% This tests internal ensure_environ_defaults/1 via run/3 - Result = py_wsgi:run(<<"test">>, <<"app">>, MinimalEnviron), - %% Should not crash, error is expected for missing module - case Result of - {error, _} -> ok; - {ok, _} -> ok - end. From d760e0589c50c69346470f378a6630ea8f7717f1 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Tue, 10 Mar 2026 22:37:21 +0100 Subject: [PATCH 9/9] Fix use-after-free in tl_pending_args across subinterpreters Clear tl_pending_args to NULL whenever tl_pending_callback is set to false. Previously, the thread-local pointer was left dangling after callback completion. When a dirty scheduler thread later handled a different subinterpreter's code, Py_XDECREF on the stale pointer would attempt to free memory from the wrong allocator. --- c_src/py_callback.c | 29 ++++++++++++++++++++++++++--- c_src/py_exec.c | 2 ++ c_src/py_nif.c | 33 +++++++++++++++++++++++++++++++++ 3 files changed, 61 insertions(+), 3 deletions(-) diff --git a/c_src/py_callback.c b/c_src/py_callback.c index 2cd8b4e..ec8a152 100644 --- a/c_src/py_callback.c +++ b/c_src/py_callback.c @@ -563,6 +563,7 @@ static PyObject *build_pending_callback_exc_args(void) { PyObject *exc_args = PyTuple_New(3); if (exc_args == NULL) { tl_pending_callback = false; + Py_CLEAR(tl_pending_args); return NULL; } @@ -575,6 +576,7 @@ static PyObject *build_pending_callback_exc_args(void) { Py_XDECREF(func_name_obj); Py_DECREF(exc_args); tl_pending_callback = false; + Py_CLEAR(tl_pending_args); return NULL; } @@ -610,6 +612,7 @@ static ERL_NIF_TERM build_suspended_result(ErlNifEnv *env, suspended_state_t *su ERL_NIF_TERM args_term = py_to_term(env, tl_pending_args); tl_pending_callback = false; + Py_CLEAR(tl_pending_args); return enif_make_tuple4(env, ATOM_SUSPENDED, @@ -811,6 +814,7 @@ static ERL_NIF_TERM build_suspended_context_result(ErlNifEnv *env, suspended_con ERL_NIF_TERM args_term = py_to_term(env, tl_pending_args); tl_pending_callback = false; + Py_CLEAR(tl_pending_args); return enif_make_tuple4(env, ATOM_SUSPENDED, @@ -1290,6 +1294,19 @@ PyTypeObject ErlangPidType = { static PyObject *erlang_call_impl(PyObject *self, PyObject *args) { (void)self; + /* + * Invariant check: pending callback TLS must be clear when entering. + * If any state is still set, it's leaked from a prior context that didn't + * properly clean up - fail loudly rather than risk cross-interpreter corruption. + */ + if (tl_pending_callback || tl_pending_args != NULL || + tl_pending_func_name != NULL || tl_pending_callback_id != 0) { + PyErr_SetString(PyExc_RuntimeError, + "erlang.call: stale pending callback TLS detected - " + "prior context did not clean up properly"); + return NULL; + } + /* * Check if we have a callback handler available. * Priority: @@ -1553,6 +1570,7 @@ static PyObject *erlang_call_impl(PyObject *self, PyObject *args) { tl_pending_func_name = enif_alloc(func_name_len + 1); if (tl_pending_func_name == NULL) { tl_pending_callback = false; + Py_CLEAR(tl_pending_args); Py_DECREF(call_args); PyErr_SetString(PyExc_MemoryError, "Failed to allocate function name"); return NULL; @@ -1561,9 +1579,12 @@ static PyObject *erlang_call_impl(PyObject *self, PyObject *args) { tl_pending_func_name[func_name_len] = '\0'; tl_pending_func_name_len = func_name_len; - /* Store args (take ownership) */ - Py_XDECREF(tl_pending_args); - tl_pending_args = call_args; /* Takes ownership, don't decref */ + /* Store args (take ownership) + * Use Py_XSETREF for swap-first pattern: sets tl_pending_args to new value + * BEFORE decref'ing old value. This prevents re-entrancy issues if the old + * object's finalizer triggers another erlang.call() during decref. + */ + Py_XSETREF(tl_pending_args, call_args); /* Raise exception to abort Python execution */ PyErr_SetString(SuspensionRequiredException, "callback pending"); @@ -2649,6 +2670,7 @@ static ERL_NIF_TERM nif_resume_callback_dirty(ErlNifEnv *env, int argc, const ER Py_DECREF(exc_args); if (new_suspended == NULL) { tl_pending_callback = false; + Py_CLEAR(tl_pending_args); result = make_error(env, "create_nested_suspended_state_failed"); } else { result = build_suspended_result(env, new_suspended); @@ -2717,6 +2739,7 @@ static ERL_NIF_TERM nif_resume_callback_dirty(ErlNifEnv *env, int argc, const ER Py_DECREF(exc_args); if (new_suspended == NULL) { tl_pending_callback = false; + Py_CLEAR(tl_pending_args); result = make_error(env, "create_nested_suspended_state_failed"); } else { result = build_suspended_result(env, new_suspended); diff --git a/c_src/py_exec.c b/c_src/py_exec.c index 2fb2820..4b478b0 100644 --- a/c_src/py_exec.c +++ b/c_src/py_exec.c @@ -306,6 +306,7 @@ static void process_request(py_request_t *req) { Py_DECREF(exc_args); if (suspended == NULL) { tl_pending_callback = false; + Py_CLEAR(tl_pending_args); req->result = make_error(env, "create_suspended_state_failed"); } else { req->result = build_suspended_result(env, suspended); @@ -393,6 +394,7 @@ static void process_request(py_request_t *req) { Py_DECREF(exc_args); if (suspended == NULL) { tl_pending_callback = false; + Py_CLEAR(tl_pending_args); req->result = make_error(env, "create_suspended_state_failed"); } else { req->result = build_suspended_result(env, suspended); diff --git a/c_src/py_nif.c b/c_src/py_nif.c index cdbe9db..7fba13f 100644 --- a/c_src/py_nif.c +++ b/c_src/py_nif.c @@ -116,6 +116,24 @@ __thread char *tl_pending_func_name = NULL; __thread size_t tl_pending_func_name_len = 0; __thread PyObject *tl_pending_args = NULL; +/** + * Clear all pending callback thread-local state. + * + * Must be called at context boundaries while still in the correct interpreter + * context, to prevent cross-interpreter contamination if Python code caught + * and swallowed SuspensionRequiredException. + */ +static inline void clear_pending_callback_tls(void) { + tl_pending_callback = false; + tl_pending_callback_id = 0; + if (tl_pending_func_name != NULL) { + enif_free(tl_pending_func_name); + tl_pending_func_name = NULL; + } + tl_pending_func_name_len = 0; + Py_CLEAR(tl_pending_args); +} + /* Thread-local timeout state */ __thread uint64_t tl_timeout_deadline = 0; __thread bool tl_timeout_enabled = false; @@ -2280,6 +2298,7 @@ static ERL_NIF_TERM nif_context_call(ErlNifEnv *env, int argc, const ERL_NIF_TER if (suspended == NULL) { tl_pending_callback = false; + Py_CLEAR(tl_pending_args); result = make_error(env, "create_suspended_state_failed"); } else { result = build_suspended_context_result(env, suspended); @@ -2298,6 +2317,9 @@ static ERL_NIF_TERM nif_context_call(ErlNifEnv *env, int argc, const ERL_NIF_TER tl_allow_suspension = prev_allow_suspension; tl_current_context = prev_context; + /* Clear pending callback TLS before releasing context */ + clear_pending_callback_tls(); + enif_free(module_name); enif_free(func_name); @@ -2382,6 +2404,7 @@ static ERL_NIF_TERM nif_context_eval(ErlNifEnv *env, int argc, const ERL_NIF_TER if (suspended == NULL) { tl_pending_callback = false; + Py_CLEAR(tl_pending_args); result = make_error(env, "create_suspended_state_failed"); } else { result = build_suspended_context_result(env, suspended); @@ -2399,6 +2422,9 @@ static ERL_NIF_TERM nif_context_eval(ErlNifEnv *env, int argc, const ERL_NIF_TER tl_allow_suspension = prev_allow_suspension; tl_current_context = prev_context; + /* Clear pending callback TLS before releasing context */ + clear_pending_callback_tls(); + enif_free(code); /* Release thread state using centralized guard */ @@ -2867,12 +2893,14 @@ static ERL_NIF_TERM nif_context_resume(ErlNifEnv *env, int argc, const ERL_NIF_T if (nested == NULL) { tl_pending_callback = false; + Py_CLEAR(tl_pending_args); result = make_error(env, "create_nested_suspended_state_failed"); } else { /* Copy accumulated callback results from parent to nested state */ if (copy_callback_results_to_nested(nested, state) != 0) { enif_release_resource(nested); tl_pending_callback = false; + Py_CLEAR(tl_pending_args); result = make_error(env, "copy_callback_results_failed"); } else { result = build_suspended_context_result(env, nested); @@ -2921,12 +2949,14 @@ static ERL_NIF_TERM nif_context_resume(ErlNifEnv *env, int argc, const ERL_NIF_T if (nested == NULL) { tl_pending_callback = false; + Py_CLEAR(tl_pending_args); result = make_error(env, "create_nested_suspended_state_failed"); } else { /* Copy accumulated callback results from parent to nested state */ if (copy_callback_results_to_nested(nested, state) != 0) { enif_release_resource(nested); tl_pending_callback = false; + Py_CLEAR(tl_pending_args); result = make_error(env, "copy_callback_results_failed"); } else { result = build_suspended_context_result(env, nested); @@ -2951,6 +2981,9 @@ static ERL_NIF_TERM nif_context_resume(ErlNifEnv *env, int argc, const ERL_NIF_T tl_allow_suspension = prev_allow_suspension; tl_current_context = prev_context; + /* Clear pending callback TLS before releasing context */ + clear_pending_callback_tls(); + /* Release thread state using centralized guard */ py_context_release(&guard);