Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]

### Fixed
- **#162 `ThreadChannel`/`spawn_thread`: a worker parked on `recv()`/`send()` hung process shutdown when the owning side finished without `close()`.** A non-awaited worker thread kept the parent scheduler alive by itself, so the parent never reached shutdown, and the worker blocked forever (no output, no diagnostic). Two changes: (1) the thread completion event is now **transparent** — it only keeps the parent loop alive while a coroutine actually `await`s the thread — so a non-awaited worker no longer pins the parent, which finishes and proceeds to shutdown; (2) every live `ThreadChannel` is now tracked in a process-wide registry and **closed at shutdown**, so any worker still parked on `recv()`/`send()` wakes with `Async\ThreadChannelException` and exits. Awaited threads still block the parent as before; works for fan-out (any number of workers).
- **#162 `ThreadChannel`/`spawn_thread`: a worker parked on `recv()`/`send()` hung process shutdown when the owning side finished without `close()`.** A non-awaited worker thread kept the parent scheduler alive by itself, so the parent never reached shutdown, and the worker blocked forever (no output, no diagnostic). Two changes: (1) the thread completion event is now **transparent** — it only keeps the parent loop alive while a coroutine actually `await`s the thread — so a non-awaited worker no longer pins the parent, which finishes and proceeds to shutdown; (2) each thread now tracks the `ThreadChannel`s it created (thread-local, in `ASYNC_G`) and **closes them at its own shutdown**, so any worker still parked on `recv()`/`send()` wakes with `Async\ThreadChannelException` and exits. Awaited threads still block the parent as before; works for fan-out (any number of workers).

### Changed
- **phpredis async pool: `connect()`/`pconnect()` is now rejected in pool mode — configure `host`/`port` in the constructor options.** A pooled `Redis` object is a template, not a single live connection, so `connect()` has no meaning there: the pool's own connections are created elsewhere and silently ignored the `connect()` target, which dialed `127.0.0.1` and failed wherever Redis lives on a service hostname (e.g. `failed to acquire connection` inside Docker). Calling `connect()` on a pooled instance now throws `Redis::connect() is not supported in pool mode; set 'host' and 'port' in the constructor options`. Use `new Redis(['host' => 'redis', 'port' => 6379, 'pool' => [...]])`. This is consistent for both the checkout pool (`mux = 0`) and eager multiplex lanes (`mux > 0`, opened in the constructor). Fixed in phpredis `redis.c`/`redis_pool.c`; test `tests/async/008-pool_connect_host.phpt`.
Expand Down
1 change: 1 addition & 0 deletions async.c
Original file line number Diff line number Diff line change
Expand Up @@ -1800,6 +1800,7 @@ PHP_RINIT_FUNCTION(async) /* {{{ */
circular_buffer_ctor(&ASYNC_G(resumed_coroutines), 64, sizeof(zend_coroutine_t *), &zend_std_allocator);
zend_hash_init(&ASYNC_G(coroutines), 128, NULL, NULL, 0);
zend_hash_init(&ASYNC_G(deadlock_channels), 8, NULL, NULL, 0);
zend_hash_init(&ASYNC_G(thread_channels), 8, NULL, NULL, 0);

ASYNC_G(reactor_started) = false;

