diff --git a/CHANGELOG.md b/CHANGELOG.md index 8c030a6..3eaf430 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed +- **#162 `ThreadChannel`/`spawn_thread`: a worker parked on `recv()`/`send()` hung process shutdown when the owning side finished without `close()`.** A non-awaited worker thread kept the parent scheduler alive on its own, so the parent never reached shutdown and the worker blocked forever (no output, no diagnostic). Two changes: (1) the thread completion event is now **transparent**: it keeps the parent loop alive only while a coroutine actually `await`s the thread, so a non-awaited worker no longer pins the parent, which finishes and proceeds to shutdown; (2) every live `ThreadChannel` is now tracked in a process-wide registry and **closed at shutdown**, so any worker still parked on `recv()`/`send()` wakes with `Async\ThreadChannelException` and exits. Awaited threads still block the parent as before, and the fix covers fan-out (any number of workers). + +### Changed +- **phpredis async pool: `connect()`/`pconnect()` are now rejected in pool mode; configure `host`/`port` in the constructor options.** A pooled `Redis` object is a template, not a single live connection, so `connect()` has no meaning there: the pool creates its own connections elsewhere, and those silently ignored the `connect()` target, dialed `127.0.0.1`, and failed wherever Redis lives on a service hostname (e.g. `failed to acquire connection` inside Docker). Calling `connect()` on a pooled instance now throws `Redis::connect() is not supported in pool mode; set 'host' and 'port' in the constructor options`. Use `new Redis(['host' => 'redis', 'port' => 6379, 'pool' => [...]])`. This applies to both the checkout pool (`mux = 0`) and eager multiplex lanes (`mux > 0`, opened in the constructor). 
Fixed in phpredis `redis.c`/`redis_pool.c`; test `tests/async/008-pool_connect_host.phpt`. + ## [0.7.3] - 2026-06-25 ### Fixed diff --git a/fuzzy-tests/CHANGELOG.md b/fuzzy-tests/CHANGELOG.md index 0d970f2..e1bdd13 100644 --- a/fuzzy-tests/CHANGELOG.md +++ b/fuzzy-tests/CHANGELOG.md @@ -7,6 +7,7 @@ are not mixed with test-suite history. ## Unreleased ### Added +- **#162 — ThreadChannel orphan recv disconnect at shutdown** — `thread_channel/orphan_recv.feature` (1 outline → 5 .phpt). A coroutine spawns N workers parked on `recv()` of a fresh `ThreadChannel`, then finishes without `close()`/`await()`; the channel's only reference drops with the owner. The process-wide registry `close_all()` at shutdown must wake every parked worker so the process exits instead of hanging. New step `coroutine "X" spawns N orphan workers parked on recv of a fresh thread channel` (`tc_orphan_{spawn_attempts,spawned,spawn_failed}`). Invariants: every worker spawned, no orphan coroutines on the owner side, clean exit (a hang fails via timeout, a use-after-free in the shutdown disconnect fails under ASAN). 5 .phpt × `random:1..30` green on ASAN+async-fuzz — 150 interleavings, no findings — proving the transparent-thread-event + registry-close fix, including fan-out (N>1). - **Coverage gap — timer / delay / timeout chaos** — `timer/timer_chaos.feature` (7 scenarios → 10 .phpt). First chaos coverage of `Async\delay()` and `Async\timeout()` outside the deterministic `tests/sleep/*` and `tests/common/timeout_*`. New steps `coroutine "X" runs a cancellable delay of N ms` (`delay_{attempts,ok,cancelled}`), `coroutine "X" runs work of N ms guarded by timeout M ms` (`await(job, timeout(M))` → `tmo_{attempts,work_ok,timed_out,cancelled}`; OperationCanceledException = timed out, AsyncCancellation = externally cancelled), and `coroutine "X" disposes a safe scope with a child delaying N ms` (#132 zombie-timer probe). 
Scenarios: cancel-mid-delay + timing outline (0/5/50/150 ms), many concurrent timers draining the heap, timeout-fires, timeout-loses (the losing timeout must release its libuv watcher — the #082 leak class, asserted by running under ASAN `detect_leaks=1`), awaiter cancelled while both work and timeout are pending, and the #132 zombie-delay marker. 10 .phpt × fifo + random:1,7,42,1337 green on ASAN+UBSan+ZTS+async-fuzz with leak detection — no findings (delay/timeout watcher cleanup is solid). - **Coverage gap — Scope::allowZombies()** — `scope/extras.feature` (+1 scenario). The only API entry the coverage report flagged uncovered (167/168) once the method shipped (it opts a scope back into safe disposal — the inverse of `asNotSafely()`). New step `coroutine "X" disposes a fresh scope with allowZombies and a parked child`: a started child parks in a short `delay()`, the scope is disposed, and on a safe scope the child is NOT cancelled — it becomes a zombie, is dropped from the active count, and finishes on its own. Invariants: `allowZombies()` returns the same scope (identity), `zombie_child_finished == 1` (not cancelled), child settles exactly once, no orphan coroutines. Green across fifo + random:1,7,42,1337 on ASAN+UBSan+ZTS+async-fuzz. COVERAGE.md back to 168/168 (100%). - **Reinstated cancel scenarios after #144/#145/#146 fixes landed** — the three chaos features that originally *surfaced* these bugs had their cancel scenarios commented under `# Blocked: #NNN`. All three fixes are now in `true-async-stable`, so the scenarios are reinstated as regression backstops and pass under ASAN-ZTS (fifo). (1) `io/flock_chaos.feature` — cancel-mid-flock waiter + cancel-timing outline (5/50/150 ms): a parked `LOCK_EX` waiter is cancelled while the libuv worker still owns the task data; backstops the #146 inline-tail-on-task + pin-across-SUSPEND fix. 
(2) `curl/curl_multi_chaos.feature` — cancel-mid-`curl_multi_select` + timing outline (5/75/200 ms): backstops the #145 route-cancel-through-`finally` fix (no more heap corruption). (3) `exec/proc_open_chaos.feature` — proc_close-vs-parked-reader + close-timing outline (0/5/25/60 ms) + SIGTERM-vs-reader + cancel-reader-then-close: backstops the #144 notify-parked-req-on-close + pin-stdio-lifetime fix. 16 reinstated .phpt green under ASAN-ZTS; the random-seed matrix runs in the existing IO/curl nightly once an `--enable-async-fuzz` build is used. FINDINGS.md entries for all three moved to **fixed**; IO_PLAN.md coverage table updated; README.md layout refreshed (the stale `(TODO)` topics are all implemented). diff --git a/fuzzy-tests/_harness/Steps.php b/fuzzy-tests/_harness/Steps.php index da6925a..7b326a1 100644 --- a/fuzzy-tests/_harness/Steps.php +++ b/fuzzy-tests/_harness/Steps.php @@ -4286,6 +4286,33 @@ function(Context $ctx, string $coro, string $tc) { }) ->requires('zts'); + // When coroutine "X" spawns N orphan workers parked on recv of a fresh thread channel + // Models #162: workers park on recv(); the owner finishes WITHOUT close() + // or await(). The channel is local (not tracked by the harness, so it is + // not closed at scenario end); the process-wide close_all() at shutdown + // must disconnect every worker, otherwise the process hangs. 
+ $r->on('/^coroutine "([^"]+)" spawns (\S+) orphan workers parked on recv of a fresh thread channel$/', + function(Context $ctx, string $coro, string $nExpr) { + $n = (int)$ctx->resolver->resolve($nExpr); + $ctx->planAction($coro, function(Context $ctx) use ($n) { + $ch = new \Async\ThreadChannel(); + for ($i = 0; $i < $n; $i++) { + $ctx->inc("tc_orphan_spawn_attempts"); + try { + \Async\spawn_thread(self::unscoped(static function() use ($ch): void { + try { $ch->recv(); } + catch (\Throwable $e) { /* disconnected at shutdown */ } + })); + $ctx->inc("tc_orphan_spawned"); + } catch (\Throwable $e) { + $ctx->inc("tc_orphan_spawn_failed"); + } + } + // owner returns here: $ch drops out of scope, never closed/awaited + }); + }) + ->requires('zts'); + // ---- TaskGroup actions ---- // When coroutine "X" spawns N tasks into "G" that print "msg" diff --git a/fuzzy-tests/thread_channel/orphan_recv.feature b/fuzzy-tests/thread_channel/orphan_recv.feature new file mode 100644 index 0000000..fd2668d --- /dev/null +++ b/fuzzy-tests/thread_channel/orphan_recv.feature @@ -0,0 +1,29 @@ +Feature: ThreadChannel orphan recv workers disconnect at shutdown (#162) + + A worker started with Async\spawn_thread() and parked on ThreadChannel::recv() + must not keep the process alive when the owning side finishes without close(). + The owner here is a coroutine that spawns the workers, never sends, never + closes, never awaits — its only channel reference drops when it returns. The + process-wide registry close_all() at shutdown must wake every parked worker so + the process exits cleanly. 
+ + Invariants for every interleaving (and every worker count): + every worker was spawned (no spawn failure) + no orphan coroutines on the owner's side + the process exits cleanly (a hang fails the run via timeout; a use-after- + free in the shutdown disconnect fails it under ASAN) + + Scenario Outline: N workers park on recv, the owner finishes without close + Given a coroutine "M" + When coroutine "M" spawns <n> orphan workers parked on recv of a fresh thread channel + Then counter "tc_orphan_spawned" equals <n> + And counter "tc_orphan_spawn_failed" equals 0 + And no orphan coroutines + + Examples: + | n | + | 1 | + | 2 | + | 4 | + | 8 | + | 16 | diff --git a/libuv_reactor.c b/libuv_reactor.c index 05a3b61..a719806 100644 --- a/libuv_reactor.c +++ b/libuv_reactor.c @@ -21,6 +21,7 @@ #include "exceptions.h" #include "php_async.h" +#include "thread_channel.h" #include "php_main.h" #include "thread.h" #include "thread_pool.h" @@ -166,6 +167,10 @@ static void libuv_reactor_quiesce(void) return; } + /* Wake any worker parked on a thread channel so it can exit; otherwise the + * wait below would hang on a non-awaited worker whose owner finished. */ + async_thread_channel_close_all(); + uv_mutex_lock(&child_thread_registry_mutex); while (zend_hash_num_elements(&child_thread_registry) > 0) { uv_cond_wait(&child_thread_registry_cond, &child_thread_registry_mutex); @@ -2161,8 +2166,11 @@ static void libuv_thread_notify_cb(uv_async_t *handle) } } + /* Set CLOSED AFTER stop(): the stop prologue short-circuits on a closed + * event and would skip the disarm (unref + uncount). 
*/ ZEND_ASYNC_CALLBACKS_NOTIFY(&thread->event.base, &thread->event.result, thread->event.exception); thread->event.base.stop(&thread->event.base); + ZEND_ASYNC_EVENT_SET_CLOSED(&thread->event.base); if (ZEND_ASYNC_EVENT_IS_EXCEPTION_HANDLED(&thread->event.base)) { ZEND_THREAD_SET_EXCEPTION_CONSUMED(&thread->event); @@ -2189,37 +2197,46 @@ static bool libuv_thread_event_start(zend_async_event_t *event) async_thread_event_t *thread = (async_thread_event_t *) event; - /* Add ref on context for the thread runner */ - if (thread->event.context) { - zend_atomic_ptr_store(&thread->event.context->event, &thread->event); - ZEND_ASYNC_THREAD_CONTEXT_ADDREF(thread->event.context); - } - - /* Initialise registry and assign+register the context's key BEFORE - * uv_thread_create. Doing it after creates a race: a fast-exiting - * runner reads context->key == 0, skips self-removal, then we add - * the key and the entry leaks — quiesce hangs forever. */ - libuv_thread_registry_init(); + /* First start() is the spawn call: create the OS thread and stay transparent + * (no ref/count). loop_ref_count stays 0 so the first awaiter's start() arms. */ + if ((event->flags & ASYNC_THREAD_F_LAUNCHED) == 0) { - if (thread->event.context) { - thread->event.context->key = - (zend_async_thread_handle_t) async_ptr_to_index(thread->event.context); - libuv_thread_registry_add(thread->event.context->key); - } + if (thread->event.context) { + zend_atomic_ptr_store(&thread->event.context->event, &thread->event); + ZEND_ASYNC_THREAD_CONTEXT_ADDREF(thread->event.context); + } - const int ret = uv_thread_create(&thread->uv_handle, zend_async_thread_run_fn, thread->event.context); + /* Assign+register the key BEFORE uv_thread_create: a fast-exiting runner + * that reads key == 0 skips self-removal and the registry entry leaks. 
*/ + libuv_thread_registry_init(); - if (UNEXPECTED(ret != 0)) { if (thread->event.context) { - libuv_thread_registry_remove(thread->event.context->key); - thread->event.context->key = 0; - zend_atomic_int_dec(&thread->event.context->ref_count); + thread->event.context->key = + (zend_async_thread_handle_t) async_ptr_to_index(thread->event.context); + libuv_thread_registry_add(thread->event.context->key); } - async_throw_error("Failed to create thread: %s", uv_strerror(ret)); - return false; + const int ret = uv_thread_create(&thread->uv_handle, zend_async_thread_run_fn, thread->event.context); + + if (UNEXPECTED(ret != 0)) { + if (thread->event.context) { + libuv_thread_registry_remove(thread->event.context->key); + thread->event.context->key = 0; + zend_atomic_int_dec(&thread->event.context->ref_count); + } + + async_throw_error("Failed to create thread: %s", uv_strerror(ret)); + return false; + } + + event->flags |= ASYNC_THREAD_F_LAUNCHED; + return true; } + /* Subsequent start() is an awaiter: arm the wait so the parent loop blocks + * for completion (ref the notify handle and count the event). */ + uv_ref((uv_handle_t *) &thread->uv_notify); + ZEND_ASYNC_EVENT_CLR_HIDDEN(event); event->loop_ref_count++; ZEND_ASYNC_INCREASE_EVENT_COUNT(event); return true; @@ -2234,13 +2251,13 @@ static bool libuv_thread_event_stop(zend_async_event_t *event) async_thread_event_t *thread = (async_thread_event_t *) event; - /* Unref async handle so it doesn't keep the event loop alive. - * Actual uv_close happens in dispose when refcount reaches 0. */ + /* Last awaiter left: disarm back to transparent. Decrement before hiding + * (the count macro is a no-op on hidden events). CLOSED is set on actual + * completion in libuv_thread_notify_cb, not here. 
*/ uv_unref((uv_handle_t *) &thread->uv_notify); - - ZEND_ASYNC_EVENT_SET_CLOSED(event); event->loop_ref_count = 0; ZEND_ASYNC_DECREASE_EVENT_COUNT(event); + ZEND_ASYNC_EVENT_SET_HIDDEN(event); return true; } @@ -2443,6 +2460,12 @@ zend_async_thread_event_t *libuv_new_thread_event( thread_event->uv_notify.data = thread_event; + /* Transparent by default: notify stays armed but unref'd (does not keep the + * parent loop alive) and the event is hidden (not counted). start() arms the + * wait (ref + count) only when someone awaits. */ + uv_unref((uv_handle_t *) &thread_event->uv_notify); + ZEND_ASYNC_EVENT_SET_HIDDEN(&thread_event->event.base); + /* Allocate last: every early-failure path above pefree's ctx directly, * so keeping the mutex out of those paths avoids a leak. No-op under NTS. */ ZEND_ASYNC_THREAD_CONTEXT_EVENT_MUTEX_ALLOC(ctx); diff --git a/libuv_reactor.h b/libuv_reactor.h index 6bece13..1403d48 100644 --- a/libuv_reactor.h +++ b/libuv_reactor.h @@ -103,6 +103,10 @@ struct _async_thread_event_t uv_async_t uv_notify; /* Cross-thread notification handle */ }; +/* Thread event is launched (OS thread created). Shares event.base.flags; + * thread bits 13/14 are taken, so this uses 15. 
*/ +#define ASYNC_THREAD_F_LAUNCHED (1u << 15) + struct _async_exec_event_t { zend_async_exec_event_t event; diff --git a/tests/thread_channel/042-recv_disconnect_on_owner_finish.phpt b/tests/thread_channel/042-recv_disconnect_on_owner_finish.phpt new file mode 100644 index 0000000..f090bdf --- /dev/null +++ b/tests/thread_channel/042-recv_disconnect_on_owner_finish.phpt @@ -0,0 +1,28 @@ +--TEST-- +ThreadChannel: recv() does not hang shutdown when the owner finishes without close() (#162) +--SKIPIF-- + +--FILE-- +recv(); echo "worker: unexpected value\n"; } + catch (\Throwable $e) {} +}); + +echo "main: end\n"; +?> +--EXPECT-- +main: end diff --git a/tests/thread_channel/043-recv_disconnect_producer_coroutine.phpt b/tests/thread_channel/043-recv_disconnect_producer_coroutine.phpt new file mode 100644 index 0000000..2ffc8d7 --- /dev/null +++ b/tests/thread_channel/043-recv_disconnect_producer_coroutine.phpt @@ -0,0 +1,30 @@ +--TEST-- +ThreadChannel: recv() does not hang when the producer coroutine finishes without close() (#162) +--SKIPIF-- + +--FILE-- +recv(); echo "worker: unexpected value\n"; } + catch (\Throwable $e) {} + }); + echo "producer: done\n"; +}); + +echo "main: done\n"; +?> +--EXPECT-- +main: done +producer: done diff --git a/tests/thread_channel/044-recv_disconnect_fanout.phpt b/tests/thread_channel/044-recv_disconnect_fanout.phpt new file mode 100644 index 0000000..a6da854 --- /dev/null +++ b/tests/thread_channel/044-recv_disconnect_fanout.phpt @@ -0,0 +1,28 @@ +--TEST-- +ThreadChannel: many workers parked on recv() do not hang when the owner finishes (#162 fan-out) +--SKIPIF-- + +--FILE-- +recv(); echo "worker: unexpected value\n"; } + catch (\Throwable $e) {} + }); +} + +echo "main: end\n"; +?> +--EXPECT-- +main: end diff --git a/thread_channel.c b/thread_channel.c index 837a9bd..9892c8a 100644 --- a/thread_channel.c +++ b/thread_channel.c @@ -62,6 +62,63 @@ static void fire_all_triggers(HashTable *triggers) } ZEND_HASH_FOREACH_END(); } 
+/////////////////////////////////////////////////////////////////////////////// +// Process-wide registry of live channels +/////////////////////////////////////////////////////////////////////////////// + +/* Every live shared channel is registered here. At shutdown + * async_thread_channel_close_all() closes them so workers parked on + * recv()/send() wake and exit instead of hanging the process (a non-awaited + * owner can finish without close()). Lock order: registry mutex -> channel mutex. */ +#ifdef ZTS +static MUTEX_T thread_channel_registry_mutex = NULL; +#endif +static HashTable thread_channel_registry; +static bool thread_channel_registry_inited = false; + +void async_thread_channel_registry_init(void) +{ + if (thread_channel_registry_inited) { + return; + } + zend_hash_init(&thread_channel_registry, 8, NULL, NULL, 1); + ASYNC_MUTEX_INIT(thread_channel_registry_mutex); + thread_channel_registry_inited = true; +} + +static void thread_channel_registry_add(async_thread_channel_t *ch) +{ + if (!thread_channel_registry_inited) { + return; + } + ASYNC_MUTEX_LOCK(thread_channel_registry_mutex); + zend_hash_index_add_ptr(&thread_channel_registry, (zend_ulong)(uintptr_t) ch, ch); + ASYNC_MUTEX_UNLOCK(thread_channel_registry_mutex); +} + +static void thread_channel_registry_remove(async_thread_channel_t *ch) +{ + if (!thread_channel_registry_inited) { + return; + } + ASYNC_MUTEX_LOCK(thread_channel_registry_mutex); + zend_hash_index_del(&thread_channel_registry, (zend_ulong)(uintptr_t) ch); + ASYNC_MUTEX_UNLOCK(thread_channel_registry_mutex); +} + +void async_thread_channel_close_all(void) +{ + if (!thread_channel_registry_inited) { + return; + } + ASYNC_MUTEX_LOCK(thread_channel_registry_mutex); + async_thread_channel_t *ch; + ZEND_HASH_FOREACH_PTR(&thread_channel_registry, ch) { + async_thread_channel_close(ch); + } ZEND_HASH_FOREACH_END(); + ASYNC_MUTEX_UNLOCK(thread_channel_registry_mutex); +} + 
/////////////////////////////////////////////////////////////////////////////// // C-level send/receive (coroutine-aware) /////////////////////////////////////////////////////////////////////////////// @@ -305,11 +362,15 @@ async_thread_channel_t *async_thread_channel_create(int32_t capacity) ch->channel.close = thread_channel_close; ch->channel.event.dispose = thread_channel_event_dispose; + thread_channel_registry_add(ch); + return ch; } static void thread_channel_destroy(async_thread_channel_t *ch) { + thread_channel_registry_remove(ch); + /* Close and notify waiters before destroying */ if (!ZEND_ASYNC_EVENT_IS_CLOSED(&ch->channel.event)) { ASYNC_MUTEX_LOCK(ch->mutex); @@ -550,6 +611,8 @@ METHOD(isFull) void async_register_thread_channel_ce(void) { + async_thread_channel_registry_init(); + async_ce_thread_channel_exception = register_class_Async_ThreadChannelException(async_ce_async_exception); async_ce_thread_channel = register_class_Async_ThreadChannel(async_ce_awaitable, zend_ce_countable); diff --git a/thread_channel.h b/thread_channel.h index faf794a..3082444 100644 --- a/thread_channel.h +++ b/thread_channel.h @@ -81,6 +81,12 @@ async_thread_channel_t *async_thread_channel_create(int32_t capacity); /* Close channel — wakes all waiters, rejects new send/recv */ void async_thread_channel_close(async_thread_channel_t *ch); +/* Init the process-wide channel registry (MINIT). */ +void async_thread_channel_registry_init(void); + +/* Close every live channel — called at shutdown so parked workers wake and exit. */ +void async_thread_channel_close_all(void); + /* Addref shared channel */ void async_thread_channel_addref(async_thread_channel_t *ch);
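Reviewer note: the "transparent thread event" state machine introduced in `libuv_reactor.c` can be modeled without libuv or the Zend macros. The sketch below is a simplified, single-threaded model, not the extension's code. Every `demo_*` and `DEMO_F_*` name is hypothetical. The early returns for a closed event and for additional awaiters are assumptions about what the start/stop prologues do, since those prologues are not part of this diff. It shows the two ordering rules the comments call out: decrement the count before hiding the event, and set CLOSED only after `stop()`.

```c
/* Sketch only: every demo_* name is hypothetical, not the extension's API. */
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define DEMO_F_LAUNCHED (1u << 0) /* OS thread has been created */
#define DEMO_F_HIDDEN   (1u << 1) /* event is not counted by the loop */
#define DEMO_F_CLOSED   (1u << 2) /* thread has completed */

typedef struct {
    uint32_t active_events; /* the loop keeps running while this is > 0 */
} demo_loop_t;

typedef struct {
    uint32_t flags;
    uint32_t loop_ref_count; /* number of awaiters currently armed */
} demo_event_t;

/* Counting is a no-op on hidden events, mirroring the note in stop(). */
static void demo_count(demo_loop_t *loop, const demo_event_t *ev, bool inc)
{
    if (ev->flags & DEMO_F_HIDDEN) {
        return;
    }
    if (inc) {
        loop->active_events++;
    } else {
        assert(loop->active_events > 0);
        loop->active_events--;
    }
}

static void demo_event_init(demo_event_t *ev)
{
    ev->flags = DEMO_F_HIDDEN; /* transparent by default */
    ev->loop_ref_count = 0;
}

static bool demo_event_start(demo_loop_t *loop, demo_event_t *ev)
{
    if (ev->flags & DEMO_F_CLOSED) {
        return true; /* assumed prologue: nothing to arm after completion */
    }
    if ((ev->flags & DEMO_F_LAUNCHED) == 0) {
        /* First start() is the spawn: launch and stay transparent. */
        ev->flags |= DEMO_F_LAUNCHED;
        return true;
    }
    if (ev->loop_ref_count > 0) {
        ev->loop_ref_count++; /* assumed prologue: already armed */
        return true;
    }
    /* First awaiter: unhide BEFORE counting, then pin the loop. */
    ev->flags &= ~DEMO_F_HIDDEN;
    ev->loop_ref_count = 1;
    demo_count(loop, ev, true);
    return true;
}

static bool demo_event_stop(demo_loop_t *loop, demo_event_t *ev)
{
    if (ev->flags & DEMO_F_CLOSED) {
        return true; /* assumed prologue: short-circuit on a closed event */
    }
    if (ev->loop_ref_count > 1) {
        ev->loop_ref_count--; /* assumed prologue: other awaiters remain */
        return true;
    }
    /* Last awaiter left (or never armed): decrement BEFORE hiding. */
    ev->loop_ref_count = 0;
    demo_count(loop, ev, false);
    ev->flags |= DEMO_F_HIDDEN;
    return true;
}

/* Completion: disarm first, then mark closed, so stop() is not skipped. */
static void demo_event_complete(demo_loop_t *loop, demo_event_t *ev)
{
    demo_event_stop(loop, ev);
    ev->flags |= DEMO_F_CLOSED;
}

/* Returns how many events pin the loop after a spawn and an optional await. */
static uint32_t demo_pinned_after_spawn(bool awaited)
{
    demo_loop_t loop = { 0 };
    demo_event_t ev;

    demo_event_init(&ev);
    demo_event_start(&loop, &ev); /* spawn */
    if (awaited) {
        demo_event_start(&loop, &ev); /* awaiter arms the wait */
    }
    return loop.active_events;
}
```

In this model a spawned but never awaited worker contributes nothing to `active_events`, which is why the parent loop can finish and reach the shutdown path where `async_thread_channel_close_all()` runs. If `demo_event_complete()` set CLOSED before calling `demo_event_stop()`, the stop would return early and an awaited event would leave the count stuck at 1.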