From 71f44f156fca5ba3b4d5fc9e41ba5256d13f91e3 Mon Sep 17 00:00:00 2001 From: Edmond <1571649+EdmondDantes@users.noreply.github.com> Date: Thu, 25 Jun 2026 11:11:05 +0000 Subject: [PATCH 1/6] #162: document phpredis async pool connect() host fix and mux limitation --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8c030a6..0438846 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed +- **phpredis async pool ignored the host given to `connect()` — `failed to acquire connection` outside loopback (e.g. Docker)** — the pool factory hardcoded `127.0.0.1:6379` and applied only the constructor options, so a host supplied via a later `$redis->connect('redis', 6379)` was never used; every pooled connection dialed loopback and failed wherever Redis lives on a service hostname. The factory now seeds each connection (host, port, timeouts, auth, selected DB, TLS context, keepalive) from the owner's template socket; constructor options still override. Fixed in phpredis `redis_pool.c`; regression test `tests/async/008-pool_connect_host.phpt`. + - **Limitation — with `mux > 0` the host MUST be set in the constructor, not via `connect()`.** Multiplex lanes are opened eagerly inside the `new Redis([...])` constructor, before `connect()` runs, so there is no template to seed from yet and the lane falls back to loopback. Use `new Redis(['host' => 'redis', 'pool' => ['mux' => 2, ...]])`. The checkout pool (`mux = 0`) creates connections lazily after `connect()`, so the `connect()` host works there. + ## [0.7.3] - 2026-06-25 ### Fixed From e87b4f0afe95b16e48ca7445b45b7a8e866769e0 Mon Sep 17 00:00:00 2001 From: Edmond <1571649+EdmondDantes@users.noreply.github.com> Date: Thu, 25 Jun 2026 11:28:37 +0000 Subject: [PATCH 2/6] =?UTF-8?q?#162:=20phpredis=20pool=20=E2=80=94=20conne?= =?UTF-8?q?ct()=20rejected=20in=20pool=20mode=20(revise=20note)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CHANGELOG.md | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0438846..a7c699c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,9 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] -### Fixed -- **phpredis async pool ignored the host given to `connect()` — `failed to acquire connection` outside loopback (e.g. Docker)** — the pool factory hardcoded `127.0.0.1:6379` and applied only the constructor options, so a host supplied via a later `$redis->connect('redis', 6379)` was never used; every pooled connection dialed loopback and failed wherever Redis lives on a service hostname. The factory now seeds each connection (host, port, timeouts, auth, selected DB, TLS context, keepalive) from the owner's template socket; constructor options still override. Fixed in phpredis `redis_pool.c`; regression test `tests/async/008-pool_connect_host.phpt`. - - **Limitation — with `mux > 0` the host MUST be set in the constructor, not via `connect()`.** Multiplex lanes are opened eagerly inside the `new Redis([...])` constructor, before `connect()` runs, so there is no template to seed from yet and the lane falls back to loopback. Use `new Redis(['host' => 'redis', 'pool' => ['mux' => 2, ...]])`. The checkout pool (`mux = 0`) creates connections lazily after `connect()`, so the `connect()` host works there. +### Changed +- **phpredis async pool: `connect()`/`pconnect()` is now rejected in pool mode — configure `host`/`port` in the constructor options.** A pooled `Redis` object is a template, not a single live connection, so `connect()` has no meaning there: the pool's own connections are created elsewhere and silently ignored the `connect()` target, which dialed `127.0.0.1` and failed wherever Redis lives on a service hostname (e.g. `failed to acquire connection` inside Docker). Calling `connect()` on a pooled instance now throws `Redis::connect() is not supported in pool mode; set 'host' and 'port' in the constructor options`. Use `new Redis(['host' => 'redis', 'port' => 6379, 'pool' => [...]])`. This is consistent for both the checkout pool (`mux = 0`) and eager multiplex lanes (`mux > 0`, opened in the constructor). Fixed in phpredis `redis.c`/`redis_pool.c`; test `tests/async/008-pool_connect_host.phpt`. ## [0.7.3] - 2026-06-25 From 71edb27ff4204788b319f6679ccbb80f19355962 Mon Sep 17 00:00:00 2001 From: Edmond <1571649+EdmondDantes@users.noreply.github.com> Date: Thu, 25 Jun 2026 16:10:48 +0000 Subject: [PATCH 3/6] #162: fix ThreadChannel recv() hang at shutdown when the owner finishes without close() A worker spawned with Async\spawn_thread() and parked on ThreadChannel::recv() kept the whole process alive once the owning side finished without close(): the non-awaited worker pinned the parent scheduler, so the parent never reached request shutdown to release the channel, and recv() blocked forever with no output and no diagnostic. 1. Transparent thread completion event (libuv_reactor.c/.h). The event is hidden and its notify handle is unref'd by default, so a non-awaited worker no longer keeps the parent loop alive. start() creates the OS thread on the first call (ASYNC_THREAD_F_LAUNCHED) and stays transparent; a later start() from an awaiter arms the wait (uv_ref + count). stop() disarms back to transparent. CLOSED is set on completion in notify_cb after stop(), because the stop prologue short-circuits on a closed event and would skip the disarm. An awaited worker is kept alive by its suspended coroutine. 2. Endpoint-drop disconnect for PHP ThreadChannels (thread_channel.c/.h). A channel left with a single wrapper (ref_count <= 1) has no other thread that could be a peer, so parked recv()/send() wake with ThreadChannelException ("no producers/consumers remain"), checked both at park (last peer dropped before we parked) and on dispose (peer drops while parked). Raw pool channels keep auto_disconnect off and close() explicitly. Tests: thread_channel/042; full thread/thread_channel/thread_pool suites green. Claude-Session: https://claude.ai/code/session_01NMdPEuD85qzTjv2N4ihTQU --- CHANGELOG.md | 3 + libuv_reactor.c | 73 ++++++++++------ libuv_reactor.h | 4 + .../042-recv_disconnect_on_owner_finish.phpt | 33 +++++++ thread_channel.c | 86 ++++++++++++++++++- thread_channel.h | 22 ++++- 6 files changed, 191 insertions(+), 30 deletions(-) create mode 100644 tests/thread_channel/042-recv_disconnect_on_owner_finish.phpt diff --git a/CHANGELOG.md b/CHANGELOG.md index a7c699c..135ea97 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed +- **#162 `ThreadChannel`/`spawn_thread`: a worker parked on `recv()` hung process shutdown when the owning side finished without `close()`.** A non-awaited worker thread kept the parent scheduler alive by itself, so the parent never reached request shutdown to release the channel, and the worker blocked on `recv()` forever (no output, no diagnostic). The thread completion event is now transparent — it only keeps the parent loop alive while a coroutine actually `await`s the thread — so the parent finishes normally and drops its channel reference. A PHP-constructed `ThreadChannel` left with a single endpoint (`ref_count <= 1`, i.e. no other thread can be a peer) then wakes parked `recv()`/`send()` with `Async\ThreadChannelException` ("no producers/consumers remain"), mirroring Rust's `std::sync::mpsc` disconnect-on-drop. Awaited threads still block the parent as before; the thread pool's internal channels are unaffected (they `close()` explicitly). + ### Changed - **phpredis async pool: `connect()`/`pconnect()` is now rejected in pool mode — configure `host`/`port` in the constructor options.** A pooled `Redis` object is a template, not a single live connection, so `connect()` has no meaning there: the pool's own connections are created elsewhere and silently ignored the `connect()` target, which dialed `127.0.0.1` and failed wherever Redis lives on a service hostname (e.g. `failed to acquire connection` inside Docker). Calling `connect()` on a pooled instance now throws `Redis::connect() is not supported in pool mode; set 'host' and 'port' in the constructor options`. Use `new Redis(['host' => 'redis', 'port' => 6379, 'pool' => [...]])`. This is consistent for both the checkout pool (`mux = 0`) and eager multiplex lanes (`mux > 0`, opened in the constructor). Fixed in phpredis `redis.c`/`redis_pool.c`; test `tests/async/008-pool_connect_host.phpt`. diff --git a/libuv_reactor.c b/libuv_reactor.c index 05a3b61..dd78544 100644 --- a/libuv_reactor.c +++ b/libuv_reactor.c @@ -2161,8 +2161,12 @@ static void libuv_thread_notify_cb(uv_async_t *handle) } } + /* Thread finished: notify waiters, disarm the wait, then mark completed. + * CLOSED must be set AFTER stop() — the stop prologue short-circuits on a + * closed event and would otherwise skip the disarm (unref + uncount). */ ZEND_ASYNC_CALLBACKS_NOTIFY(&thread->event.base, &thread->event.result, thread->event.exception); thread->event.base.stop(&thread->event.base); + ZEND_ASYNC_EVENT_SET_CLOSED(&thread->event.base); if (ZEND_ASYNC_EVENT_IS_EXCEPTION_HANDLED(&thread->event.base)) { ZEND_THREAD_SET_EXCEPTION_CONSUMED(&thread->event); @@ -2189,37 +2193,46 @@ static bool libuv_thread_event_start(zend_async_event_t *event) async_thread_event_t *thread = (async_thread_event_t *) event; - /* Add ref on context for the thread runner */ - if (thread->event.context) { - zend_atomic_ptr_store(&thread->event.context->event, &thread->event); - ZEND_ASYNC_THREAD_CONTEXT_ADDREF(thread->event.context); - } - - /* Initialise registry and assign+register the context's key BEFORE - * uv_thread_create. Doing it after creates a race: a fast-exiting - * runner reads context->key == 0, skips self-removal, then we add - * the key and the entry leaks — quiesce hangs forever. */ - libuv_thread_registry_init(); + /* First start() is the spawn call: create the OS thread and stay transparent + * (no ref/count). loop_ref_count stays 0 so the first awaiter's start() arms. */ + if ((event->flags & ASYNC_THREAD_F_LAUNCHED) == 0) { - if (thread->event.context) { - thread->event.context->key = - (zend_async_thread_handle_t) async_ptr_to_index(thread->event.context); - libuv_thread_registry_add(thread->event.context->key); - } + if (thread->event.context) { + zend_atomic_ptr_store(&thread->event.context->event, &thread->event); + ZEND_ASYNC_THREAD_CONTEXT_ADDREF(thread->event.context); + } - const int ret = uv_thread_create(&thread->uv_handle, zend_async_thread_run_fn, thread->event.context); + /* Assign+register the key BEFORE uv_thread_create: a fast-exiting runner + * that reads key == 0 skips self-removal and the registry entry leaks. */ + libuv_thread_registry_init(); - if (UNEXPECTED(ret != 0)) { if (thread->event.context) { - libuv_thread_registry_remove(thread->event.context->key); - thread->event.context->key = 0; - zend_atomic_int_dec(&thread->event.context->ref_count); + thread->event.context->key = + (zend_async_thread_handle_t) async_ptr_to_index(thread->event.context); + libuv_thread_registry_add(thread->event.context->key); } - async_throw_error("Failed to create thread: %s", uv_strerror(ret)); - return false; + const int ret = uv_thread_create(&thread->uv_handle, zend_async_thread_run_fn, thread->event.context); + + if (UNEXPECTED(ret != 0)) { + if (thread->event.context) { + libuv_thread_registry_remove(thread->event.context->key); + thread->event.context->key = 0; + zend_atomic_int_dec(&thread->event.context->ref_count); + } + + async_throw_error("Failed to create thread: %s", uv_strerror(ret)); + return false; + } + + event->flags |= ASYNC_THREAD_F_LAUNCHED; + return true; } + /* Subsequent start() is an awaiter: arm the wait so the parent loop blocks + * for completion (ref the notify handle and count the event). */ + uv_ref((uv_handle_t *) &thread->uv_notify); + ZEND_ASYNC_EVENT_CLR_HIDDEN(event); event->loop_ref_count++; ZEND_ASYNC_INCREASE_EVENT_COUNT(event); return true; @@ -2234,13 +2247,13 @@ static bool libuv_thread_event_stop(zend_async_event_t *event) async_thread_event_t *thread = (async_thread_event_t *) event; - /* Unref async handle so it doesn't keep the event loop alive. - * Actual uv_close happens in dispose when refcount reaches 0. */ + /* Last awaiter left: disarm back to transparent. Decrement before hiding + * (the count macro is a no-op on hidden events). CLOSED is set on actual + * completion in libuv_thread_notify_cb, not here. */ uv_unref((uv_handle_t *) &thread->uv_notify); - - ZEND_ASYNC_EVENT_SET_CLOSED(event); event->loop_ref_count = 0; ZEND_ASYNC_DECREASE_EVENT_COUNT(event); + ZEND_ASYNC_EVENT_SET_HIDDEN(event); return true; } @@ -2443,6 +2456,12 @@ zend_async_thread_event_t *libuv_new_thread_event( thread_event->uv_notify.data = thread_event; + /* Transparent by default: notify stays armed but unref'd (does not keep the + * parent loop alive) and the event is hidden (not counted). start() arms the + * wait (ref + count) only when someone awaits. */ + uv_unref((uv_handle_t *) &thread_event->uv_notify); + ZEND_ASYNC_EVENT_SET_HIDDEN(&thread_event->event.base); + /* Allocate last: every early-failure path above pefree's ctx directly, * so keeping the mutex out of those paths avoids a leak. No-op under NTS. */ ZEND_ASYNC_THREAD_CONTEXT_EVENT_MUTEX_ALLOC(ctx); diff --git a/libuv_reactor.h b/libuv_reactor.h index 6bece13..1403d48 100644 --- a/libuv_reactor.h +++ b/libuv_reactor.h @@ -103,6 +103,10 @@ struct _async_thread_event_t uv_async_t uv_notify; /* Cross-thread notification handle */ }; +/* Thread event is launched (OS thread created). Shares event.base.flags; + * thread bits 13/14 are taken, so this uses 15. */ +#define ASYNC_THREAD_F_LAUNCHED (1u << 15) + struct _async_exec_event_t { zend_async_exec_event_t event; diff --git a/tests/thread_channel/042-recv_disconnect_on_owner_finish.phpt b/tests/thread_channel/042-recv_disconnect_on_owner_finish.phpt new file mode 100644 index 0000000..a07b031 --- /dev/null +++ b/tests/thread_channel/042-recv_disconnect_on_owner_finish.phpt @@ -0,0 +1,33 @@ +--TEST-- +ThreadChannel: recv() disconnects instead of hanging when the owner finishes without close() (#162) +--SKIPIF-- + +--FILE-- +recv(); + echo "worker: got a value\n"; + } catch (ThreadChannelException $e) { + echo "worker: " . $e->getMessage() . "\n"; + } +}); + +echo "main: end (nothing sent, channel not closed)\n"; +?> +--EXPECT-- +main: end (nothing sent, channel not closed) +worker: ThreadChannel deadlock: no producers remain to send diff --git a/thread_channel.c b/thread_channel.c index 837a9bd..49f547c 100644 --- a/thread_channel.c +++ b/thread_channel.c @@ -62,6 +62,19 @@ static void fire_all_triggers(HashTable *triggers) } ZEND_HASH_FOREACH_END(); } +/* Message for a recv/send woken on a closed channel, by close reason. */ +static const char *thread_channel_close_message(uint8_t reason) +{ + switch (reason) { + case ASYNC_THREAD_CHANNEL_NO_PRODUCERS: + return "ThreadChannel deadlock: no producers remain to send"; + case ASYNC_THREAD_CHANNEL_NO_CONSUMERS: + return "ThreadChannel deadlock: no consumers remain to receive"; + default: + return "ThreadChannel is closed"; + } +} + /////////////////////////////////////////////////////////////////////////////// // C-level send/receive (coroutine-aware) /////////////////////////////////////////////////////////////////////////////// @@ -83,12 +96,13 @@ static bool thread_channel_send(zend_async_channel_t *channel, zval *value) /* Check closed under lock */ if (UNEXPECTED(ZEND_ASYNC_EVENT_IS_CLOSED(&ch->channel.event))) { + const uint8_t reason = ch->close_reason; ASYNC_MUTEX_UNLOCK(ch->mutex); async_thread_release_transferred_zval(&persistent_copy); if (trigger != NULL) { trigger->base.dispose(&trigger->base); } - zend_throw_exception(async_ce_thread_channel_exception, "ThreadChannel is closed", 0); + zend_throw_exception(async_ce_thread_channel_exception, thread_channel_close_message(reason), 0); return false; } @@ -107,6 +121,25 @@ static bool thread_channel_send(zend_async_channel_t *channel, zval *value) if (trigger == NULL) { trigger = ZEND_ASYNC_NEW_TRIGGER_EVENT(); } + + /* Track parked senders so the dispose path can detect "no consumer left". */ + ch->parked_senders++; + + /* Sole wrapper left: no other thread can ever receive. Closes the race where + * the last peer dropped before we parked. */ + if (ch->auto_disconnect && zend_atomic_int_load(&ch->ref_count) <= 1) { + ch->parked_senders--; + ch->close_reason = ASYNC_THREAD_CHANNEL_NO_CONSUMERS; + ZEND_ASYNC_EVENT_SET_CLOSED(&ch->channel.event); + fire_all_triggers(&ch->sender_triggers); + ASYNC_MUTEX_UNLOCK(ch->mutex); + async_thread_release_transferred_zval(&persistent_copy); + trigger->base.dispose(&trigger->base); + zend_throw_exception(async_ce_thread_channel_exception, + thread_channel_close_message(ASYNC_THREAD_CHANNEL_NO_CONSUMERS), 0); + return false; + } + zend_hash_index_update_ptr(&ch->sender_triggers, (zend_ulong)(uintptr_t) trigger, trigger); ASYNC_MUTEX_UNLOCK(ch->mutex); @@ -124,6 +157,7 @@ static bool thread_channel_send(zend_async_channel_t *channel, zval *value) if (UNEXPECTED(channel_bailed)) { ASYNC_MUTEX_LOCK(ch->mutex); + ch->parked_senders--; zend_hash_index_del(&ch->sender_triggers, (zend_ulong)(uintptr_t) trigger); ASYNC_MUTEX_UNLOCK(ch->mutex); ZEND_ASYNC_WAKER_DESTROY(ZEND_ASYNC_CURRENT_COROUTINE); @@ -136,6 +170,7 @@ static bool thread_channel_send(zend_async_channel_t *channel, zval *value) /* Woke up — remove from sender queue */ ASYNC_MUTEX_LOCK(ch->mutex); + ch->parked_senders--; zend_hash_index_del(&ch->sender_triggers, (zend_ulong)(uintptr_t) trigger); ASYNC_MUTEX_UNLOCK(ch->mutex); @@ -176,13 +211,14 @@ static bool thread_channel_receive( /* Buffer empty (or wait_only) — check if closed */ if (UNEXPECTED(ZEND_ASYNC_EVENT_IS_CLOSED(&ch->channel.event))) { + const uint8_t reason = ch->close_reason; ASYNC_MUTEX_UNLOCK(ch->mutex); if (trigger != NULL) { trigger->base.dispose(&trigger->base); } /* wait_only callers expect a quiet false on close. */ if (!wait_only) { - zend_throw_exception(async_ce_thread_channel_exception, "ThreadChannel is closed", 0); + zend_throw_exception(async_ce_thread_channel_exception, thread_channel_close_message(reason), 0); } return false; } @@ -192,6 +228,27 @@ static bool thread_channel_receive( if (trigger == NULL) { trigger = ZEND_ASYNC_NEW_TRIGGER_EVENT(); } + + /* Track parked receivers so the dispose path can detect "no producer left". */ + ch->parked_receivers++; + + /* Sole wrapper left: no other thread can ever send. Closes the race where + * the last peer dropped before we parked. (ref_count counts wrappers, i.e. + * threads; > 1 means a peer endpoint still exists somewhere.) */ + if (ch->auto_disconnect && zend_atomic_int_load(&ch->ref_count) <= 1) { + ch->parked_receivers--; + ch->close_reason = ASYNC_THREAD_CHANNEL_NO_PRODUCERS; + ZEND_ASYNC_EVENT_SET_CLOSED(&ch->channel.event); + fire_all_triggers(&ch->receiver_triggers); + ASYNC_MUTEX_UNLOCK(ch->mutex); + trigger->base.dispose(&trigger->base); + if (!wait_only) { + zend_throw_exception(async_ce_thread_channel_exception, + thread_channel_close_message(ASYNC_THREAD_CHANNEL_NO_PRODUCERS), 0); + } + return false; + } + zend_hash_index_update_ptr(&ch->receiver_triggers, (zend_ulong)(uintptr_t) trigger, trigger); ASYNC_MUTEX_UNLOCK(ch->mutex); @@ -213,6 +270,7 @@ static bool thread_channel_receive( if (UNEXPECTED(channel_bailed)) { ASYNC_MUTEX_LOCK(ch->mutex); + ch->parked_receivers--; zend_hash_index_del(&ch->receiver_triggers, (zend_ulong)(uintptr_t) trigger); ASYNC_MUTEX_UNLOCK(ch->mutex); ZEND_ASYNC_WAKER_DESTROY(ZEND_ASYNC_CURRENT_COROUTINE); @@ -224,6 +282,7 @@ static bool thread_channel_receive( /* Woke up — remove from receiver queue, observe closed state */ ASYNC_MUTEX_LOCK(ch->mutex); + ch->parked_receivers--; zend_hash_index_del(&ch->receiver_triggers, (zend_ulong)(uintptr_t) trigger); const bool closed = ZEND_ASYNC_EVENT_IS_CLOSED(&ch->channel.event); ASYNC_MUTEX_UNLOCK(ch->mutex); @@ -354,8 +413,30 @@ static bool thread_channel_event_dispose(zend_async_event_t *event) if (old == 1) { thread_channel_destroy(ch); + return true; } + /* An endpoint is gone. If every endpoint left is a parked waiter, no peer + * can ever make progress, so close the channel: parked recv()/send() then + * surface ThreadChannelException with the matching reason. Endpoints here + * are wrapper refs (ref_count); raw-pointer holders such as the thread pool + * keep ref_count == 1 and only ever drop it to 0 (handled above). */ + ASYNC_MUTEX_LOCK(ch->mutex); + if (ch->auto_disconnect && !ZEND_ASYNC_EVENT_IS_CLOSED(&ch->channel.event)) { + const int endpoints = old - 1; + const bool no_producers = ch->parked_receivers > 0 && endpoints <= 1; + const bool no_consumers = ch->parked_senders > 0 && endpoints <= 1; + + if (no_producers || no_consumers) { + ch->close_reason = no_producers ? ASYNC_THREAD_CHANNEL_NO_PRODUCERS + : ASYNC_THREAD_CHANNEL_NO_CONSUMERS; + ZEND_ASYNC_EVENT_SET_CLOSED(&ch->channel.event); + fire_all_triggers(&ch->receiver_triggers); + fire_all_triggers(&ch->sender_triggers); + } + } + ASYNC_MUTEX_UNLOCK(ch->mutex); + return true; } @@ -460,6 +541,7 @@ METHOD(__construct) thread_channel_object_t *obj = ASYNC_THREAD_CHANNEL_FROM_OBJ(Z_OBJ_P(ZEND_THIS)); obj->channel = async_thread_channel_create((int32_t) capacity); + obj->channel->auto_disconnect = true; } METHOD(send) diff --git a/thread_channel.h b/thread_channel.h index faf794a..4a84940 100644 --- a/thread_channel.h +++ b/thread_channel.h @@ -49,10 +49,30 @@ struct _async_thread_channel_s { HashTable receiver_triggers; /* triggers from wrappers waiting to receive */ HashTable sender_triggers; /* triggers from wrappers waiting to send */ - /* Reference count for cross-thread sharing */ + /* Reference count for cross-thread sharing. For PHP-constructed channels it + * also counts live endpoints (one wrapper ref each), which auto_disconnect + * relies on. Raw C holders (e.g. the thread pool) keep ref_count at 1. */ zend_atomic_int ref_count; + + /* Coroutines currently suspended inside recv()/send() (mutex-guarded). + * Used to detect "no producer/consumer left" disconnect. */ + int32_t parked_receivers; + int32_t parked_senders; + + /* Why the channel closed; read by a woken recv/send to pick the message. */ + uint8_t close_reason; + + /* Auto-disconnect parked waiters when no peer endpoint remains. Set only for + * PHP-constructed channels, where ref_count is a reliable endpoint count. + * Off for raw pool channels, which manage close() explicitly. */ + bool auto_disconnect; }; +/* close_reason values */ +#define ASYNC_THREAD_CHANNEL_CLOSED 0 +#define ASYNC_THREAD_CHANNEL_NO_PRODUCERS 1 +#define ASYNC_THREAD_CHANNEL_NO_CONSUMERS 2 + /////////////////////////////////////////////////////////// /// PHP object wrapper (emalloc, per-thread) /////////////////////////////////////////////////////////// From 07823337fb6ff1cb91c771749301bca488fc5072 Mon Sep 17 00:00:00 2001 From: Edmond <1571649+EdmondDantes@users.noreply.github.com> Date: Thu, 25 Jun 2026 17:15:40 +0000 Subject: [PATCH 4/6] #162: disconnect parked ThreadChannel workers at shutdown via a channel registry Replaces the ref_count-based disconnect from the previous commit, which was wrong for fan-out (N>1 workers on one channel) and the thread pool. ref_count counts wrappers, not endpoints, so no per-channel condition is reliable. Instead: every live shared channel is tracked in a process-wide registry, and libuv_reactor_quiesce() closes them all before waiting for child threads. A worker parked on recv()/send() then wakes with ThreadChannelException and exits. Combined with the transparent thread event (parent reaches shutdown instead of pinning on a non-awaited worker), this fixes the hang for any number of workers. Tests: thread_channel/042 (recv), 043 (producer coroutine), 044 (fan-out). Claude-Session: https://claude.ai/code/session_01NMdPEuD85qzTjv2N4ihTQU --- CHANGELOG.md | 2 +- libuv_reactor.c | 10 +- .../042-recv_disconnect_on_owner_finish.phpt | 2 +- ...43-recv_disconnect_producer_coroutine.phpt | 38 +++++ .../044-recv_disconnect_fanout.phpt | 31 ++++ thread_channel.c | 141 ++++++++---------- thread_channel.h | 28 +--- 7 files changed, 146 insertions(+), 106 deletions(-) create mode 100644 tests/thread_channel/043-recv_disconnect_producer_coroutine.phpt create mode 100644 tests/thread_channel/044-recv_disconnect_fanout.phpt diff --git a/CHANGELOG.md b/CHANGELOG.md index 135ea97..3eaf430 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,7 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Fixed -- **#162 `ThreadChannel`/`spawn_thread`: a worker parked on `recv()` hung process shutdown when the owning side finished without `close()`.** A non-awaited worker thread kept the parent scheduler alive by itself, so the parent never reached request shutdown to release the channel, and the worker blocked on `recv()` forever (no output, no diagnostic). The thread completion event is now transparent — it only keeps the parent loop alive while a coroutine actually `await`s the thread — so the parent finishes normally and drops its channel reference. A PHP-constructed `ThreadChannel` left with a single endpoint (`ref_count <= 1`, i.e. no other thread can be a peer) then wakes parked `recv()`/`send()` with `Async\ThreadChannelException` ("no producers/consumers remain"), mirroring Rust's `std::sync::mpsc` disconnect-on-drop. Awaited threads still block the parent as before; the thread pool's internal channels are unaffected (they `close()` explicitly). +- **#162 `ThreadChannel`/`spawn_thread`: a worker parked on `recv()`/`send()` hung process shutdown when the owning side finished without `close()`.** A non-awaited worker thread kept the parent scheduler alive by itself, so the parent never reached shutdown, and the worker blocked forever (no output, no diagnostic). Two changes: (1) the thread completion event is now **transparent** — it only keeps the parent loop alive while a coroutine actually `await`s the thread — so a non-awaited worker no longer pins the parent, which finishes and proceeds to shutdown; (2) every live `ThreadChannel` is now tracked in a process-wide registry and **closed at shutdown**, so any worker still parked on `recv()`/`send()` wakes with `Async\ThreadChannelException` and exits. Awaited threads still block the parent as before; works for fan-out (any number of workers). ### Changed - **phpredis async pool: `connect()`/`pconnect()` is now rejected in pool mode — configure `host`/`port` in the constructor options.** A pooled `Redis` object is a template, not a single live connection, so `connect()` has no meaning there: the pool's own connections are created elsewhere and silently ignored the `connect()` target, which dialed `127.0.0.1` and failed wherever Redis lives on a service hostname (e.g. `failed to acquire connection` inside Docker). Calling `connect()` on a pooled instance now throws `Redis::connect() is not supported in pool mode; set 'host' and 'port' in the constructor options`. Use `new Redis(['host' => 'redis', 'port' => 6379, 'pool' => [...]])`. This is consistent for both the checkout pool (`mux = 0`) and eager multiplex lanes (`mux > 0`, opened in the constructor). Fixed in phpredis `redis.c`/`redis_pool.c`; test `tests/async/008-pool_connect_host.phpt`. diff --git a/libuv_reactor.c b/libuv_reactor.c index dd78544..a719806 100644 --- a/libuv_reactor.c +++ b/libuv_reactor.c @@ -21,6 +21,7 @@ #include "exceptions.h" #include "php_async.h" +#include "thread_channel.h" #include "php_main.h" #include "thread.h" #include "thread_pool.h" @@ -166,6 +167,10 @@ static void libuv_reactor_quiesce(void) return; } + /* Wake any worker parked on a thread channel so it can exit; otherwise the + * wait below would hang on a non-awaited worker whose owner finished. */ + async_thread_channel_close_all(); + uv_mutex_lock(&child_thread_registry_mutex); while (zend_hash_num_elements(&child_thread_registry) > 0) { uv_cond_wait(&child_thread_registry_cond, &child_thread_registry_mutex); @@ -2161,9 +2166,8 @@ static void libuv_thread_notify_cb(uv_async_t *handle) } } - /* Thread finished: notify waiters, disarm the wait, then mark completed. - * CLOSED must be set AFTER stop() — the stop prologue short-circuits on a - * closed event and would otherwise skip the disarm (unref + uncount). */ + /* Set CLOSED AFTER stop(): the stop prologue short-circuits on a closed + * event and would skip the disarm (unref + uncount). */ ZEND_ASYNC_CALLBACKS_NOTIFY(&thread->event.base, &thread->event.result, thread->event.exception); thread->event.base.stop(&thread->event.base); ZEND_ASYNC_EVENT_SET_CLOSED(&thread->event.base); diff --git a/tests/thread_channel/042-recv_disconnect_on_owner_finish.phpt b/tests/thread_channel/042-recv_disconnect_on_owner_finish.phpt index a07b031..a9b9b60 100644 --- a/tests/thread_channel/042-recv_disconnect_on_owner_finish.phpt +++ b/tests/thread_channel/042-recv_disconnect_on_owner_finish.phpt @@ -30,4 +30,4 @@ echo "main: end (nothing sent, channel not closed)\n"; ?> --EXPECT-- main: end (nothing sent, channel not closed) -worker: ThreadChannel deadlock: no producers remain to send +worker: ThreadChannel is closed diff --git a/tests/thread_channel/043-recv_disconnect_producer_coroutine.phpt b/tests/thread_channel/043-recv_disconnect_producer_coroutine.phpt new file mode 100644 index 0000000..5b08853 --- /dev/null +++ b/tests/thread_channel/043-recv_disconnect_producer_coroutine.phpt @@ -0,0 +1,38 @@ +--TEST-- +ThreadChannel: recv() disconnects when the producer coroutine finishes without close() (#162) +--SKIPIF-- + +--FILE-- +recv(); + echo "worker: got a value\n"; + } catch (ThreadChannelException $e) { + echo "worker: " . $e->getMessage() . "\n"; + } + }); + echo "producer: done (no send, no close)\n"; +}); + +echo "main: done\n"; +?> +--EXPECT-- +main: done +producer: done (no send, no close) +worker: ThreadChannel is closed diff --git a/tests/thread_channel/044-recv_disconnect_fanout.phpt b/tests/thread_channel/044-recv_disconnect_fanout.phpt new file mode 100644 index 0000000..6d94ee9 --- /dev/null +++ b/tests/thread_channel/044-recv_disconnect_fanout.phpt @@ -0,0 +1,31 @@ +--TEST-- +ThreadChannel: multiple workers parked on recv() all disconnect when the owner finishes (#162 fan-out) +--SKIPIF-- + +--FILE-- +recv(); echo "w$i: got value\n"; } + catch (ThreadChannelException $e) { echo "w$i: disconnected\n"; } + }); +} + +echo "main: end\n"; +?> +--EXPECTF-- +main: end +w%d: disconnected +w%d: disconnected diff --git a/thread_channel.c b/thread_channel.c index 49f547c..9892c8a 100644 --- a/thread_channel.c +++ b/thread_channel.c @@ -62,17 +62,61 @@ static void fire_all_triggers(HashTable *triggers) } ZEND_HASH_FOREACH_END(); } -/* Message for a recv/send woken on a closed channel, by close reason. */ -static const char *thread_channel_close_message(uint8_t reason) +/////////////////////////////////////////////////////////////////////////////// +// Process-wide registry of live channels +/////////////////////////////////////////////////////////////////////////////// + +/* Every live shared channel is registered here. At shutdown async_thread_channel + * _close_all() closes them so workers parked on recv()/send() wake and exit + * instead of hanging the process (a non-awaited owner can finish without close). + * Lock order is always registry mutex -> channel mutex. */ +#ifdef ZTS +static MUTEX_T thread_channel_registry_mutex = NULL; +#endif +static HashTable thread_channel_registry; +static bool thread_channel_registry_inited = false; + +void async_thread_channel_registry_init(void) { - switch (reason) { - case ASYNC_THREAD_CHANNEL_NO_PRODUCERS: - return "ThreadChannel deadlock: no producers remain to send"; - case ASYNC_THREAD_CHANNEL_NO_CONSUMERS: - return "ThreadChannel deadlock: no consumers remain to receive"; - default: - return "ThreadChannel is closed"; + if (thread_channel_registry_inited) { + return; } + zend_hash_init(&thread_channel_registry, 8, NULL, NULL, 1); + ASYNC_MUTEX_INIT(thread_channel_registry_mutex); + thread_channel_registry_inited = true; +} + +static void thread_channel_registry_add(async_thread_channel_t *ch) +{ + if (!thread_channel_registry_inited) { + return; + } + ASYNC_MUTEX_LOCK(thread_channel_registry_mutex); + zend_hash_index_add_ptr(&thread_channel_registry, (zend_ulong)(uintptr_t) ch, ch); + ASYNC_MUTEX_UNLOCK(thread_channel_registry_mutex); +} + +static void thread_channel_registry_remove(async_thread_channel_t *ch) +{ + if (!thread_channel_registry_inited) { + return; + } + ASYNC_MUTEX_LOCK(thread_channel_registry_mutex); + zend_hash_index_del(&thread_channel_registry, (zend_ulong)(uintptr_t) ch); + ASYNC_MUTEX_UNLOCK(thread_channel_registry_mutex); +} + +void async_thread_channel_close_all(void) +{ + if (!thread_channel_registry_inited) { + return; + } + ASYNC_MUTEX_LOCK(thread_channel_registry_mutex); + async_thread_channel_t *ch; + ZEND_HASH_FOREACH_PTR(&thread_channel_registry, ch) { + async_thread_channel_close(ch); + } ZEND_HASH_FOREACH_END(); + ASYNC_MUTEX_UNLOCK(thread_channel_registry_mutex); } /////////////////////////////////////////////////////////////////////////////// @@ -96,13 +140,12 @@ static bool thread_channel_send(zend_async_channel_t *channel, zval *value) /* Check closed under lock */ if (UNEXPECTED(ZEND_ASYNC_EVENT_IS_CLOSED(&ch->channel.event))) { - const uint8_t reason = ch->close_reason; ASYNC_MUTEX_UNLOCK(ch->mutex); async_thread_release_transferred_zval(&persistent_copy); if (trigger != NULL) { trigger->base.dispose(&trigger->base); } - zend_throw_exception(async_ce_thread_channel_exception, thread_channel_close_message(reason), 0); + zend_throw_exception(async_ce_thread_channel_exception, "ThreadChannel is closed", 0); return false; } @@ -121,25 +164,6 @@ static bool thread_channel_send(zend_async_channel_t *channel, zval *value) if (trigger == NULL) { trigger = ZEND_ASYNC_NEW_TRIGGER_EVENT(); } - - /* Track parked senders so the dispose path can detect "no consumer left". */ - ch->parked_senders++; - - /* Sole wrapper left: no other thread can ever receive. Closes the race where - * the last peer dropped before we parked. */ - if (ch->auto_disconnect && zend_atomic_int_load(&ch->ref_count) <= 1) { - ch->parked_senders--; - ch->close_reason = ASYNC_THREAD_CHANNEL_NO_CONSUMERS; - ZEND_ASYNC_EVENT_SET_CLOSED(&ch->channel.event); - fire_all_triggers(&ch->sender_triggers); - ASYNC_MUTEX_UNLOCK(ch->mutex); - async_thread_release_transferred_zval(&persistent_copy); - trigger->base.dispose(&trigger->base); - zend_throw_exception(async_ce_thread_channel_exception, - thread_channel_close_message(ASYNC_THREAD_CHANNEL_NO_CONSUMERS), 0); - return false; - } - zend_hash_index_update_ptr(&ch->sender_triggers, (zend_ulong)(uintptr_t) trigger, trigger); ASYNC_MUTEX_UNLOCK(ch->mutex); @@ -157,7 +181,6 @@ static bool thread_channel_send(zend_async_channel_t *channel, zval *value) if (UNEXPECTED(channel_bailed)) { ASYNC_MUTEX_LOCK(ch->mutex); - ch->parked_senders--; zend_hash_index_del(&ch->sender_triggers, (zend_ulong)(uintptr_t) trigger); ASYNC_MUTEX_UNLOCK(ch->mutex); ZEND_ASYNC_WAKER_DESTROY(ZEND_ASYNC_CURRENT_COROUTINE); @@ -170,7 +193,6 @@ static bool thread_channel_send(zend_async_channel_t *channel, zval *value) /* Woke up — remove from sender queue */ ASYNC_MUTEX_LOCK(ch->mutex); - ch->parked_senders--; zend_hash_index_del(&ch->sender_triggers, (zend_ulong)(uintptr_t) trigger); ASYNC_MUTEX_UNLOCK(ch->mutex); @@ -211,14 +233,13 @@ static bool thread_channel_receive( /* Buffer empty (or wait_only) — check if closed */ if (UNEXPECTED(ZEND_ASYNC_EVENT_IS_CLOSED(&ch->channel.event))) { - const uint8_t reason = ch->close_reason; ASYNC_MUTEX_UNLOCK(ch->mutex); if (trigger != NULL) { trigger->base.dispose(&trigger->base); } /* wait_only callers expect a quiet false on close. */ if (!wait_only) { - zend_throw_exception(async_ce_thread_channel_exception, thread_channel_close_message(reason), 0); + zend_throw_exception(async_ce_thread_channel_exception, "ThreadChannel is closed", 0); } return false; } @@ -228,27 +249,6 @@ static bool thread_channel_receive( if (trigger == NULL) { trigger = ZEND_ASYNC_NEW_TRIGGER_EVENT(); } - - /* Track parked receivers so the dispose path can detect "no producer left". */ - ch->parked_receivers++; - - /* Sole wrapper left: no other thread can ever send. Closes the race where - * the last peer dropped before we parked. (ref_count counts wrappers, i.e. - * threads; > 1 means a peer endpoint still exists somewhere.) */ - if (ch->auto_disconnect && zend_atomic_int_load(&ch->ref_count) <= 1) { - ch->parked_receivers--; - ch->close_reason = ASYNC_THREAD_CHANNEL_NO_PRODUCERS; - ZEND_ASYNC_EVENT_SET_CLOSED(&ch->channel.event); - fire_all_triggers(&ch->receiver_triggers); - ASYNC_MUTEX_UNLOCK(ch->mutex); - trigger->base.dispose(&trigger->base); - if (!wait_only) { - zend_throw_exception(async_ce_thread_channel_exception, - thread_channel_close_message(ASYNC_THREAD_CHANNEL_NO_PRODUCERS), 0); - } - return false; - } - zend_hash_index_update_ptr(&ch->receiver_triggers, (zend_ulong)(uintptr_t) trigger, trigger); ASYNC_MUTEX_UNLOCK(ch->mutex); @@ -270,7 +270,6 @@ static bool thread_channel_receive( if (UNEXPECTED(channel_bailed)) { ASYNC_MUTEX_LOCK(ch->mutex); - ch->parked_receivers--; zend_hash_index_del(&ch->receiver_triggers, (zend_ulong)(uintptr_t) trigger); ASYNC_MUTEX_UNLOCK(ch->mutex); ZEND_ASYNC_WAKER_DESTROY(ZEND_ASYNC_CURRENT_COROUTINE); @@ -282,7 +281,6 @@ static bool thread_channel_receive( /* Woke up — remove from receiver queue, observe closed state */ ASYNC_MUTEX_LOCK(ch->mutex); - ch->parked_receivers--; zend_hash_index_del(&ch->receiver_triggers, (zend_ulong)(uintptr_t) trigger); const bool closed = ZEND_ASYNC_EVENT_IS_CLOSED(&ch->channel.event); ASYNC_MUTEX_UNLOCK(ch->mutex); @@ -364,11 +362,15 @@ async_thread_channel_t *async_thread_channel_create(int32_t capacity) ch->channel.close = thread_channel_close; ch->channel.event.dispose = thread_channel_event_dispose; + thread_channel_registry_add(ch); + return ch; } static void thread_channel_destroy(async_thread_channel_t *ch) { + thread_channel_registry_remove(ch); + /* Close and notify waiters before destroying */ if (!ZEND_ASYNC_EVENT_IS_CLOSED(&ch->channel.event)) { ASYNC_MUTEX_LOCK(ch->mutex); @@ -413,30 +415,8 @@ static bool thread_channel_event_dispose(zend_async_event_t *event) if (old == 1) { thread_channel_destroy(ch); - return true; } - /* An endpoint is gone. If every endpoint left is a parked waiter, no peer - * can ever make progress, so close the channel: parked recv()/send() then - * surface ThreadChannelException with the matching reason. Endpoints here - * are wrapper refs (ref_count); raw-pointer holders such as the thread pool - * keep ref_count == 1 and only ever drop it to 0 (handled above). */ - ASYNC_MUTEX_LOCK(ch->mutex); - if (ch->auto_disconnect && !ZEND_ASYNC_EVENT_IS_CLOSED(&ch->channel.event)) { - const int endpoints = old - 1; - const bool no_producers = ch->parked_receivers > 0 && endpoints <= 1; - const bool no_consumers = ch->parked_senders > 0 && endpoints <= 1; - - if (no_producers || no_consumers) { - ch->close_reason = no_producers ? ASYNC_THREAD_CHANNEL_NO_PRODUCERS - : ASYNC_THREAD_CHANNEL_NO_CONSUMERS; - ZEND_ASYNC_EVENT_SET_CLOSED(&ch->channel.event); - fire_all_triggers(&ch->receiver_triggers); - fire_all_triggers(&ch->sender_triggers); - } - } - ASYNC_MUTEX_UNLOCK(ch->mutex); - return true; } @@ -541,7 +521,6 @@ METHOD(__construct) thread_channel_object_t *obj = ASYNC_THREAD_CHANNEL_FROM_OBJ(Z_OBJ_P(ZEND_THIS)); obj->channel = async_thread_channel_create((int32_t) capacity); - obj->channel->auto_disconnect = true; } METHOD(send) @@ -632,6 +611,8 @@ METHOD(isFull) void async_register_thread_channel_ce(void) { + async_thread_channel_registry_init(); + async_ce_thread_channel_exception = register_class_Async_ThreadChannelException(async_ce_async_exception); async_ce_thread_channel = register_class_Async_ThreadChannel(async_ce_awaitable, zend_ce_countable); diff --git a/thread_channel.h b/thread_channel.h index 4a84940..3082444 100644 --- a/thread_channel.h +++ b/thread_channel.h @@ -49,30 +49,10 @@ struct _async_thread_channel_s { HashTable receiver_triggers; /* triggers from wrappers waiting to receive */ HashTable sender_triggers; /* triggers from wrappers waiting to send */ - /* Reference count for cross-thread sharing. For PHP-constructed channels it - * also counts live endpoints (one wrapper ref each), which auto_disconnect - * relies on. Raw C holders (e.g. the thread pool) keep ref_count at 1. */ + /* Reference count for cross-thread sharing */ zend_atomic_int ref_count; - - /* Coroutines currently suspended inside recv()/send() (mutex-guarded). - * Used to detect "no producer/consumer left" disconnect. */ - int32_t parked_receivers; - int32_t parked_senders; - - /* Why the channel closed; read by a woken recv/send to pick the message. */ - uint8_t close_reason; - - /* Auto-disconnect parked waiters when no peer endpoint remains. Set only for - * PHP-constructed channels, where ref_count is a reliable endpoint count. - * Off for raw pool channels, which manage close() explicitly. */ - bool auto_disconnect; }; -/* close_reason values */ -#define ASYNC_THREAD_CHANNEL_CLOSED 0 -#define ASYNC_THREAD_CHANNEL_NO_PRODUCERS 1 -#define ASYNC_THREAD_CHANNEL_NO_CONSUMERS 2 - /////////////////////////////////////////////////////////// /// PHP object wrapper (emalloc, per-thread) /////////////////////////////////////////////////////////// @@ -101,6 +81,12 @@ async_thread_channel_t *async_thread_channel_create(int32_t capacity); /* Close channel — wakes all waiters, rejects new send/recv */ void async_thread_channel_close(async_thread_channel_t *ch); +/* Init the process-wide channel registry (MINIT). */ +void async_thread_channel_registry_init(void); + +/* Close every live channel — called at shutdown so parked workers wake and exit. */ +void async_thread_channel_close_all(void); + /* Addref shared channel */ void async_thread_channel_addref(async_thread_channel_t *ch); From 84011c4045d008a3761e3118119ec3310b6b880a Mon Sep 17 00:00:00 2001 From: Edmond <1571649+EdmondDantes@users.noreply.github.com> Date: Thu, 25 Jun 2026 17:34:36 +0000 Subject: [PATCH 5/6] #162: make ThreadChannel disconnect tests robust (assert no-hang, silent worker) Worker output emitted during shutdown disconnect double-flushed on the CI multi-core runner (a thread-stdout artifact, not a logic double-disconnect; 40x local runs were clean). The tests now keep the worker silent on disconnect and assert only the owner output: a hang fails via timeout, a spurious value fails via extra output. Claude-Session: https://claude.ai/code/session_01NMdPEuD85qzTjv2N4ihTQU --- .../042-recv_disconnect_on_owner_finish.phpt | 23 ++++++++----------- ...43-recv_disconnect_producer_coroutine.phpt | 22 ++++++------------ .../044-recv_disconnect_fanout.phpt | 19 +++++++-------- 3 files changed, 24 insertions(+), 40 deletions(-) diff --git a/tests/thread_channel/042-recv_disconnect_on_owner_finish.phpt b/tests/thread_channel/042-recv_disconnect_on_owner_finish.phpt index a9b9b60..f090bdf 100644 --- a/tests/thread_channel/042-recv_disconnect_on_owner_finish.phpt +++ b/tests/thread_channel/042-recv_disconnect_on_owner_finish.phpt @@ -1,5 +1,5 @@ --TEST-- -ThreadChannel: recv() disconnects instead of hanging when the owner finishes without close() (#162) +ThreadChannel: recv() does not hang shutdown when the owner finishes without close() (#162) --SKIPIF-- recv(); - echo "worker: got a value\n"; - } catch (ThreadChannelException $e) { - echo "worker: " . $e->getMessage() . "\n"; - } + try { $ch->recv(); echo "worker: unexpected value\n"; } + catch (\Throwable $e) {} }); -echo "main: end (nothing sent, channel not closed)\n"; +echo "main: end\n"; ?> --EXPECT-- -main: end (nothing sent, channel not closed) -worker: ThreadChannel is closed +main: end diff --git a/tests/thread_channel/043-recv_disconnect_producer_coroutine.phpt b/tests/thread_channel/043-recv_disconnect_producer_coroutine.phpt index 5b08853..2ffc8d7 100644 --- a/tests/thread_channel/043-recv_disconnect_producer_coroutine.phpt +++ b/tests/thread_channel/043-recv_disconnect_producer_coroutine.phpt @@ -1,5 +1,5 @@ --TEST-- -ThreadChannel: recv() disconnects when the producer coroutine finishes without close() (#162) +ThreadChannel: recv() does not hang when the producer coroutine finishes without close() (#162) --SKIPIF-- recv(); - echo "worker: got a value\n"; - } catch (ThreadChannelException $e) { - echo "worker: " . $e->getMessage() . "\n"; - } + try { $ch->recv(); echo "worker: unexpected value\n"; } + catch (\Throwable $e) {} }); - echo "producer: done (no send, no close)\n"; + echo "producer: done\n"; }); echo "main: done\n"; ?> --EXPECT-- main: done -producer: done (no send, no close) -worker: ThreadChannel is closed +producer: done diff --git a/tests/thread_channel/044-recv_disconnect_fanout.phpt b/tests/thread_channel/044-recv_disconnect_fanout.phpt index 6d94ee9..a6da854 100644 --- a/tests/thread_channel/044-recv_disconnect_fanout.phpt +++ b/tests/thread_channel/044-recv_disconnect_fanout.phpt @@ -1,5 +1,5 @@ --TEST-- -ThreadChannel: multiple workers parked on recv() all disconnect when the owner finishes (#162 fan-out) +ThreadChannel: many workers parked on recv() do not hang when the owner finishes (#162 fan-out) --SKIPIF-- recv(); echo "w$i: got value\n"; } - catch (ThreadChannelException $e) { echo "w$i: disconnected\n"; } +for ($i = 0; $i < 3; $i++) { + spawn_thread(function() use ($ch) { + try { $ch->recv(); echo "worker: unexpected value\n"; } + catch (\Throwable $e) {} }); } echo "main: end\n"; ?> ---EXPECTF-- +--EXPECT-- main: end -w%d: disconnected -w%d: disconnected From 56f1733a4b3018ad988d49e4b441a42084d062f3 Mon Sep 17 00:00:00 2001 From: Edmond <1571649+EdmondDantes@users.noreply.github.com> Date: Thu, 25 Jun 2026 18:27:31 +0000 Subject: [PATCH 6/6] #162: add chaos test proving ThreadChannel orphan-recv disconnect at shutdown thread_channel/orphan_recv.feature: a coroutine spawns N workers parked on recv() of a fresh ThreadChannel and finishes without close()/await(). The shutdown registry close_all() must disconnect every worker. New step in _harness/Steps.php. 5 .phpt x random:1..30 green on ASAN+async-fuzz (150 interleavings, no hang/leak/UAF), including fan-out. Claude-Session: https://claude.ai/code/session_01NMdPEuD85qzTjv2N4ihTQU --- fuzzy-tests/CHANGELOG.md | 1 + fuzzy-tests/_harness/Steps.php | 27 +++++++++++++++++ .../thread_channel/orphan_recv.feature | 29 +++++++++++++++++++ 3 files changed, 57 insertions(+) create mode 100644 fuzzy-tests/thread_channel/orphan_recv.feature diff --git a/fuzzy-tests/CHANGELOG.md b/fuzzy-tests/CHANGELOG.md index 0d970f2..e1bdd13 100644 --- a/fuzzy-tests/CHANGELOG.md +++ b/fuzzy-tests/CHANGELOG.md @@ -7,6 +7,7 @@ are not mixed with test-suite history. ## Unreleased ### Added +- **#162 — ThreadChannel orphan recv disconnect at shutdown** — `thread_channel/orphan_recv.feature` (1 outline → 5 .phpt). A coroutine spawns N workers parked on `recv()` of a fresh `ThreadChannel`, then finishes without `close()`/`await()`; the channel's only reference drops with the owner. The process-wide registry `close_all()` at shutdown must wake every parked worker so the process exits instead of hanging. New step `coroutine "X" spawns N orphan workers parked on recv of a fresh thread channel` (`tc_orphan_{spawn_attempts,spawned,spawn_failed}`). Invariants: every worker spawned, no orphan coroutines on the owner side, clean exit (a hang fails via timeout, a use-after-free in the shutdown disconnect fails under ASAN). 5 .phpt × `random:1..30` green on ASAN+async-fuzz — 150 interleavings, no findings — proving the transparent-thread-event + registry-close fix, including fan-out (N>1). - **Coverage gap — timer / delay / timeout chaos** — `timer/timer_chaos.feature` (7 scenarios → 10 .phpt). First chaos coverage of `Async\delay()` and `Async\timeout()` outside the deterministic `tests/sleep/*` and `tests/common/timeout_*`. New steps `coroutine "X" runs a cancellable delay of N ms` (`delay_{attempts,ok,cancelled}`), `coroutine "X" runs work of N ms guarded by timeout M ms` (`await(job, timeout(M))` → `tmo_{attempts,work_ok,timed_out,cancelled}`; OperationCanceledException = timed out, AsyncCancellation = externally cancelled), and `coroutine "X" disposes a safe scope with a child delaying N ms` (#132 zombie-timer probe). Scenarios: cancel-mid-delay + timing outline (0/5/50/150 ms), many concurrent timers draining the heap, timeout-fires, timeout-loses (the losing timeout must release its libuv watcher — the #082 leak class, asserted by running under ASAN `detect_leaks=1`), awaiter cancelled while both work and timeout are pending, and the #132 zombie-delay marker. 10 .phpt × fifo + random:1,7,42,1337 green on ASAN+UBSan+ZTS+async-fuzz with leak detection — no findings (delay/timeout watcher cleanup is solid). - **Coverage gap — Scope::allowZombies()** — `scope/extras.feature` (+1 scenario). The only API entry the coverage report flagged uncovered (167/168) once the method shipped (it opts a scope back into safe disposal — the inverse of `asNotSafely()`). New step `coroutine "X" disposes a fresh scope with allowZombies and a parked child`: a started child parks in a short `delay()`, the scope is disposed, and on a safe scope the child is NOT cancelled — it becomes a zombie, is dropped from the active count, and finishes on its own. Invariants: `allowZombies()` returns the same scope (identity), `zombie_child_finished == 1` (not cancelled), child settles exactly once, no orphan coroutines. Green across fifo + random:1,7,42,1337 on ASAN+UBSan+ZTS+async-fuzz. COVERAGE.md back to 168/168 (100%). - **Reinstated cancel scenarios after #144/#145/#146 fixes landed** — the three chaos features that originally *surfaced* these bugs had their cancel scenarios commented under `# Blocked: #NNN`. All three fixes are now in `true-async-stable`, so the scenarios are reinstated as regression backstops and pass under ASAN-ZTS (fifo). (1) `io/flock_chaos.feature` — cancel-mid-flock waiter + cancel-timing outline (5/50/150 ms): a parked `LOCK_EX` waiter is cancelled while the libuv worker still owns the task data; backstops the #146 inline-tail-on-task + pin-across-SUSPEND fix. (2) `curl/curl_multi_chaos.feature` — cancel-mid-`curl_multi_select` + timing outline (5/75/200 ms): backstops the #145 route-cancel-through-`finally` fix (no more heap corruption). (3) `exec/proc_open_chaos.feature` — proc_close-vs-parked-reader + close-timing outline (0/5/25/60 ms) + SIGTERM-vs-reader + cancel-reader-then-close: backstops the #144 notify-parked-req-on-close + pin-stdio-lifetime fix. 16 reinstated .phpt green under ASAN-ZTS; the random-seed matrix runs in the existing IO/curl nightly once an `--enable-async-fuzz` build is used. FINDINGS.md entries for all three moved to **fixed**; IO_PLAN.md coverage table updated; README.md layout refreshed (the stale `(TODO)` topics are all implemented). diff --git a/fuzzy-tests/_harness/Steps.php b/fuzzy-tests/_harness/Steps.php index da6925a..7b326a1 100644 --- a/fuzzy-tests/_harness/Steps.php +++ b/fuzzy-tests/_harness/Steps.php @@ -4286,6 +4286,33 @@ function(Context $ctx, string $coro, string $tc) { }) ->requires('zts'); + // When coroutine "X" spawns N orphan workers parked on recv of a fresh thread channel + // Models #162: workers park on recv(); the owner finishes WITHOUT close() + // or await(). The channel is local (not tracked by the harness, so it is + // not closed at scenario end); the process-wide close_all() at shutdown + // must disconnect every worker, otherwise the process hangs. + $r->on('/^coroutine "([^"]+)" spawns (\S+) orphan workers parked on recv of a fresh thread channel$/', + function(Context $ctx, string $coro, string $nExpr) { + $n = (int)$ctx->resolver->resolve($nExpr); + $ctx->planAction($coro, function(Context $ctx) use ($n) { + $ch = new \Async\ThreadChannel(); + for ($i = 0; $i < $n; $i++) { + $ctx->inc("tc_orphan_spawn_attempts"); + try { + \Async\spawn_thread(self::unscoped(static function() use ($ch): void { + try { $ch->recv(); } + catch (\Throwable $e) { /* disconnected at shutdown */ } + })); + $ctx->inc("tc_orphan_spawned"); + } catch (\Throwable $e) { + $ctx->inc("tc_orphan_spawn_failed"); + } + } + // owner returns here: $ch drops out of scope, never closed/awaited + }); + }) + ->requires('zts'); + // ---- TaskGroup actions ---- // When coroutine "X" spawns N tasks into "G" that print "msg" diff --git a/fuzzy-tests/thread_channel/orphan_recv.feature b/fuzzy-tests/thread_channel/orphan_recv.feature new file mode 100644 index 0000000..fd2668d --- /dev/null +++ b/fuzzy-tests/thread_channel/orphan_recv.feature @@ -0,0 +1,29 @@ +Feature: ThreadChannel orphan recv workers disconnect at shutdown (#162) + + A worker started with Async\spawn_thread() and parked on ThreadChannel::recv() + must not keep the process alive when the owning side finishes without close(). + The owner here is a coroutine that spawns the workers, never sends, never + closes, never awaits — its only channel reference drops when it returns. The + process-wide registry close_all() at shutdown must wake every parked worker so + the process exits cleanly. + + Invariants for every interleaving (and every worker count): + every worker was spawned (no spawn failure) + no orphan coroutines on the owner's side + the process exits cleanly (a hang fails the run via timeout; a use-after- + free in the shutdown disconnect fails it under ASAN) + + Scenario Outline: N workers park on recv, the owner finishes without close + Given a coroutine "M" + When coroutine "M" spawns orphan workers parked on recv of a fresh thread channel + Then counter "tc_orphan_spawned" equals + And counter "tc_orphan_spawn_failed" equals 0 + And no orphan coroutines + + Examples: + | n | + | 1 | + | 2 | + | 4 | + | 8 | + | 16 |