Merged
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed
- **#162 `ThreadChannel`/`spawn_thread`: a worker parked on `recv()`/`send()` hung process shutdown when the owning side finished without `close()`.** A non-awaited worker thread kept the parent scheduler alive by itself, so the parent never reached shutdown, and the worker blocked forever (no output, no diagnostic). Two changes: (1) the thread completion event is now **transparent** — it only keeps the parent loop alive while a coroutine actually `await`s the thread — so a non-awaited worker no longer pins the parent, which finishes and proceeds to shutdown; (2) every live `ThreadChannel` is now tracked in a process-wide registry and **closed at shutdown**, so any worker still parked on `recv()`/`send()` wakes with `Async\ThreadChannelException` and exits. Awaited threads still block the parent as before; works for fan-out (any number of workers).
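
The "transparent" behaviour described in (1) can be pictured with a toy reference-counting model. This is a minimal sketch in plain C, not the extension's code: `toy_loop_t`, `toy_thread_event_t` and the `toy_*` functions are hypothetical names introduced for illustration, and the real reactor works with libuv handles and its own event-count macros. The sketch assumes the loop stays alive exactly while at least one event is counted.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical toy model: the loop is alive while active_events > 0. */
typedef struct {
    int active_events;
} toy_loop_t;

typedef struct {
    toy_loop_t *loop;
    bool launched;  /* set once the (modelled) OS thread has been created */
    int awaiters;   /* coroutines currently awaiting the thread */
} toy_thread_event_t;

/* First call models the spawn: launch only, do not pin the loop.
 * Later calls model an awaiter: the first one counts the event,
 * so the loop now waits for the thread to complete. */
static void toy_thread_event_start(toy_thread_event_t *ev)
{
    if (!ev->launched) {
        ev->launched = true;
        return;
    }
    if (ev->awaiters++ == 0) {
        ev->loop->active_events++;
    }
}

/* An awaiter leaves; when the last one is gone the event is
 * uncounted again and no longer keeps the loop alive. */
static void toy_thread_event_stop(toy_thread_event_t *ev)
{
    assert(ev->awaiters > 0);
    if (--ev->awaiters == 0) {
        ev->loop->active_events--;
    }
}

static bool toy_loop_alive(const toy_loop_t *loop)
{
    return loop->active_events > 0;
}
```

In this model a spawned but never awaited worker leaves `active_events` at zero, so the parent loop is free to finish; only an explicit await makes the loop wait for the thread.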

### Changed
- **phpredis async pool: `connect()`/`pconnect()` is now rejected in pool mode — configure `host`/`port` in the constructor options.** A pooled `Redis` object is a template, not a single live connection, so `connect()` has no meaning there: the pool's own connections are created elsewhere and silently ignored the `connect()` target, which dialed `127.0.0.1` and failed wherever Redis lives on a service hostname (e.g. `failed to acquire connection` inside Docker). Calling `connect()` on a pooled instance now throws `Redis::connect() is not supported in pool mode; set 'host' and 'port' in the constructor options`. Use `new Redis(['host' => 'redis', 'port' => 6379, 'pool' => [...]])`. This is consistent for both the checkout pool (`mux = 0`) and eager multiplex lanes (`mux > 0`, opened in the constructor). Fixed in phpredis `redis.c`/`redis_pool.c`; test `tests/async/008-pool_connect_host.phpt`.

## [0.7.3] - 2026-06-25

### Fixed
1 change: 1 addition & 0 deletions fuzzy-tests/CHANGELOG.md
@@ -7,6 +7,7 @@ are not mixed with test-suite history.
## Unreleased

### Added
- **#162 — ThreadChannel orphan recv disconnect at shutdown** — `thread_channel/orphan_recv.feature` (1 outline → 5 .phpt). A coroutine spawns N workers parked on `recv()` of a fresh `ThreadChannel`, then finishes without `close()`/`await()`; the channel's only reference drops with the owner. The process-wide registry `close_all()` at shutdown must wake every parked worker so the process exits instead of hanging. New step `coroutine "X" spawns N orphan workers parked on recv of a fresh thread channel` (`tc_orphan_{spawn_attempts,spawned,spawn_failed}`). Invariants: every worker spawned, no orphan coroutines on the owner side, clean exit (a hang fails via timeout, a use-after-free in the shutdown disconnect fails under ASAN). 5 .phpt × `random:1..30` green on ASAN+async-fuzz — 150 interleavings, no findings — proving the transparent-thread-event + registry-close fix, including fan-out (N>1).
- **Coverage gap — timer / delay / timeout chaos** — `timer/timer_chaos.feature` (7 scenarios → 10 .phpt). First chaos coverage of `Async\delay()` and `Async\timeout()` outside the deterministic `tests/sleep/*` and `tests/common/timeout_*`. New steps `coroutine "X" runs a cancellable delay of N ms` (`delay_{attempts,ok,cancelled}`), `coroutine "X" runs work of N ms guarded by timeout M ms` (`await(job, timeout(M))` → `tmo_{attempts,work_ok,timed_out,cancelled}`; OperationCanceledException = timed out, AsyncCancellation = externally cancelled), and `coroutine "X" disposes a safe scope with a child delaying N ms` (#132 zombie-timer probe). Scenarios: cancel-mid-delay + timing outline (0/5/50/150 ms), many concurrent timers draining the heap, timeout-fires, timeout-loses (the losing timeout must release its libuv watcher — the #082 leak class, asserted by running under ASAN `detect_leaks=1`), awaiter cancelled while both work and timeout are pending, and the #132 zombie-delay marker. 10 .phpt × fifo + random:1,7,42,1337 green on ASAN+UBSan+ZTS+async-fuzz with leak detection — no findings (delay/timeout watcher cleanup is solid).
- **Coverage gap — Scope::allowZombies()** — `scope/extras.feature` (+1 scenario). The only API entry the coverage report flagged uncovered (167/168) once the method shipped (it opts a scope back into safe disposal — the inverse of `asNotSafely()`). New step `coroutine "X" disposes a fresh scope with allowZombies and a parked child`: a started child parks in a short `delay()`, the scope is disposed, and on a safe scope the child is NOT cancelled — it becomes a zombie, is dropped from the active count, and finishes on its own. Invariants: `allowZombies()` returns the same scope (identity), `zombie_child_finished == 1` (not cancelled), child settles exactly once, no orphan coroutines. Green across fifo + random:1,7,42,1337 on ASAN+UBSan+ZTS+async-fuzz. COVERAGE.md back to 168/168 (100%).
- **Reinstated cancel scenarios after #144/#145/#146 fixes landed** — the three chaos features that originally *surfaced* these bugs had their cancel scenarios commented under `# Blocked: #NNN`. All three fixes are now in `true-async-stable`, so the scenarios are reinstated as regression backstops and pass under ASAN-ZTS (fifo). (1) `io/flock_chaos.feature` — cancel-mid-flock waiter + cancel-timing outline (5/50/150 ms): a parked `LOCK_EX` waiter is cancelled while the libuv worker still owns the task data; backstops the #146 inline-tail-on-task + pin-across-SUSPEND fix. (2) `curl/curl_multi_chaos.feature` — cancel-mid-`curl_multi_select` + timing outline (5/75/200 ms): backstops the #145 route-cancel-through-`finally` fix (no more heap corruption). (3) `exec/proc_open_chaos.feature` — proc_close-vs-parked-reader + close-timing outline (0/5/25/60 ms) + SIGTERM-vs-reader + cancel-reader-then-close: backstops the #144 notify-parked-req-on-close + pin-stdio-lifetime fix. 16 reinstated .phpt green under ASAN-ZTS; the random-seed matrix runs in the existing IO/curl nightly once an `--enable-async-fuzz` build is used. FINDINGS.md entries for all three moved to **fixed**; IO_PLAN.md coverage table updated; README.md layout refreshed (the stale `(TODO)` topics are all implemented).
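
The shutdown behaviour exercised by the #162 `orphan_recv` entry in this list (a process-wide registry whose `close_all()` wakes every parked worker) can be sketched as follows. This is a single-threaded illustration under stated assumptions, not the extension's implementation: `toy_channel_t`, `toy_registry_t` and the `toy_*` functions are hypothetical names, a "parked" receiver is modelled as a counter rather than a blocked thread, and the locking the real code needs is omitted.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define TOY_MAX_CHANNELS 16

/* Hypothetical channel: tracks only what the sketch needs. */
typedef struct {
    bool closed;
    int parked_receivers;  /* workers currently blocked in recv() */
} toy_channel_t;

/* Hypothetical process-wide registry of live channels. */
typedef struct {
    toy_channel_t *live[TOY_MAX_CHANNELS];
    size_t count;
} toy_registry_t;

/* Track a channel; returns false when the fixed-size table is full. */
static bool toy_registry_add(toy_registry_t *reg, toy_channel_t *ch)
{
    assert(reg != NULL && ch != NULL);
    if (reg->count == TOY_MAX_CHANNELS) {
        return false;
    }
    reg->live[reg->count++] = ch;
    return true;
}

/* Model a worker parking on recv(). Returns false when the channel is
 * already closed (the real API reports this with an exception). */
static bool toy_channel_park_recv(toy_channel_t *ch)
{
    if (ch->closed) {
        return false;
    }
    ch->parked_receivers++;
    return true;
}

/* Shutdown step: close every tracked channel and release its parked
 * receivers. Returns how many receivers were woken. */
static int toy_registry_close_all(toy_registry_t *reg)
{
    int woken = 0;
    for (size_t i = 0; i < reg->count; i++) {
        toy_channel_t *ch = reg->live[i];
        if (!ch->closed) {
            ch->closed = true;
            woken += ch->parked_receivers;
            ch->parked_receivers = 0;
        }
    }
    reg->count = 0;
    return woken;
}
```

Because the registry tracks channels independently of the owner's references, the owner dropping its only reference without `close()` does not prevent the shutdown step from reaching the channel, which is the property the fan-out cases (N > 1) check.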
27 changes: 27 additions & 0 deletions fuzzy-tests/_harness/Steps.php
@@ -4286,6 +4286,33 @@ function(Context $ctx, string $coro, string $tc) {
})
->requires('zts');

// When coroutine "X" spawns N orphan workers parked on recv of a fresh thread channel
// Models #162: workers park on recv(); the owner finishes WITHOUT close()
// or await(). The channel is local (not tracked by the harness, so it is
// not closed at scenario end); the process-wide close_all() at shutdown
// must disconnect every worker, otherwise the process hangs.
$r->on('/^coroutine "([^"]+)" spawns (\S+) orphan workers parked on recv of a fresh thread channel$/',
function(Context $ctx, string $coro, string $nExpr) {
$n = (int)$ctx->resolver->resolve($nExpr);
$ctx->planAction($coro, function(Context $ctx) use ($n) {
$ch = new \Async\ThreadChannel();
for ($i = 0; $i < $n; $i++) {
$ctx->inc("tc_orphan_spawn_attempts");
try {
\Async\spawn_thread(self::unscoped(static function() use ($ch): void {
try { $ch->recv(); }
catch (\Throwable $e) { /* disconnected at shutdown */ }
}));
$ctx->inc("tc_orphan_spawned");
} catch (\Throwable $e) {
$ctx->inc("tc_orphan_spawn_failed");
}
}
// owner returns here: $ch drops out of scope, never closed/awaited
});
})
->requires('zts');

// ---- TaskGroup actions ----

// When coroutine "X" spawns N tasks into "G" that print "msg"
29 changes: 29 additions & 0 deletions fuzzy-tests/thread_channel/orphan_recv.feature
@@ -0,0 +1,29 @@
Feature: ThreadChannel orphan recv workers disconnect at shutdown (#162)

A worker started with Async\spawn_thread() and parked on ThreadChannel::recv()
must not keep the process alive when the owning side finishes without close().
The owner here is a coroutine that spawns the workers, never sends, never
closes, never awaits — its only channel reference drops when it returns. The
process-wide registry close_all() at shutdown must wake every parked worker so
the process exits cleanly.

Invariants for every interleaving (and every worker count):
every worker was spawned (no spawn failure)
no orphan coroutines on the owner's side
the process exits cleanly (a hang fails the run via timeout; a use-after-
free in the shutdown disconnect fails it under ASAN)

Scenario Outline: N workers park on recv, the owner finishes without close
Given a coroutine "M"
When coroutine "M" spawns <n> orphan workers parked on recv of a fresh thread channel
Then counter "tc_orphan_spawned" equals <n>
And counter "tc_orphan_spawn_failed" equals 0
And no orphan coroutines

Examples:
| n |
| 1 |
| 2 |
| 4 |
| 8 |
| 16 |
77 changes: 50 additions & 27 deletions libuv_reactor.c
@@ -21,6 +21,7 @@

#include "exceptions.h"
#include "php_async.h"
#include "thread_channel.h"
#include "php_main.h"
#include "thread.h"
#include "thread_pool.h"
@@ -166,6 +167,10 @@ static void libuv_reactor_quiesce(void)
return;
}

/* Wake any worker parked on a thread channel so it can exit; otherwise the
* wait below would hang on a non-awaited worker whose owner finished. */
async_thread_channel_close_all();

uv_mutex_lock(&child_thread_registry_mutex);
while (zend_hash_num_elements(&child_thread_registry) > 0) {
uv_cond_wait(&child_thread_registry_cond, &child_thread_registry_mutex);
@@ -2161,8 +2166,11 @@ static void libuv_thread_notify_cb(uv_async_t *handle)
}
}

/* Set CLOSED AFTER stop(): the stop prologue short-circuits on a closed
* event and would skip the disarm (unref + uncount). */
ZEND_ASYNC_CALLBACKS_NOTIFY(&thread->event.base, &thread->event.result, thread->event.exception);
thread->event.base.stop(&thread->event.base);
ZEND_ASYNC_EVENT_SET_CLOSED(&thread->event.base);

if (ZEND_ASYNC_EVENT_IS_EXCEPTION_HANDLED(&thread->event.base)) {
ZEND_THREAD_SET_EXCEPTION_CONSUMED(&thread->event);
@@ -2189,37 +2197,46 @@ static bool libuv_thread_event_start(zend_async_event_t *event)

async_thread_event_t *thread = (async_thread_event_t *) event;

/* Add ref on context for the thread runner */
if (thread->event.context) {
zend_atomic_ptr_store(&thread->event.context->event, &thread->event);
ZEND_ASYNC_THREAD_CONTEXT_ADDREF(thread->event.context);
}

/* Initialise registry and assign+register the context's key BEFORE
* uv_thread_create. Doing it after creates a race: a fast-exiting
* runner reads context->key == 0, skips self-removal, then we add
* the key and the entry leaks — quiesce hangs forever. */
libuv_thread_registry_init();
/* First start() is the spawn call: create the OS thread and stay transparent
* (no ref/count). loop_ref_count stays 0 so the first awaiter's start() arms. */
if ((event->flags & ASYNC_THREAD_F_LAUNCHED) == 0) {

if (thread->event.context) {
thread->event.context->key =
(zend_async_thread_handle_t) async_ptr_to_index(thread->event.context);
libuv_thread_registry_add(thread->event.context->key);
}
if (thread->event.context) {
zend_atomic_ptr_store(&thread->event.context->event, &thread->event);
ZEND_ASYNC_THREAD_CONTEXT_ADDREF(thread->event.context);
}

const int ret = uv_thread_create(&thread->uv_handle, zend_async_thread_run_fn, thread->event.context);
/* Assign+register the key BEFORE uv_thread_create: a fast-exiting runner
* that reads key == 0 skips self-removal and the registry entry leaks. */
libuv_thread_registry_init();

if (UNEXPECTED(ret != 0)) {
if (thread->event.context) {
libuv_thread_registry_remove(thread->event.context->key);
thread->event.context->key = 0;
zend_atomic_int_dec(&thread->event.context->ref_count);
thread->event.context->key =
(zend_async_thread_handle_t) async_ptr_to_index(thread->event.context);
libuv_thread_registry_add(thread->event.context->key);
}

async_throw_error("Failed to create thread: %s", uv_strerror(ret));
return false;
const int ret = uv_thread_create(&thread->uv_handle, zend_async_thread_run_fn, thread->event.context);

if (UNEXPECTED(ret != 0)) {
if (thread->event.context) {
libuv_thread_registry_remove(thread->event.context->key);
thread->event.context->key = 0;
zend_atomic_int_dec(&thread->event.context->ref_count);
}

async_throw_error("Failed to create thread: %s", uv_strerror(ret));
return false;
}

event->flags |= ASYNC_THREAD_F_LAUNCHED;
return true;
}

/* Subsequent start() is an awaiter: arm the wait so the parent loop blocks
* for completion (ref the notify handle and count the event). */
uv_ref((uv_handle_t *) &thread->uv_notify);
ZEND_ASYNC_EVENT_CLR_HIDDEN(event);
event->loop_ref_count++;
ZEND_ASYNC_INCREASE_EVENT_COUNT(event);
return true;
@@ -2234,13 +2251,13 @@ static bool libuv_thread_event_stop(zend_async_event_t *event)

async_thread_event_t *thread = (async_thread_event_t *) event;

/* Unref async handle so it doesn't keep the event loop alive.
* Actual uv_close happens in dispose when refcount reaches 0. */
/* Last awaiter left: disarm back to transparent. Decrement before hiding
* (the count macro is a no-op on hidden events). CLOSED is set on actual
* completion in libuv_thread_notify_cb, not here. */
uv_unref((uv_handle_t *) &thread->uv_notify);

ZEND_ASYNC_EVENT_SET_CLOSED(event);
event->loop_ref_count = 0;
ZEND_ASYNC_DECREASE_EVENT_COUNT(event);
ZEND_ASYNC_EVENT_SET_HIDDEN(event);
return true;
}

@@ -2443,6 +2460,12 @@ zend_async_thread_event_t *libuv_new_thread_event(

thread_event->uv_notify.data = thread_event;

/* Transparent by default: notify stays armed but unref'd (does not keep the
* parent loop alive) and the event is hidden (not counted). start() arms the
* wait (ref + count) only when someone awaits. */
uv_unref((uv_handle_t *) &thread_event->uv_notify);
ZEND_ASYNC_EVENT_SET_HIDDEN(&thread_event->event.base);

/* Allocate last: every early-failure path above pefree's ctx directly,
* so keeping the mutex out of those paths avoids a leak. No-op under NTS. */
ZEND_ASYNC_THREAD_CONTEXT_EVENT_MUTEX_ALLOC(ctx);
4 changes: 4 additions & 0 deletions libuv_reactor.h
@@ -103,6 +103,10 @@ struct _async_thread_event_t
uv_async_t uv_notify; /* Cross-thread notification handle */
};

/* Thread event is launched (OS thread created). Shares event.base.flags;
* thread bits 13/14 are taken, so this uses 15. */
#define ASYNC_THREAD_F_LAUNCHED (1u << 15)

struct _async_exec_event_t
{
zend_async_exec_event_t event;
28 changes: 28 additions & 0 deletions tests/thread_channel/042-recv_disconnect_on_owner_finish.phpt
@@ -0,0 +1,28 @@
--TEST--
ThreadChannel: recv() does not hang shutdown when the owner finishes without close() (#162)
--SKIPIF--
<?php
if (!PHP_ZTS) die('skip ZTS required');
if (!function_exists('Async\spawn_thread')) die('skip spawn_thread not available');
?>
--FILE--
<?php

use Async\ThreadChannel;
use function Async\spawn_thread;

// Worker parks on recv(); the owner (main) finishes without sending or closing.
// The worker must disconnect at shutdown so the process exits (no hang). The
// worker stays silent on disconnect: a hang fails via timeout, an unexpected
// value fails via extra output.
$ch = new ThreadChannel();

spawn_thread(function() use ($ch) {
try { $ch->recv(); echo "worker: unexpected value\n"; }
catch (\Throwable $e) {}
});

echo "main: end\n";
?>
--EXPECT--
main: end
30 changes: 30 additions & 0 deletions tests/thread_channel/043-recv_disconnect_producer_coroutine.phpt
@@ -0,0 +1,30 @@
--TEST--
ThreadChannel: recv() does not hang when the producer coroutine finishes without close() (#162)
--SKIPIF--
<?php
if (!PHP_ZTS) die('skip ZTS required');
if (!function_exists('Async\spawn_thread')) die('skip spawn_thread not available');
?>
--FILE--
<?php

use Async\ThreadChannel;
use function Async\spawn;
use function Async\spawn_thread;

// The producer is a coroutine that creates the channel, spawns a worker parked
// on recv(), then finishes. The worker must disconnect at shutdown (no hang).
spawn(function() {
$ch = new ThreadChannel();
spawn_thread(function() use ($ch) {
try { $ch->recv(); echo "worker: unexpected value\n"; }
catch (\Throwable $e) {}
});
echo "producer: done\n";
});

echo "main: done\n";
?>
--EXPECT--
main: done
producer: done
28 changes: 28 additions & 0 deletions tests/thread_channel/044-recv_disconnect_fanout.phpt
@@ -0,0 +1,28 @@
--TEST--
ThreadChannel: many workers parked on recv() do not hang when the owner finishes (#162 fan-out)
--SKIPIF--
<?php
if (!PHP_ZTS) die('skip ZTS required');
if (!function_exists('Async\spawn_thread')) die('skip spawn_thread not available');
?>
--FILE--
<?php

use Async\ThreadChannel;
use function Async\spawn_thread;

// Several workers park on recv(); the owner finishes without close(). All must
// disconnect at shutdown so the process exits (no hang for any number of workers).
$ch = new ThreadChannel();

for ($i = 0; $i < 3; $i++) {
spawn_thread(function() use ($ch) {
try { $ch->recv(); echo "worker: unexpected value\n"; }
catch (\Throwable $e) {}
});
}

echo "main: end\n";
?>
--EXPECT--
main: end