feat(queue): reconnect commands/publisher path on Redis outage#34
feat(queue): reconnect commands/publisher path on Redis outage#34abnegate wants to merge 1 commit into
Conversation
The Broker receive loop already survives a Redis/Dragonfly restart by closing the dead connection and reconnecting on the next poll. The commands connection backing enqueue, commit, reject, retry and getQueueSize had no such handling: a connection that died between checkout and reuse threw straight through, dropping the publish or ack. Wrap those operations in executeCommand(), which mirrors receive()'s bounded reconnect (close + jittered exponential backoff, reusing the reconnect callbacks) but retries inline since there is no outer loop. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Greptile SummaryThis PR adds a
Confidence Score: 3/5The enqueue and single-command paths are safe to merge; the commit/reject multi-command closures can silently double-count stats counters on a mid-closure Redis failure. The packages/queue/src/Queue/Broker/Redis.php — specifically the Important Files Changed
Reviews (1): Last reviewed commit: "feat(queue): reconnect commands/publishe..." | Re-trigger Greptile |
| $this->executeCommand($queue, function () use ($queue, $pid): void { | ||
| $this->commands->remove("{$queue->namespace}.jobs.{$queue->name}.{$pid}"); | ||
| $this->commands->increment("{$queue->namespace}.stats.{$queue->name}.success"); | ||
| $this->commands->listRemove("{$queue->namespace}.processing.{$queue->name}", $pid); | ||
| $this->commands->decrement("{$queue->namespace}.stats.{$queue->name}.processing"); | ||
| }); |
There was a problem hiding this comment.
Retry re-executes partially-completed closures, corrupting stats counters
If the connection drops after remove() has already been acknowledged by Redis but before increment() is sent, executeCommand() retries the entire closure. On the second run remove() is a safe no-op, but increment("{…}.success") and decrement("{…}.processing") both fire a second time — permanently over-counting successes and under-counting processing. The same applies to reject() (increment("{…}.failed") / decrement("{…}.processing")).
The root cause is that four non-idempotent Redis commands are batched into one closure that the outer retry loop treats as atomic. Wrapping each individual command in its own executeCommand() call would avoid re-running already-successful counter mutations on reconnect. Alternatively, a Redis pipeline/MULTI-EXEC transaction would make the whole block genuinely atomic at the Redis level.
|
|
||
| // COMMAND_MAX_ATTEMPTS (3): 3 pushes, 2 reconnects between them. | ||
| $this->assertSame(3, $connection->pushAttempts); | ||
| $this->assertSame(2, $reconnects); | ||
| } | ||
|
|
||
| public function testCommitReconnectsAndSucceeds(): void | ||
| { | ||
| $queue = new Queue('ack-reconnect'); | ||
| $connection = new FailOnceAckConnection(); | ||
| $broker = new RedisBroker($connection, $connection); | ||
| $reconnects = 0; | ||
|
|
||
| $broker->setReconnectCallback(function () use (&$reconnects): void { | ||
| $reconnects++; | ||
| }); | ||
|
|
||
| $broker->commit($queue, new Message(['pid' => 'job-1', 'queue' => 'ack-reconnect', 'timestamp' => 0, 'payload' => []])); | ||
|
|
||
| $this->assertSame(2, $connection->removeAttempts); | ||
| $this->assertSame(1, $connection->closeCalls); | ||
| $this->assertSame(1, $reconnects); |
There was a problem hiding this comment.
testCommitReconnectsAndSucceeds only covers pre-first-command failure, masking the mid-closure double-count bug
FailOnceAckConnection.remove() throws on the very first call, so the entire closure never partially succeeds. The test therefore cannot detect that increment and decrement run twice when the connection drops between commands. Adding a connection double that lets remove() succeed but throws on increment() would expose the stats double-counting described in the production code comment above.
What
The
Broker\Redisreceive loop already survives a Redis/Dragonfly restart: onRedisException/RedisClusterExceptionit closes the dead connection, backs off, and reconnects on the next poll. The commands connection — which backsenqueue,commit,reject,retry, andgetQueueSize— had no equivalent handling. A pooled connection that died between checkout and reuse threw straight through, dropping the publish or ack.This wraps those operations in a new
executeCommand()helper that mirrorsreceive()'s bounded reconnect (close + jittered exponential backoff, reusing the existingsetReconnectCallback/setReconnectSuccessCallbackhooks), but retries inline since publishing/acks have no outer poll loop. Capped atCOMMAND_MAX_ATTEMPTS(3); a non-connection error (e.g. aRuntimeException) propagates immediately without retrying.Why
Appwrite Cloud's queue publisher is a pooled
Broker\Redisused purely forenqueue. Today a Dragonfly failover silently drops enqueues on stale pooled connections — the cache adapter already reconnects transparently, the queue publisher does not. This closes that gap at the source.Tests
Added publisher/ack-path cases to
RedisReconnectCallbackTest(unit tier, in-memoryConnectiondoubles, no live Redis):testEnqueueReconnectsAndSucceeds— fail once, reconnect, succeed; success callback fires with attempt counttestEnqueueThrowsAfterExhaustingReconnects— 3 attempts, 2 reconnects, then rethrowtestCommitReconnectsAndSucceeds— ack path reconnectstestEnqueueDoesNotRetryNonConnectionErrors—RuntimeExceptionpropagates, no reconnect3 of the 4 fail on
mainand pass with the change.composer test(38 tests), Pint, and PHPStan (level 5) all green locally.🤖 Generated with Claude Code