
fix(queue): recover ghost active jobs after ungraceful shutdown and prevent reserve loop #15

Open
JulioVianaDev wants to merge 1 commit into Openpanel-dev:main from JulioVianaDev:main

Conversation

@JulioVianaDev

@JulioVianaDev JulioVianaDev commented Mar 26, 2026

Summary

  • Fixes a bug where ungraceful worker shutdown (e.g. Ctrl+C/crash) left stale entries in ns:g:<groupId>:active, causing repeated reserve failures and blocked groups.
  • Adds ghost/stale active self-healing in reserve-atomic.lua, reserve.lua, and reserve-batch.lua.
  • Adds queue-level startup recovery and worker synchronization so recovery runs once per queue and workers wait for it before processing.
  • Adds regression coverage for ghost active job detection and automatic recovery.
  • Includes related benchmark/runtime setup updates used while validating this scenario.

Root Cause

The reserve logic treated any non-empty group active list as valid active state.
After an ungraceful shutdown, the active entry could persist even though no worker owned it anymore. With a long jobTimeoutMs (e.g. 7 days), stalled-job recovery was delayed far too long, resulting in loops like:

  • Blocking found group but reserve failed
  • Restored group <id> to ready ...

This produced repeated retries and group starvation.
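The fix hinges on a staleness predicate. A minimal sketch of that check (field names here are illustrative stand-ins, not the actual Lua variables — the real logic lives in the reserve scripts):

```typescript
// Sketch of the ghost-active check described above. An active-list entry
// is a "ghost" when no live worker owns it: it is missing from the
// processing set, its status no longer says "active", or its heartbeat
// deadline has already passed.
interface ActiveEntry {
  inProcessingSet: boolean;    // job still present in ns:processing?
  status: string;              // job hash status field
  heartbeatDeadlineMs: number; // deadline derived from the last heartbeat
}

function isGhostActive(entry: ActiveEntry, nowMs: number): boolean {
  if (!entry.inProcessingSet) return true;
  if (entry.status !== "active") return true;
  return entry.heartbeatDeadlineMs < nowMs;
}
```

With this predicate, an entry left behind by a crashed worker fails at least one of the three checks, while a job with a live heartbeat passes all of them and is left alone.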

What Changed

  • Lua self-healing on reserve path
    • src/lua/reserve-atomic.lua
    • src/lua/reserve.lua
    • src/lua/reserve-batch.lua
    • Detects stale active entries and cleans them atomically before reserve decisions.
  • Queue startup recovery
    • src/queue.ts
    • Adds startup recovery workflow for ghost active entries and makes workers wait until recovery is complete.
  • Worker startup coordination
    • src/worker.ts
    • The worker no longer performs a redundant scan on each startup; it waits for queue-level recovery.
  • Regression test
    • test/queue.ghost-active-job.test.ts
    • Reproduces ghost scenario and validates automatic recovery behavior.
  • Related validation environment/files
    • benchmark/benchmark.ts
    • docker-compose.yml
    • benchmark/results/groupmq.json
    • benchmark/results/groupmq_redis.json

Why This Approach

  • Removes need for manual Redis operations (LRANGE/LREM) after crashes.
  • Recovers blocked groups automatically on restart.
  • Avoids extra CPU overhead from repeated per-worker recovery scans.
  • Keeps reserve-path self-healing as a runtime safety net.

Test Plan

  • Run ghost regression test:
    • pnpm vitest run test/queue.ghost-active-job.test.ts
  • Verify expected behavior:
    • ghost entry is detected after ungraceful stop
    • startup recovery clears stale active state
    • jobs are reservable again without manual Redis cleanup

Production

  • We have been testing this in production over the past week; it works well with timeout jobs and does not affect any other queue or worker configuration in our project.

Summary by CodeRabbit

  • New Features

    • Added automatic recovery for stale jobs that remain in an incomplete state after ungraceful shutdowns.
    • Workers now automatically detect and recover "ghost" jobs on startup.
  • Bug Fixes

    • Improved job state consistency by implementing self-healing checks during job reservations.
  • Tests

    • Added comprehensive test coverage for ghost job recovery scenarios.

@coderabbitai

coderabbitai bot commented Mar 26, 2026

📝 Walkthrough

Walkthrough

This PR implements a self-healing mechanism for handling stale "ghost" active jobs in group queues. It adds detection and cleanup logic to three Lua reservation scripts, introduces a recoverActiveJobs() method for manual recovery, enables automatic recovery on worker startup, includes comprehensive test coverage for ghost job scenarios, and updates benchmark and Docker configurations.

Changes

Cohort / File(s) Summary
Lua Reserve Script Self-Healing
src/lua/reserve-atomic.lua, src/lua/reserve-batch.lua, src/lua/reserve.lua
Added stale active job detection and cleanup logic to each script. Inspects first active job entry to determine staleness via processing set presence, job status check, and heartbeat deadline comparison. Clears stale entries, re-queues jobs to group sorted set with stored score, resets status to waiting, and removes from processing set.
Queue Recovery Method
src/queue.ts
Added public recoverActiveJobs(): Promise<number> method that enumerates all groups, reads each group's active list, and processes stale jobs by removing from active/processing sets, restoring to group queue with stored score, updating status to waiting, and re-adding group to ready set.
Worker Startup Recovery
src/worker.ts
Integrated automatic recovery call in _runLoop() startup phase via this.q.recoverActiveJobs(), with try-catch logging for failures. Adjusted indentation of reserveBlocking() call arguments.
Ghost Active Job Test Suite
test/queue.ghost-active-job.test.ts
New comprehensive test file validating ghost active job detection and recovery. Includes helper logic for generating ghost reports, simulates ungraceful worker shutdown, verifies orphaned entries exist post-shutdown, and confirms automatic recovery when new worker starts.
Benchmark Results & Configuration
benchmark/benchmark.ts, benchmark/results/groupmq.json, benchmark/results/groupmq_redis.json
Updated benchmark tool with shell: true spawn option. Appended new benchmark result sets to groupmq results with reformatted JSON structure. Updated Redis command statistics reflecting different benchmark run profile.
Docker Compose Configuration
docker-compose.yml
Modified dragonfly service with memory limit constraint (2g), maxmemory flag (1gb), and proactor threads configuration (2).
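Per the walkthrough above, the cleanup for one ghost job is a fixed sequence of Redis mutations. A hedged sketch, expressed as an ordered command list (key layout taken from this PR's sequence diagram; exact key names in the codebase may differ):

```typescript
// Sketch of the cleanup sequence the recovery path performs for one
// stale entry: drop it from the active list and processing set, re-queue
// it with its stored score, and reset its status to waiting.
function ghostCleanupCommands(
  ns: string,
  groupId: string,
  jobId: string,
  score: number,
): string[][] {
  return [
    ["LREM", `${ns}:g:${groupId}:active`, "0", jobId],    // remove ghost from active list
    ["ZREM", `${ns}:processing`, jobId],                  // remove from processing set
    ["DEL", `${ns}:processing:${jobId}`],                 // clear per-job processing key
    ["ZADD", `${ns}:g:${groupId}`, String(score), jobId], // re-queue with stored score
    ["HSET", `${ns}:j:${jobId}`, "status", "waiting"],    // reset status to waiting
  ];
}
```

In the Lua scripts this whole sequence runs atomically inside one EVALSHA, which is what makes it safe on the hot reserve path.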

Sequence Diagram

sequenceDiagram
    participant Worker as Worker
    participant Queue as Queue
    participant Redis as Redis
    participant LuaScript as Lua Reserve Script

    Worker->>Worker: Startup / _runLoop()
    Worker->>Queue: recoverActiveJobs()
    Queue->>Redis: SMEMBERS ${ns}:groups
    activate Redis
    Redis-->>Queue: [groupId1, groupId2, ...]
    deactivate Redis

    loop For each group
        Queue->>Redis: LRANGE ${ns}:g:${groupId}:active 0 -1
        Redis-->>Queue: [jobId1, jobId2, ...]
        
        loop For each active job
            Queue->>Redis: ZSCORE ${ns}:processing ${jobId}
            Redis-->>Queue: processing score (or nil)
            Queue->>Redis: HGET ${ns}:j:${jobId} status
            Redis-->>Queue: job status
            
            alt Job is stale
                Queue->>Redis: LREM ${ns}:g:${groupId}:active
                Queue->>Redis: ZREM ${ns}:processing ${jobId}
                Queue->>Redis: DEL ${ns}:processing:${jobId}
                Queue->>Redis: ZADD ${ns}:g:${groupId} score jobId
                Queue->>Redis: HSET ${ns}:j:${jobId} status waiting
            end
        end
    end

    Queue-->>Worker: recovered count
    Worker->>Queue: reserveBlocking(...)
    
    activate Queue
    Queue->>LuaScript: Execute reserve.lua
    activate LuaScript
    LuaScript->>Redis: Check group active list
    LuaScript->>Redis: Self-heal if stale detected
    LuaScript->>Redis: Reserve next job from group
    LuaScript-->>Queue: Reserved job
    deactivate LuaScript
    deactivate Queue
    
    Queue-->>Worker: Job to process
    Worker->>Worker: Process job

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes


Poem

A rabbit hops through Redis bright, 🐰
Finding ghosts that lost the fight,
Self-healing magic, stale jobs mend,
From active lists, bad entries send,
Back to queue, to wait and try,
No more phantom jobs gone awry! ✨

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately describes the primary fix: recovering ghost active jobs after ungraceful shutdown and preventing reserve loop issues.



@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
benchmark/benchmark.ts (1)

463-478: ⚠️ Potential issue | 🔴 Critical

Remove shell: true from both worker launches (lines 463–478 and 729–744).

cliOpts.jobType and cliOpts.db are type-cast but not runtime-validated. With shell: true, these values are interpreted as shell input, creating a command injection surface. Additionally, workerProcess.kill('SIGTERM') then targets the shell wrapper instead of the Node process, breaking signal delivery to the worker's SIGTERM handler and preventing graceful cleanup of Redis connections. Spawn without the shell wrapper; if this was added for cross-platform compatibility, resolve the executable path instead.
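A minimal sketch of the suggested change — passing arguments as an array so no shell ever parses them (`node -e` stands in here for the real `npx jiti` invocation, which is not reproduced):

```typescript
import { spawnSync } from "node:child_process";

// Without shell: true, arguments are delivered verbatim via argv, so a
// value containing shell metacharacters is just data, never a command.
// kill() would also signal this Node process directly, not a shell parent.
const jobType = "cpu; echo injected"; // would be dangerous under shell: true

const result = spawnSync(process.execPath, [
  "-e",
  "console.log(process.argv[1])",
  jobType, // passed as a single argv entry, never parsed by a shell
]);
console.log(result.stdout.toString().trim()); // prints the raw value back
```

The same shape applies to the async `spawn` calls in benchmark.ts: keep the executable and its arguments separate, and resolve the binary path yourself if cross-platform lookup was the reason for the shell wrapper.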

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@benchmark/benchmark.ts` around lines 463 - 478, The spawn calls that create
workerProcess (invocations using 'npx' + 'jiti' launching
'benchmark/worker-process.ts') currently pass shell: true, which creates a shell
wrapper (and a command-injection surface for cliOpts.jobType and cliOpts.db) and
causes workerProcess.kill('SIGTERM') to target the shell instead of the child
Node process; remove shell: true from both worker launches (the spawn call that
assigns workerProcess and the second symmetric spawn later) so the child is
spawned directly, and if cross-platform resolution was the reason for using a
shell, resolve the executable path (e.g., locate the node/jiti binary or use npm
bin resolution) and invoke spawn without shell to ensure signals reach the Node
worker and eliminate command injection risk.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@docker-compose.yml`:
- Around line 13-16: The compose file uses deploy.resources.limits.memory which
is ignored by non-Swarm `docker compose up`; add a service-level memory cap by
adding `mem_limit: 2g` under the service definition (alongside the existing
`deploy` block) so the 2g limit is enforced in non-Swarm mode; keep the existing
`deploy.resources.limits.memory: 2g` intact for Swarm compatibility.

In `@src/queue.ts`:
- Around line 1806-1844: recoverActiveJobs is re-adding every job in
:g:<groupId>:active without checking whether the job is actually stale or still
heartbeating, which can cause live jobs to be re-queued during rolling deploys;
update recoverActiveJobs to only requeue when the same stale checks used by the
Lua reserve logic pass (e.g. inspect `${this.ns}:processing:${jobId}`
heartbeat/timestamp or a processing field on `${this.ns}:job:${jobId}` and
require it to be older than a configured staleThreshold) and obtain a
cluster-wide one-shot recovery gate before proceeding (use a SETNX/expire lock
key like `${this.ns}:recovery:lock` or per-queue recovery key to ensure only one
node performs recovery at a time); modify the code that currently lrem/zrem/zadd
jobs so those mutations only happen after the stale check succeeds and ensure
you update job status/hashes the same way as the Lua reserve path (use
recoverActiveJobs, activeKey, processingKey, jobKey identifiers to locate the
logic).

In `@test/queue.ghost-active-job.test.ts`:
- Around line 115-177: The test's simulateCtrlC/setupGhost flow never creates a
real "ghost-active" reservation because simulateCtrlC only flips Worker
internals (simulateCtrlC) but doesn't cancel the in-flight processSingleJob
promise or its local heartbeat timers (heartbeatDelayTimer/hbTimer), so the
original worker continues heartbeating job1; to fix, change setupGhost to build
a leaked reservation directly (create the Redis reservation/lock for job1 as if
the original worker died) instead of trying to fake a crash with simulateCtrlC,
and add a second test case that specifically exercises reserve-time self-healing
by invoking the code path without calling recoverActiveJobs() first — look for
functions/fields: Worker/processSingleJob, heartbeatDelayTimer, hbTimer,
recoverActiveJobs, simulateCtrlC, and the test helper setupGhost to implement
the leaked reservation and the additional test scenario.

---

Outside diff comments:
In `@benchmark/benchmark.ts`:
- Around line 463-478: The spawn calls that create workerProcess (invocations
using 'npx' + 'jiti' launching 'benchmark/worker-process.ts') currently pass
shell: true, which creates a shell wrapper (and a command-injection surface for
cliOpts.jobType and cliOpts.db) and causes workerProcess.kill('SIGTERM') to
target the shell instead of the child Node process; remove shell: true from both
worker launches (the spawn call that assigns workerProcess and the second
symmetric spawn later) so the child is spawned directly, and if cross-platform
resolution was the reason for using a shell, resolve the executable path (e.g.,
locate the node/jiti binary or use npm bin resolution) and invoke spawn without
shell to ensure signals reach the Node worker and eliminate command injection
risk.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 77a81ecd-c1f6-4543-88a6-d397c9343c0e

📥 Commits

Reviewing files that changed from the base of the PR and between 6439d4e and 671cb3f.

📒 Files selected for processing (10)
  • benchmark/benchmark.ts
  • benchmark/results/groupmq.json
  • benchmark/results/groupmq_redis.json
  • docker-compose.yml
  • src/lua/reserve-atomic.lua
  • src/lua/reserve-batch.lua
  • src/lua/reserve.lua
  • src/queue.ts
  • src/worker.ts
  • test/queue.ghost-active-job.test.ts

Comment on lines +7298 to +8045
{
"timestamp": 1774458318858,
"queueType": "groupmq",
"jobType": "cpu",
"totalJobs": 500,
"workersCount": 4,
"completedJobs": 501,
"durationMs": 137749,
"throughputJobsPerSec": 3.64,
"avgPickupMs": 59782.75,
"avgProcessingMs": 789.23,
"avgTotalMs": 60571.99,
"p95PickupMs": 121346,
"p95ProcessingMs": 1425.02,
"p95TotalMs": 122311.9,
"peakCpuPercent": 101.5,
"peakMemoryMB": 105.3,
"avgCpuPercent": -0.3,
"avgMemoryMB": 69.2,
"redisStats": {
"commandstats": {
"config|resetstat": {
"calls": 1,
"usec": 2009,
"usec_per_call": 2009,
"rejected_calls": 0,
"failed_calls": 0
},
"lrange": {
"calls": 200,
"usec": 1443,
"usec_per_call": 7.22,
"rejected_calls": 0,
"failed_calls": 0
},
"incr": {
"calls": 500,
"usec": 43296,
"usec_per_call": 86.59,
"rejected_calls": 0,
"failed_calls": 0
},
"zrangebyscore": {
"calls": 359,
"usec": 2510,
"usec_per_call": 6.99,
"rejected_calls": 0,
"failed_calls": 0
},
"zadd": {
"calls": 2198,
"usec": 60115,
"usec_per_call": 27.35,
"rejected_calls": 0,
"failed_calls": 0
},
"bzpopmin": {
"calls": 219,
"usec": 4344,
"usec_per_call": 19.84,
"rejected_calls": 0,
"failed_calls": 0
},
"evalsha": {
"calls": 2295,
"usec": 807131,
"usec_per_call": 351.69,
"rejected_calls": 0,
"failed_calls": 0
},
"lrem": {
"calls": 1,
"usec": 9,
"usec_per_call": 9,
"rejected_calls": 0,
"failed_calls": 0
},
"sadd": {
"calls": 500,
"usec": 4907,
"usec_per_call": 9.81,
"rejected_calls": 0,
"failed_calls": 0
},
"hset": {
"calls": 2503,
"usec": 44688,
"usec_per_call": 17.85,
"rejected_calls": 0,
"failed_calls": 0
},
"srem": {
"calls": 50,
"usec": 144,
"usec_per_call": 2.88,
"rejected_calls": 0,
"failed_calls": 0
},
"hmget": {
"calls": 501,
"usec": 3872,
"usec_per_call": 7.73,
"rejected_calls": 0,
"failed_calls": 0
},
"get": {
"calls": 1225,
"usec": 30527,
"usec_per_call": 24.92,
"rejected_calls": 0,
"failed_calls": 0
},
"zpopmin": {
"calls": 551,
"usec": 3656,
"usec_per_call": 6.64,
"rejected_calls": 0,
"failed_calls": 0
},
"llen": {
"calls": 419,
"usec": 781,
"usec_per_call": 1.86,
"rejected_calls": 0,
"failed_calls": 0
},
"del": {
"calls": 1551,
"usec": 4795,
"usec_per_call": 3.09,
"rejected_calls": 0,
"failed_calls": 0
},
"info": {
"calls": 8,
"usec": 1531,
"usec_per_call": 191.38,
"rejected_calls": 0,
"failed_calls": 0
},
"script|load": {
"calls": 28,
"usec": 944,
"usec_per_call": 33.71,
"rejected_calls": 0,
"failed_calls": 0
},
"lindex": {
"calls": 868,
"usec": 2290,
"usec_per_call": 2.64,
"rejected_calls": 0,
"failed_calls": 0
},
"smembers": {
"calls": 195,
"usec": 3943,
"usec_per_call": 20.22,
"rejected_calls": 0,
"failed_calls": 0
},
"zcard": {
"calls": 5615,
"usec": 7020,
"usec_per_call": 1.25,
"rejected_calls": 0,
"failed_calls": 0
},
"zscore": {
"calls": 869,
"usec": 3428,
"usec_per_call": 3.94,
"rejected_calls": 0,
"failed_calls": 0
},
"hmset": {
"calls": 500,
"usec": 167628,
"usec_per_call": 335.26,
"rejected_calls": 0,
"failed_calls": 0
},
"set": {
"calls": 1445,
"usec": 96855,
"usec_per_call": 67.03,
"rejected_calls": 0,
"failed_calls": 0
},
"time": {
"calls": 500,
"usec": 1786,
"usec_per_call": 3.57,
"rejected_calls": 0,
"failed_calls": 0
},
"hget": {
"calls": 921,
"usec": 5815,
"usec_per_call": 6.31,
"rejected_calls": 0,
"failed_calls": 0
},
"zrem": {
"calls": 602,
"usec": 3418,
"usec_per_call": 5.68,
"rejected_calls": 0,
"failed_calls": 0
},
"lpop": {
"calls": 500,
"usec": 2019,
"usec_per_call": 4.04,
"rejected_calls": 0,
"failed_calls": 0
},
"lpush": {
"calls": 501,
"usec": 2299,
"usec_per_call": 4.59,
"rejected_calls": 0,
"failed_calls": 0
},
"zrange": {
"calls": 1619,
"usec": 6934,
"usec_per_call": 4.28,
"rejected_calls": 0,
"failed_calls": 0
},
"slowlog|reset": {
"calls": 1,
"usec": 379,
"usec_per_call": 379,
"rejected_calls": 0,
"failed_calls": 0
}
},
"slowlog": [
[
48,
1774458456,
92361,
[
"info",
"commandstats"
],
"192.168.16.1:52890",
""
],
[
47,
1774458335,
26709,
[
"get",
"groupmq:benchmark-1774456776815-nk6558dw7yp}:paused"
],
"192.168.16.1:33890",
""
],
[
46,
1774458326,
55079,
[
"evalsha",
"1a7e9e4a6738f51cd9b4ae5ac81f6dc82bb38425",
"1",
"groupmq:benchmark-1774458318858-vougqhix1g}",
"group-18",
"{\"id\":\"job-468\",\"enqueuedAt\":1774458326324}",
"3",
"1774458326324",
"0",
"91188998-3100-47d9-af93-21e4a857117e",
"1",
"1774458326324",
"0"
],
"192.168.16.1:52902",
""
],
[
45,
1774458326,
54866,
[
"HMSET",
"groupmq:benchmark-1774458318858-vougqhix1g}:job:91188998-3100-47d9-af93-21e4a857117e",
"id",
"91188998-3100-47d9-af93-21e4a857117e",
"groupId",
"group-18",
"data",
"{\"id\":\"job-468\",\"enqueuedAt\":1774458326324}",
"attempts",
"0",
"maxAttempts",
"3",
"seq",
"469",
"timestamp",
"1774458326324",
"orderMs",
"1774458326324",
"score",
"70391126324469",
"delayUntil",
"0"
],
"",
""
],
[
44,
1774458325,
34695,
[
"evalsha",
"1a7e9e4a6738f51cd9b4ae5ac81f6dc82bb38425",
"1",
"groupmq:benchmark-1774458318858-vougqhix1g}",
"group-0",
"{\"id\":\"job-400\",\"enqueuedAt\":1774458325430}",
"3",
"1774458325430",
"0",
"78c31cb9-4294-4d27-b606-01f3a11ad88d",
"1",
"1774458325430",
"0"
],
"192.168.16.1:52902",
""
],
[
43,
1774458325,
34449,
[
"HSET",
"groupmq:benchmark-1774458318858-vougqhix1g}:job:78c31cb9-4294-4d27-b606-01f3a11ad88d",
"status",
"waiting"
],
"",
""
],
[
42,
1774458324,
41295,
[
"evalsha",
"1a7e9e4a6738f51cd9b4ae5ac81f6dc82bb38425",
"1",
"groupmq:benchmark-1774458318858-vougqhix1g}",
"group-48",
"{\"id\":\"job-298\",\"enqueuedAt\":1774458324060}",
"3",
"1774458324060",
"0",
"1258b4c8-fbb8-425f-99ee-297e9a8398bb",
"1",
"1774458324060",
"0"
],
"192.168.16.1:52902",
""
],
[
41,
1774458324,
40960,
[
"HMSET",
"groupmq:benchmark-1774458318858-vougqhix1g}:job:1258b4c8-fbb8-425f-99ee-297e9a8398bb",
"id",
"1258b4c8-fbb8-425f-99ee-297e9a8398bb",
"groupId",
"group-48",
"data",
"{\"id\":\"job-298\",\"enqueuedAt\":1774458324060}",
"attempts",
"0",
"maxAttempts",
"3",
"seq",
"299",
"timestamp",
"1774458324060",
"orderMs",
"1774458324060",
"score",
"70391124060299",
"delayUntil",
"0"
],
"",
""
],
[
40,
1774458323,
28743,
[
"evalsha",
"1a7e9e4a6738f51cd9b4ae5ac81f6dc82bb38425",
"1",
"groupmq:benchmark-1774458318858-vougqhix1g}",
"group-22",
"{\"id\":\"job-272\",\"enqueuedAt\":1774458323834}",
"3",
"1774458323834",
"0",
"560f0441-99e5-4479-a327-be7ec1870b68",
"1",
"1774458323834",
"0"
],
"192.168.16.1:52902",
""
],
[
39,
1774458323,
28299,
[
"SET",
"groupmq:benchmark-1774458318858-vougqhix1g}:unique:560f0441-99e5-4479-a327-be7ec1870b68",
"560f0441-99e5-4479-a327-be7ec1870b68",
"NX"
],
"",
""
],
[
38,
1774458323,
29830,
[
"evalsha",
"1a7e9e4a6738f51cd9b4ae5ac81f6dc82bb38425",
"1",
"groupmq:benchmark-1774458318858-vougqhix1g}",
"group-9",
"{\"id\":\"job-259\",\"enqueuedAt\":1774458323674}",
"3",
"1774458323674",
"0",
"c2234003-3a45-4d11-9885-15010e8a6afe",
"1",
"1774458323674",
"0"
],
"192.168.16.1:52902",
""
],
[
37,
1774458323,
29407,
[
"ZADD",
"groupmq:benchmark-1774458318858-vougqhix1g}:g:group-9",
"70391123674260",
"c2234003-3a45-4d11-9885-15010e8a6afe"
],
"",
""
],
[
36,
1774458322,
40810,
[
"evalsha",
"1a7e9e4a6738f51cd9b4ae5ac81f6dc82bb38425",
"1",
"groupmq:benchmark-1774458318858-vougqhix1g}",
"group-0",
"{\"id\":\"job-200\",\"enqueuedAt\":1774458322813}",
"3",
"1774458322813",
"0",
"e7bd56ae-4261-448c-a034-d0139b7e11e2",
"1",
"1774458322813",
"0"
],
"192.168.16.1:52902",
""
],
[
35,
1774458322,
40525,
[
"INCR",
"groupmq:benchmark-1774458318858-vougqhix1g}:seq:20537"
],
"",
""
],
[
34,
1774458322,
54166,
[
"evalsha",
"1a7e9e4a6738f51cd9b4ae5ac81f6dc82bb38425",
"1",
"groupmq:benchmark-1774458318858-vougqhix1g}",
"group-49",
"{\"id\":\"job-149\",\"enqueuedAt\":1774458322058}",
"3",
"1774458322058",
"0",
"b03fdb14-d600-42f1-a447-27239219fdb7",
"1",
"1774458322058",
"0"
],
"192.168.16.1:52902",
""
],
[
33,
1774458322,
53923,
[
"HMSET",
"groupmq:benchmark-1774458318858-vougqhix1g}:job:b03fdb14-d600-42f1-a447-27239219fdb7",
"id",
"b03fdb14-d600-42f1-a447-27239219fdb7",
"groupId",
"group-49",
"data",
"{\"id\":\"job-149\",\"enqueuedAt\":1774458322058}",
"attempts",
"0",
"maxAttempts",
"3",
"seq",
"150",
"timestamp",
"1774458322058",
"orderMs",
"1774458322058",
"score",
"70391122058150",
"delayUntil",
"0"
],
"",
""
],
[
32,
1774458321,
14638,
[
"evalsha",
"1a7e9e4a6738f51cd9b4ae5ac81f6dc82bb38425",
"1",
"groupmq:benchmark-1774458318858-vougqhix1g}",
"group-14",
"{\"id\":\"job-64\",\"enqueuedAt\":1774458321066}",
"3",
"1774458321066",
"0",
"78c3c1ad-18d8-49af-8c54-1aa2a5bf3412",
"1",
"1774458321066",
"0"
],
"192.168.16.1:52902",
""
],
[
31,
1774458320,
55177,
[
"evalsha",
"1a7e9e4a6738f51cd9b4ae5ac81f6dc82bb38425",
"1",
"groupmq:benchmark-1774458318858-vougqhix1g}",
"group-17",
"{\"id\":\"job-17\",\"enqueuedAt\":1774458320443}",
"3",
"1774458320443",
"0",
"bfe5d77a-674d-4ed7-9345-c6837d21776c",
"1",
"1774458320443",
"0"
],
"192.168.16.1:52902",
""
],
[
30,
1774458320,
54890,
[
"SET",
"groupmq:benchmark-1774458318858-vougqhix1g}:unique:bfe5d77a-674d-4ed7-9345-c6837d21776c",
"bfe5d77a-674d-4ed7-9345-c6837d21776c",
"NX"
],
"",
""
],
[
29,
1774458320,
52438,
[
"evalsha",
"1a7e9e4a6738f51cd9b4ae5ac81f6dc82bb38425",
"1",
"groupmq:benchmark-1774458318858-vougqhix1g}",
"group-0",
"{\"id\":\"job-0\",\"enqueuedAt\":1774458319988}",
"3",
"1774458319989",
"0",
"756beb43-3784-40b5-a4f7-a56b99df7148",
"1",
"1774458319990",
"0"
],
"192.168.16.1:52902",
""
]
],
"latency": [],
"info": {
"used_memory": 2147888,
"used_memory_human": "2.05M",
"used_memory_peak": 2423312,
"used_memory_peak_human": "2.31M",
"total_commands_processed": 27250,
"instantaneous_ops_per_sec": 232,
"total_net_input_bytes": 843278,
"total_net_output_bytes": 265854,
"keyspace_hits": 10518,
"keyspace_misses": 2273
},
"summary": {
"totalCalls": 27245,
"totalUsec": 1320516,
"avgUsecPerCall": 48.46819599926592,
"commandCount": 31,
"topCommands": [
{
"command": "zcard",
"calls": 5615,
"usec": 7020,
"usec_per_call": 1.25,
"rejected_calls": 0,
"failed_calls": 0
},
{
"command": "hset",
"calls": 2503,
"usec": 44688,
"usec_per_call": 17.85,
"rejected_calls": 0,
"failed_calls": 0
},
{
"command": "evalsha",
"calls": 2295,
"usec": 807131,
"usec_per_call": 351.69,
"rejected_calls": 0,
"failed_calls": 0
},
{
"command": "zadd",
"calls": 2198,
"usec": 60115,
"usec_per_call": 27.35,
"rejected_calls": 0,
"failed_calls": 0
},
{
"command": "zrange",
"calls": 1619,
"usec": 6934,
"usec_per_call": 4.28,
"rejected_calls": 0,
"failed_calls": 0
},
{
"command": "del",
"calls": 1551,
"usec": 4795,
"usec_per_call": 3.09,
"rejected_calls": 0,
"failed_calls": 0
},
{
"command": "set",
"calls": 1445,
"usec": 96855,
"usec_per_call": 67.03,
"rejected_calls": 0,
"failed_calls": 0
},
{
"command": "get",
"calls": 1225,
"usec": 30527,
"usec_per_call": 24.92,
"rejected_calls": 0,
"failed_calls": 0
},
{
"command": "hget",
"calls": 921,
"usec": 5815,
"usec_per_call": 6.31,
"rejected_calls": 0,
"failed_calls": 0
},
{
"command": "zscore",
"calls": 869,
"usec": 3428,
"usec_per_call": 3.94,
"rejected_calls": 0,
"failed_calls": 0
}
]
}
},
"settings": {
"mq": "groupmq",
"jobs": 500,
"workers": 4,
"jobType": "cpu",
"multiProcess": true
}

⚠️ Potential issue | 🟠 Major

This benchmark sample looks corrupted.

It reports completedJobs: 501 for totalJobs: 500 and avgCpuPercent: -0.3. The matching benchmark/results/groupmq_redis.json snapshot also records exactly one LREM on an active list, which lines up with the single extra completion. Please regenerate this run only after adding a regression test for “recover ghosts without duplicating live jobs.”

Comment on lines +13 to +16
deploy:
  resources:
    limits:
      memory: 2g

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🌐 Web query:

In Docker Compose (non-Swarm), is deploy.resources.limits.memory enforced, or should mem_limit be used instead?

💡 Result:

In Docker Compose (non-Swarm mode), deploy.resources.limits.memory is not enforced by default when using docker compose up; it is primarily intended for Swarm deployments via docker stack deploy. Use mem_limit directly under the service for memory limits in non-Swarm setups. For modern Docker Compose (v2+, using the Compose Specification without a version key), both mem_limit (service-level) and deploy.resources.limits.memory are supported and enforced in single-node (non-Swarm) mode. Official Compose docs and spec confirm mem_limit maps to the container --memory flag and must align with deploy.resources.limits.memory if both are set. Example for enforcement:

    services:
      app:
        image: nginx
        mem_limit: 512m  # Enforced in non-Swarm

or

    services:
      app:
        image: nginx
        deploy:
          resources:
            limits:
              memory: 512m  # Works in modern Compose non-Swarm

Legacy v3 files require the --compatibility flag or Swarm for deploy keys. Recent sources (2024–2026, e.g., the Compose spec and Docker docs) show deploy.resources now works reliably in docker compose (v2) without Swarm.


🏁 Script executed:

cat docker-compose.yml

Repository: Openpanel-dev/groupmq

Length of output: 691


deploy.resources.limits is not enforced in non-Swarm mode with Compose v3

This file uses version: "3", a legacy versioned format where deploy.resources.limits.memory is only enforced in Swarm mode. When used with docker compose up (non-Swarm), the 2g memory limit will be ignored, creating a false safety assumption during benchmarking or development.

Suggested fix

Add mem_limit at the service level to enforce the memory constraint in non-Swarm deployments:

 services:
   dragonfly:
     image: docker.dragonflydb.io/dragonflydb/dragonfly:latest
     container_name: groupmq-dragonfly
     restart: always
     ports:
       - "6385:6379"
     ulimits:
       memlock: -1
       nofile: 65535
+    mem_limit: 2g
     deploy:
       resources:
         limits:
           memory: 2g

(Keep the deploy block if this stack may also run in Swarm mode.)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docker-compose.yml` around lines 13 - 16, The compose file uses
deploy.resources.limits.memory which is ignored by non-Swarm `docker compose
up`; add a service-level memory cap by adding `mem_limit: 2g` under the service
definition (alongside the existing `deploy` block) so the 2g limit is enforced
in non-Swarm mode; keep the existing `deploy.resources.limits.memory: 2g` intact
for Swarm compatibility.

Comment on lines +1806 to +1844
async recoverActiveJobs(): Promise<number> {
  const groupsKey = `${this.ns}:groups`;
  const readyKey = `${this.ns}:ready`;
  const processingKey = `${this.ns}:processing`;
  const allGroups = await this.r.smembers(groupsKey);
  let recovered = 0;

  for (const groupId of allGroups) {
    const activeKey = `${this.ns}:g:${groupId}:active`;
    const activeJobs = await this.r.lrange(activeKey, 0, -1);
    if (activeJobs.length === 0) continue;

    for (const jobId of activeJobs) {
      const jobKey = `${this.ns}:job:${jobId}`;
      const score = await this.r.hget(jobKey, 'score');

      await this.r.lrem(activeKey, 0, jobId);
      await this.r.zrem(processingKey, jobId);
      await this.r.del(`${this.ns}:processing:${jobId}`);

      if (score) {
        const groupKey = `${this.ns}:g:${groupId}`;
        await this.r.zadd(groupKey, Number(score), jobId);
        await this.r.hset(jobKey, 'status', 'waiting');

        const head = await this.r.zrange(groupKey, 0, 0, 'WITHSCORES');
        if (head.length >= 2) {
          await this.r.zadd(readyKey, Number(head[1]), groupId);
        }

        recovered++;
        this.logger.info(
          `Recovered stale active job ${jobId} from group ${groupId}`,
        );
      }
    }
  }
  return recovered;
}

⚠️ Potential issue | 🔴 Critical

Don't blindly re-queue every active entry.

This method treats every job in :g:<groupId>:active as a ghost. Because src/worker.ts now runs it automatically on startup, a rolling deploy or scale-out can pull a still-heartbeating job out of :processing, push it back into the group queue, and let another worker execute it again. The recovery path needs the same stale-entry checks as the Lua reserve scripts, plus a queue-scoped/distributed one-shot gate before it is safe to auto-run.
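The one-shot gate the review asks for can be sketched with an in-memory stand-in for Redis `SET ... NX EX` (the lock key name and TTL are assumptions, not the project's actual conventions):

```typescript
// In-memory model of a cluster-wide one-shot recovery gate. The Map
// stands in for Redis so the idea is runnable without a server; in
// production this would be `SET ${ns}:recovery:lock <id> NX EX <ttl>`.
const locks = new Map<string, number>(); // lock key -> expiry timestamp (ms)

// Returns true only for the first caller while the lock is unexpired,
// so exactly one node performs recovery per TTL window.
function tryAcquireRecoveryLock(ns: string, ttlMs: number, nowMs: number): boolean {
  const key = `${ns}:recovery:lock`;
  const expiry = locks.get(key);
  if (expiry !== undefined && expiry > nowMs) return false; // another node won
  locks.set(key, nowMs + ttlMs);
  return true;
}

// Two workers starting at once: only the first runs recovery.
const now = Date.now();
const nodeA = tryAcquireRecoveryLock("groupmq:q1", 30_000, now);
const nodeB = tryAcquireRecoveryLock("groupmq:q1", 30_000, now);
console.log(nodeA, nodeB); // true false
```

Gating alone is not sufficient, though: the node that wins the lock must still apply the same per-job staleness checks as the Lua reserve path before mutating anything, or a rolling deploy can re-queue a live, heartbeating job.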

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/queue.ts` around lines 1806 - 1844, recoverActiveJobs is re-adding every
job in :g:<groupId>:active without checking whether the job is actually stale or
still heartbeating, which can cause live jobs to be re-queued during rolling
deploys; update recoverActiveJobs to only requeue when the same stale checks
used by the Lua reserve logic pass (e.g. inspect
`${this.ns}:processing:${jobId}` heartbeat/timestamp or a processing field on
`${this.ns}:job:${jobId}` and require it to be older than a configured
staleThreshold) and obtain a cluster-wide one-shot recovery gate before
proceeding (use a SETNX/expire lock key like `${this.ns}:recovery:lock` or
per-queue recovery key to ensure only one node performs recovery at a time);
modify the code that currently lrem/zrem/zadd jobs so those mutations only
happen after the stale check succeeds and ensure you update job status/hashes
the same way as the Lua reserve path (use recoverActiveJobs, activeKey,
processingKey, jobKey identifiers to locate the logic).
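
The stale check and one-shot recovery gate this comment asks for could be sketched as pure decision logic. The helper names (`isGhostActive`, `acquireRecoveryGate`) are hypothetical, not from the PR, and the in-memory map stands in for a Redis `SET key value NX PX ttl` call:

```typescript
// Hypothetical sketch of the guards recoverActiveJobs would need before
// requeuing an entry from ns:g:<groupId>:active.

// An active entry is only a ghost if its processing record is missing
// (the worker never wrote, or lost, its lease) or its last heartbeat is
// older than a configured staleness threshold.
function isGhostActive(
  lastHeartbeatMs: number | null,
  nowMs: number,
  staleThresholdMs: number,
): boolean {
  if (lastHeartbeatMs === null) return true;
  return nowMs - lastHeartbeatMs > staleThresholdMs;
}

// Cluster-wide one-shot gate so only one node runs recovery. In Redis this
// would be `SET ns:recovery:lock <id> NX PX <ttl>`; the Map models the same
// acquire-if-absent-or-expired semantics for illustration.
function acquireRecoveryGate(
  store: Map<string, number>,
  key: string,
  nowMs: number,
  ttlMs: number,
): boolean {
  const expiresAt = store.get(key);
  if (expiresAt !== undefined && expiresAt > nowMs) return false;
  store.set(key, nowMs + ttlMs);
  return true;
}
```

With these guards, the lrem/zrem/zadd mutations would run only when `acquireRecoveryGate` succeeds and `isGhostActive` returns true for the specific entry.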

Comment on lines +115 to +177
function simulateCtrlC(worker: Worker) {
  // @ts-ignore
  if (worker['blockingClient']) {
    worker['blockingClient'].disconnect();
    worker['blockingClient'] = null;
  }
  // @ts-ignore
  worker['stopping'] = true;
  // @ts-ignore
  worker['closed'] = true;
  // @ts-ignore
  if (worker['cleanupTimer']) clearInterval(worker['cleanupTimer']);
  // @ts-ignore
  if (worker['schedulerTimer']) clearInterval(worker['schedulerTimer']);
  // @ts-ignore
  if (worker['stalledCheckTimer']) clearInterval(worker['stalledCheckTimer']);
  // @ts-ignore
  worker['jobsInProgress'].clear();
}

async function setupGhost(redis: Redis, namespace: string) {
  const ns = `groupmq:${namespace}`;
  const queue = new Queue({
    redis: redis.duplicate(),
    namespace,
    jobTimeoutMs: 7 * 24 * 60 * 60 * 1000,
    orderingDelayMs: 1000,
  });

  const job1 = await queue.add({
    groupId: 'timeout',
    data: { msg: 'job-1' },
    orderMs: Date.now() + 100,
  });

  await new Promise((r) => setTimeout(r, 1500));
  await queue.runSchedulerOnce();

  await queue.add({
    groupId: 'timeout',
    data: { msg: 'job-2' },
    orderMs: Date.now() + 200,
  });

  await new Promise((r) => setTimeout(r, 1500));
  await queue.runSchedulerOnce();

  let pickedUpResolve: () => void;
  const pickedUp = new Promise<void>((r) => { pickedUpResolve = r; });

  const workerRef = new Worker({
    queue,
    handler: async (_job) => {
      pickedUpResolve();
      await new Promise(() => {});
    },
    concurrency: 1,
  });

  await pickedUp;
  simulateCtrlC(workerRef);

  return { queue, ns, job1 };

⚠️ Potential issue | 🟠 Major

This setup never creates a real ghost-active job.

simulateCtrlC() only flips worker fields; it does not stop the in-flight processSingleJob() promise or its local heartbeatDelayTimer / hbTimer. After ~2s the original worker can still heartbeat job1, so this suite is really validating that startup recovery steals a live job, and the new Lua self-healing branches never execute. Build the leaked reservation directly instead, and add a separate case that exercises reserve-time self-healing without recoverActiveJobs() in front of it.

Possible direction
-  let pickedUpResolve: () => void;
-  const pickedUp = new Promise<void>((r) => { pickedUpResolve = r; });
-
-  const workerRef = new Worker({
-    queue,
-    handler: async (_job) => {
-      pickedUpResolve();
-      await new Promise(() => {});
-    },
-    concurrency: 1,
-  });
-
-  await pickedUp;
-  simulateCtrlC(workerRef);
+  const reserved = await queue.reserve();
+  expect(reserved?.id).toBe(job1.id);

As per coding guidelines (`**/*.{js,ts,tsx}`: all features should have tests; `**/*.test.{js,ts}`: all tests should always succeed with no failing tests).

Also applies to: 197-237

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@test/queue.ghost-active-job.test.ts` around lines 115 - 177, The test's
simulateCtrlC/setupGhost flow never creates a real "ghost-active" reservation
because simulateCtrlC only flips Worker internals (simulateCtrlC) but doesn't
cancel the in-flight processSingleJob promise or its local heartbeat timers
(heartbeatDelayTimer/hbTimer), so the original worker continues heartbeating
job1; to fix, change setupGhost to build a leaked reservation directly (create
the Redis reservation/lock for job1 as if the original worker died) instead of
trying to fake a crash with simulateCtrlC, and add a second test case that
specifically exercises reserve-time self-healing by invoking the code path
without calling recoverActiveJobs() first — look for functions/fields:
Worker/processSingleJob, heartbeatDelayTimer, hbTimer, recoverActiveJobs,
simulateCtrlC, and the test helper setupGhost to implement the leaked
reservation and the additional test scenario.
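
Fabricating the leaked reservation directly could look like the sketch below. The helper name (`leakedReservationCmds`) is hypothetical; the key layout (`ns:g:<groupId>:active` as a list, `ns:processing:<jobId>`, `ns:job:<jobId>`) is assumed from the fragments visible in this PR, and the function only builds the command tuples a test would replay against Redis:

```typescript
// Hypothetical sketch: instead of faking a crash via worker internals,
// write the ghost state directly, as if the reserving worker died mid-job.
type Cmd = [string, ...string[]];

function leakedReservationCmds(
  ns: string,
  groupId: string,
  jobId: string,
): Cmd[] {
  return [
    // Put the job into the group's active list, as reserve would.
    ['LPUSH', `${ns}:g:${groupId}:active`, jobId],
    // Create a processing record with no live worker left to heartbeat it.
    ['SET', `${ns}:processing:${jobId}`, 'dead-worker'],
    // Mark the job hash active, matching the state reserve leaves behind.
    ['HSET', `${ns}:job:${jobId}`, 'status', 'active'],
  ];
}
```

A test could pipe these commands through the Redis client, then assert that a plain `queue.reserve()` (with no prior `recoverActiveJobs()`) self-heals the entry, exercising the Lua branches in isolation.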
