From 3f1c2b24ed87ad3b2cb82ad9f7187f5bf3e9f514 Mon Sep 17 00:00:00 2001 From: Edmond <1571649+EdmondDantes@users.noreply.github.com> Date: Thu, 25 Jun 2026 19:18:37 +0000 Subject: [PATCH] #162: make the ThreadChannel shutdown registry thread-local (no global lock) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous commit used a process-wide registry that the main thread iterated at quiesce to close every channel — one thread reaching into all threads' channels, behind a global mutex. Replace it with a per-thread registry in ASYNC_G: a channel created on a thread is registered there (holding a ref), and each thread closes the channels it created at its own engine_shutdown (before REACTOR_SHUTDOWN waits for child threads). No cross-thread access, no lock; the registry ref keeps a channel alive until its owner closes it. Re-proved: orphan_recv chaos sweep 150/150 (5 worker counts x random:1..30) on ASAN+async-fuzz, no hang/leak/UAF. Claude-Session: https://claude.ai/code/session_01NMdPEuD85qzTjv2N4ihTQU --- CHANGELOG.md | 2 +- async.c | 1 + async_API.c | 6 +++++ libuv_reactor.c | 5 ---- php_async.h | 5 ++++ thread_channel.c | 61 +++++++++++------------------------------------- thread_channel.h | 8 +++---- 7 files changed, 29 insertions(+), 59 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3eaf430..4377448 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,7 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Fixed -- **#162 `ThreadChannel`/`spawn_thread`: a worker parked on `recv()`/`send()` hung process shutdown when the owning side finished without `close()`.** A non-awaited worker thread kept the parent scheduler alive by itself, so the parent never reached shutdown, and the worker blocked forever (no output, no diagnostic). Two changes: (1) the thread completion event is now **transparent** — it only keeps the parent loop alive while a coroutine actually `await`s the thread — so a non-awaited worker no longer pins the parent, which finishes and proceeds to shutdown; (2) every live `ThreadChannel` is now tracked in a process-wide registry and **closed at shutdown**, so any worker still parked on `recv()`/`send()` wakes with `Async\ThreadChannelException` and exits. Awaited threads still block the parent as before; works for fan-out (any number of workers). +- **#162 `ThreadChannel`/`spawn_thread`: a worker parked on `recv()`/`send()` hung process shutdown when the owning side finished without `close()`.** A non-awaited worker thread kept the parent scheduler alive by itself, so the parent never reached shutdown, and the worker blocked forever (no output, no diagnostic). Two changes: (1) the thread completion event is now **transparent** — it only keeps the parent loop alive while a coroutine actually `await`s the thread — so a non-awaited worker no longer pins the parent, which finishes and proceeds to shutdown; (2) each thread now tracks the `ThreadChannel`s it created (thread-local, in `ASYNC_G`) and **closes them at its own shutdown**, so any worker still parked on `recv()`/`send()` wakes with `Async\ThreadChannelException` and exits. Awaited threads still block the parent as before; works for fan-out (any number of workers). ### Changed - **phpredis async pool: `connect()`/`pconnect()` is now rejected in pool mode — configure `host`/`port` in the constructor options.** A pooled `Redis` object is a template, not a single live connection, so `connect()` has no meaning there: the pool's own connections are created elsewhere and silently ignored the `connect()` target, which dialed `127.0.0.1` and failed wherever Redis lives on a service hostname (e.g. `failed to acquire connection` inside Docker). Calling `connect()` on a pooled instance now throws `Redis::connect() is not supported in pool mode; set 'host' and 'port' in the constructor options`. Use `new Redis(['host' => 'redis', 'port' => 6379, 'pool' => [...]])`. This is consistent for both the checkout pool (`mux = 0`) and eager multiplex lanes (`mux > 0`, opened in the constructor). Fixed in phpredis `redis.c`/`redis_pool.c`; test `tests/async/008-pool_connect_host.phpt`. diff --git a/async.c b/async.c index bb029bd..c99b054 100644 --- a/async.c +++ b/async.c @@ -1800,6 +1800,7 @@ PHP_RINIT_FUNCTION(async) /* {{{ */ circular_buffer_ctor(&ASYNC_G(resumed_coroutines), 64, sizeof(zend_coroutine_t *), &zend_std_allocator); zend_hash_init(&ASYNC_G(coroutines), 128, NULL, NULL, 0); zend_hash_init(&ASYNC_G(deadlock_channels), 8, NULL, NULL, 0); + zend_hash_init(&ASYNC_G(thread_channels), 8, NULL, NULL, 0); ASYNC_G(reactor_started) = false; diff --git a/async_API.c b/async_API.c index b456279..5b34313 100644 --- a/async_API.c +++ b/async_API.c @@ -18,6 +18,7 @@ #include "context.h" #include "coroutine.h" #include "exceptions.h" +#include "thread_channel.h" #include "future.h" #include "iterator.h" #include "php_async.h" @@ -207,6 +208,10 @@ zend_coroutine_t *spawn(zend_async_scope_t *scope, zend_object *scope_provider, static bool engine_shutdown(void) { + /* Close channels created on this thread so a worker parked on one whose + * owner finished without close() wakes — before REACTOR_SHUTDOWN waits for it. */ + async_thread_channel_close_owned(); + ZEND_ASYNC_REACTOR_SHUTDOWN(); circular_buffer_dtor(&ASYNC_G(microtasks)); @@ -214,6 +219,7 @@ static bool engine_shutdown(void) circular_buffer_dtor(&ASYNC_G(resumed_coroutines)); zend_hash_destroy(&ASYNC_G(coroutines)); zend_hash_destroy(&ASYNC_G(deadlock_channels)); + zend_hash_destroy(&ASYNC_G(thread_channels)); if (ASYNC_G(root_context) != NULL) { async_context_t *root_context = (async_context_t *) ASYNC_G(root_context); diff --git a/libuv_reactor.c b/libuv_reactor.c index a719806..2410a58 100644 --- a/libuv_reactor.c +++ b/libuv_reactor.c @@ -21,7 +21,6 @@ #include "exceptions.h" #include "php_async.h" -#include "thread_channel.h" #include "php_main.h" #include "thread.h" #include "thread_pool.h" @@ -167,10 +166,6 @@ static void libuv_reactor_quiesce(void) return; } - /* Wake any worker parked on a thread channel so it can exit; otherwise the - * wait below would hang on a non-awaited worker whose owner finished. */ - async_thread_channel_close_all(); - uv_mutex_lock(&child_thread_registry_mutex); while (zend_hash_num_elements(&child_thread_registry) > 0) { uv_cond_wait(&child_thread_registry_cond, &child_thread_registry_mutex); diff --git a/php_async.h b/php_async.h index 6be04c6..ebeb5bc 100644 --- a/php_async.h +++ b/php_async.h @@ -132,6 +132,11 @@ bool debug_deadlock; * reason "deadlock_resolved" rather than as a generic Deadlock error. */ HashTable deadlock_channels; +/* Thread channels created on THIS thread (holds a ref each). Closed at this + * thread's shutdown so workers parked on recv()/send() of a channel whose owner + * finished without close() wake and exit. Thread-local — no cross-thread lock. */ +HashTable thread_channels; + #ifdef PHP_WIN32 #endif diff --git a/thread_channel.c b/thread_channel.c index 9892c8a..3ba72be 100644 --- a/thread_channel.c +++ b/thread_channel.c @@ -63,60 +63,29 @@ static void fire_all_triggers(HashTable *triggers) } /////////////////////////////////////////////////////////////////////////////// -// Process-wide registry of live channels +// Per-thread registry of channels created on this thread /////////////////////////////////////////////////////////////////////////////// -/* Every live shared channel is registered here. At shutdown async_thread_channel - * _close_all() closes them so workers parked on recv()/send() wake and exit - * instead of hanging the process (a non-awaited owner can finish without close). - * Lock order is always registry mutex -> channel mutex. */ -#ifdef ZTS -static MUTEX_T thread_channel_registry_mutex = NULL; -#endif -static HashTable thread_channel_registry; -static bool thread_channel_registry_inited = false; - -void async_thread_channel_registry_init(void) -{ - if (thread_channel_registry_inited) { - return; - } - zend_hash_init(&thread_channel_registry, 8, NULL, NULL, 1); - ASYNC_MUTEX_INIT(thread_channel_registry_mutex); - thread_channel_registry_inited = true; -} - +/* Channels created on the current thread, holding a ref each. Closed at this + * thread's shutdown (async_thread_channel_close_owned) so a worker parked on a + * channel whose owner finished without close() wakes and exits. Thread-local: + * only the owning thread touches its own ASYNC_G(thread_channels) — no lock. */ static void thread_channel_registry_add(async_thread_channel_t *ch) { - if (!thread_channel_registry_inited) { - return; - } - ASYNC_MUTEX_LOCK(thread_channel_registry_mutex); - zend_hash_index_add_ptr(&thread_channel_registry, (zend_ulong)(uintptr_t) ch, ch); - ASYNC_MUTEX_UNLOCK(thread_channel_registry_mutex); -} - -static void thread_channel_registry_remove(async_thread_channel_t *ch) -{ - if (!thread_channel_registry_inited) { - return; - } - ASYNC_MUTEX_LOCK(thread_channel_registry_mutex); - zend_hash_index_del(&thread_channel_registry, (zend_ulong)(uintptr_t) ch); - ASYNC_MUTEX_UNLOCK(thread_channel_registry_mutex); + async_thread_channel_addref(ch); + zend_hash_index_add_ptr(&ASYNC_G(thread_channels), (zend_ulong)(uintptr_t) ch, ch); } -void async_thread_channel_close_all(void) +void async_thread_channel_close_owned(void) { - if (!thread_channel_registry_inited) { - return; - } - ASYNC_MUTEX_LOCK(thread_channel_registry_mutex); async_thread_channel_t *ch; - ZEND_HASH_FOREACH_PTR(&thread_channel_registry, ch) { + ZEND_HASH_FOREACH_PTR(&ASYNC_G(thread_channels), ch) { async_thread_channel_close(ch); + /* Release the registry's ref. The channel survives while a wrapper or a + * worker still holds it; the woken worker drops the last ref and frees. */ + ch->channel.event.dispose(&ch->channel.event); } ZEND_HASH_FOREACH_END(); - ASYNC_MUTEX_UNLOCK(thread_channel_registry_mutex); + zend_hash_clean(&ASYNC_G(thread_channels)); } /////////////////////////////////////////////////////////////////////////////// @@ -369,8 +338,6 @@ async_thread_channel_t *async_thread_channel_create(int32_t capacity) static void thread_channel_destroy(async_thread_channel_t *ch) { - thread_channel_registry_remove(ch); - /* Close and notify waiters before destroying */ if (!ZEND_ASYNC_EVENT_IS_CLOSED(&ch->channel.event)) { ASYNC_MUTEX_LOCK(ch->mutex); @@ -611,8 +578,6 @@ METHOD(isFull) void async_register_thread_channel_ce(void) { - async_thread_channel_registry_init(); - async_ce_thread_channel_exception = register_class_Async_ThreadChannelException(async_ce_async_exception); async_ce_thread_channel = register_class_Async_ThreadChannel(async_ce_awaitable, zend_ce_countable); diff --git a/thread_channel.h b/thread_channel.h index 3082444..b5a2047 100644 --- a/thread_channel.h +++ b/thread_channel.h @@ -81,11 +81,9 @@ async_thread_channel_t *async_thread_channel_create(int32_t capacity); /* Close channel — wakes all waiters, rejects new send/recv */ void async_thread_channel_close(async_thread_channel_t *ch); -/* Init the process-wide channel registry (MINIT). */ -void async_thread_channel_registry_init(void); - -/* Close every live channel — called at shutdown so parked workers wake and exit. */ -void async_thread_channel_close_all(void); +/* Close channels created on this thread (releasing the registry ref each). + * Called at this thread's shutdown so parked workers wake and exit. */ +void async_thread_channel_close_owned(void); /* Addref shared channel */ void async_thread_channel_addref(async_thread_channel_t *ch);