Expand Down
6 changes: 6 additions & 0 deletions async_API.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "context.h"
#include "coroutine.h"
#include "exceptions.h"
#include "thread_channel.h"
#include "future.h"
#include "iterator.h"
#include "php_async.h"
Expand Down Expand Up @@ -207,13 +208,18 @@ zend_coroutine_t *spawn(zend_async_scope_t *scope, zend_object *scope_provider,

static bool engine_shutdown(void)
{
/* Close channels created on this thread so a worker parked on one whose
* owner finished without close() wakes — before REACTOR_SHUTDOWN waits for it. */
async_thread_channel_close_owned();

ZEND_ASYNC_REACTOR_SHUTDOWN();

circular_buffer_dtor(&ASYNC_G(microtasks));
circular_buffer_dtor(&ASYNC_G(coroutine_queue));
circular_buffer_dtor(&ASYNC_G(resumed_coroutines));
zend_hash_destroy(&ASYNC_G(coroutines));
zend_hash_destroy(&ASYNC_G(deadlock_channels));
zend_hash_destroy(&ASYNC_G(thread_channels));

if (ASYNC_G(root_context) != NULL) {
async_context_t *root_context = (async_context_t *) ASYNC_G(root_context);
Expand Down
5 changes: 0 additions & 5 deletions libuv_reactor.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

#include "exceptions.h"
#include "php_async.h"
#include "thread_channel.h"
#include "php_main.h"
#include "thread.h"
#include "thread_pool.h"
Expand Down Expand Up @@ -167,10 +166,6 @@ static void libuv_reactor_quiesce(void)
return;
}

/* Wake any worker parked on a thread channel so it can exit; otherwise the
* wait below would hang on a non-awaited worker whose owner finished. */
async_thread_channel_close_all();

uv_mutex_lock(&child_thread_registry_mutex);
while (zend_hash_num_elements(&child_thread_registry) > 0) {
uv_cond_wait(&child_thread_registry_cond, &child_thread_registry_mutex);
Expand Down
5 changes: 5 additions & 0 deletions php_async.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ bool debug_deadlock;
* reason "deadlock_resolved" rather than as a generic Deadlock error. */
HashTable deadlock_channels;

/* Thread channels created on THIS thread (holds a ref each). Closed at this
* thread's shutdown so workers parked on recv()/send() of a channel whose owner
* finished without close() wake and exit. Thread-local — no cross-thread lock. */
HashTable thread_channels;

#ifdef PHP_WIN32
#endif

Expand Down
61 changes: 13 additions & 48 deletions thread_channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,60 +63,29 @@ static void fire_all_triggers(HashTable *triggers)
}

///////////////////////////////////////////////////////////////////////////////
// Process-wide registry of live channels
// Per-thread registry of channels created on this thread
///////////////////////////////////////////////////////////////////////////////

/* Every live shared channel is registered here. At shutdown async_thread_channel
* _close_all() closes them so workers parked on recv()/send() wake and exit
* instead of hanging the process (a non-awaited owner can finish without close).
* Lock order is always registry mutex -> channel mutex. */
#ifdef ZTS
static MUTEX_T thread_channel_registry_mutex = NULL;
#endif
static HashTable thread_channel_registry;
static bool thread_channel_registry_inited = false;

void async_thread_channel_registry_init(void)
{
if (thread_channel_registry_inited) {
return;
}
zend_hash_init(&thread_channel_registry, 8, NULL, NULL, 1);
ASYNC_MUTEX_INIT(thread_channel_registry_mutex);
thread_channel_registry_inited = true;
}

/* Channels created on the current thread, holding a ref each. Closed at this
* thread's shutdown (async_thread_channel_close_owned) so a worker parked on a
* channel whose owner finished without close() wakes and exits. Thread-local:
* only the owning thread touches its own ASYNC_G(thread_channels) — no lock. */
static void thread_channel_registry_add(async_thread_channel_t *ch)
{
if (!thread_channel_registry_inited) {
return;
}
ASYNC_MUTEX_LOCK(thread_channel_registry_mutex);
zend_hash_index_add_ptr(&thread_channel_registry, (zend_ulong)(uintptr_t) ch, ch);
ASYNC_MUTEX_UNLOCK(thread_channel_registry_mutex);
}

static void thread_channel_registry_remove(async_thread_channel_t *ch)
{
if (!thread_channel_registry_inited) {
return;
}
ASYNC_MUTEX_LOCK(thread_channel_registry_mutex);
zend_hash_index_del(&thread_channel_registry, (zend_ulong)(uintptr_t) ch);
ASYNC_MUTEX_UNLOCK(thread_channel_registry_mutex);
async_thread_channel_addref(ch);
zend_hash_index_add_ptr(&ASYNC_G(thread_channels), (zend_ulong)(uintptr_t) ch, ch);
}

void async_thread_channel_close_all(void)
void async_thread_channel_close_owned(void)
{
if (!thread_channel_registry_inited) {
return;
}
ASYNC_MUTEX_LOCK(thread_channel_registry_mutex);
async_thread_channel_t *ch;
ZEND_HASH_FOREACH_PTR(&thread_channel_registry, ch) {
ZEND_HASH_FOREACH_PTR(&ASYNC_G(thread_channels), ch) {
async_thread_channel_close(ch);
/* Release the registry's ref. The channel survives while a wrapper or a
* worker still holds it; the woken worker drops the last ref and frees. */
ch->channel.event.dispose(&ch->channel.event);
} ZEND_HASH_FOREACH_END();
ASYNC_MUTEX_UNLOCK(thread_channel_registry_mutex);
zend_hash_clean(&ASYNC_G(thread_channels));
}

///////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -369,8 +338,6 @@ async_thread_channel_t *async_thread_channel_create(int32_t capacity)

static void thread_channel_destroy(async_thread_channel_t *ch)
{
thread_channel_registry_remove(ch);

/* Close and notify waiters before destroying */
if (!ZEND_ASYNC_EVENT_IS_CLOSED(&ch->channel.event)) {
ASYNC_MUTEX_LOCK(ch->mutex);
Expand Down Expand Up @@ -611,8 +578,6 @@ METHOD(isFull)

void async_register_thread_channel_ce(void)
{
async_thread_channel_registry_init();

async_ce_thread_channel_exception = register_class_Async_ThreadChannelException(async_ce_async_exception);

async_ce_thread_channel = register_class_Async_ThreadChannel(async_ce_awaitable, zend_ce_countable);
Expand Down
8 changes: 3 additions & 5 deletions thread_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,9 @@ async_thread_channel_t *async_thread_channel_create(int32_t capacity);
/* Close channel — wakes all waiters, rejects new send/recv */
void async_thread_channel_close(async_thread_channel_t *ch);

/* Init the process-wide channel registry (MINIT). */
void async_thread_channel_registry_init(void);

/* Close every live channel — called at shutdown so parked workers wake and exit. */
void async_thread_channel_close_all(void);
/* Close channels created on this thread (releasing the registry ref each).
* Called at this thread's shutdown so parked workers wake and exit. */
void async_thread_channel_close_owned(void);

/* Addref shared channel */
void async_thread_channel_addref(async_thread_channel_t *ch);
Expand Down
Loading