Skip to content

Commit 83d6ac3

Browse files
committed
chore: add workerstop
1 parent eed8b7c commit 83d6ac3

File tree

20 files changed

+350
-245
lines changed

20 files changed

+350
-245
lines changed

pint.json

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
11
{
2-
"preset": "psr12"
2+
"preset": "psr12",
3+
"rules": {
4+
"single_quote": true
5+
}
36
}

src/Queue/Adapter.php

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,4 @@ abstract public function workerStart(callable $callback): self;
4040
* @return self
4141
*/
4242
abstract public function workerStop(callable $callback): self;
43-
44-
/**
45-
* Returns the native server object from the Adapter.
46-
* @return mixed
47-
*/
48-
abstract public function getNative(): mixed;
4943
}

src/Queue/Adapter/Swoole.php

Lines changed: 60 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -2,101 +2,99 @@
22

33
namespace Utopia\Queue\Adapter;
44

5-
use Swoole\Constant;
6-
use Swoole\Process\Pool;
7-
use Utopia\CLI\Console;
5+
use Swoole\Coroutine;
6+
use Swoole\Process;
87
use Utopia\Queue\Adapter;
98
use Utopia\Queue\Consumer;
109

1110
class Swoole extends Adapter
1211
{
13-
protected Pool $pool;
12+
/** @var Process[] */
13+
protected array $workers = [];
1414

15-
/** @var callable */
16-
private $onStop;
15+
/** @var callable[] */
16+
protected array $onWorkerStart = [];
1717

18-
public function __construct(Consumer $consumer, int $workerNum, string $queue, string $namespace = 'utopia-queue')
19-
{
20-
parent::__construct($workerNum, $queue, $namespace);
18+
/** @var callable[] */
19+
protected array $onWorkerStop = [];
2120

21+
public function __construct(
22+
Consumer $consumer,
23+
int $workerNum,
24+
string $queue,
25+
string $namespace = 'utopia-queue',
26+
) {
27+
parent::__construct($workerNum, $queue, $namespace);
2228
$this->consumer = $consumer;
23-
$this->pool = new Pool($workerNum);
2429
}
2530

2631
public function start(): self
2732
{
28-
$this->pool->set(['enable_coroutine' => true]);
33+
for ($i = 0; $i < $this->workerNum; $i++) {
34+
$this->spawnWorker($i);
35+
}
2936

30-
// Register signal handlers in the main process before starting pool
31-
if (extension_loaded('pcntl')) {
32-
pcntl_signal(SIGTERM, function () {
33-
Console::info("[Swoole] Received SIGTERM, initiating graceful shutdown...");
34-
$this->stop();
35-
});
37+
Coroutine::set(['hook_flags' => SWOOLE_HOOK_ALL]);
3638

37-
pcntl_signal(SIGINT, function () {
38-
Console::info("[Swoole] Received SIGINT, initiating graceful shutdown...");
39-
$this->stop();
40-
});
39+
Coroutine\run(function () {
40+
Process::signal(SIGTERM, fn () => $this->stop());
41+
Process::signal(SIGINT, fn () => $this->stop());
42+
Process::signal(SIGCHLD, fn () => $this->reap());
4143

42-
// Enable async signals
43-
pcntl_async_signals(true);
44-
} else {
45-
Console::warning("[Swoole] pcntl extension is not loaded, worker will not shutdown gracefully.");
46-
}
44+
while (\count($this->workers) > 0) {
45+
Coroutine::sleep(1);
46+
}
47+
});
4748

48-
$this->pool->start();
4949
return $this;
5050
}
5151

52-
public function stop(): self
52+
protected function spawnWorker(int $workerId): void
5353
{
54-
if ($this->onStop) {
55-
call_user_func($this->onStop);
56-
}
54+
$process = new Process(function () use ($workerId) {
55+
Coroutine::set(['hook_flags' => SWOOLE_HOOK_ALL]);
5756

58-
Console::info("[Swoole] Shutting down process pool...");
59-
$this->pool->shutdown();
60-
Console::success("[Swoole] Process pool stopped.");
61-
return $this;
57+
Coroutine\run(function () use ($workerId) {
58+
Process::signal(SIGTERM, fn () => $this->consumer->close());
59+
60+
foreach ($this->onWorkerStart as $callback) {
61+
$callback((string)$workerId);
62+
}
63+
64+
foreach ($this->onWorkerStop as $callback) {
65+
$callback((string)$workerId);
66+
}
67+
});
68+
}, false, 0, false);
69+
70+
$pid = $process->start();
71+
$this->workers[$pid] = $process;
6272
}
6373

64-
public function workerStart(callable $callback): self
74+
protected function reap(): void
6575
{
66-
$this->pool->on(Constant::EVENT_WORKER_START, function (Pool $pool, string $workerId) use ($callback) {
67-
// Register signal handlers in each worker process for graceful shutdown
68-
if (extension_loaded('pcntl')) {
69-
pcntl_signal(SIGTERM, function () use ($workerId) {
70-
Console::info("[Worker] Worker {$workerId} received SIGTERM, closing consumer...");
71-
$this->consumer->close();
72-
});
73-
74-
pcntl_signal(SIGINT, function () use ($workerId) {
75-
Console::info("[Worker] Worker {$workerId} received SIGINT, closing consumer...");
76-
$this->consumer->close();
77-
});
78-
79-
pcntl_async_signals(true);
80-
}
81-
82-
call_user_func($callback, $workerId);
83-
});
76+
while (($ret = Process::wait(false)) !== false) {
77+
unset($this->workers[$ret['pid']]);
78+
}
79+
}
8480

81+
public function stop(): self
82+
{
83+
foreach ($this->workers as $pid => $process) {
84+
Process::kill($pid, SIGTERM);
85+
}
8586
return $this;
8687
}
8788

88-
public function workerStop(callable $callback): self
89+
public function workerStart(callable $callback): self
8990
{
90-
$this->onStop = $callback;
91-
$this->pool->on(Constant::EVENT_WORKER_STOP, function (Pool $pool, string $workerId) use ($callback) {
92-
call_user_func($callback, $workerId);
93-
});
94-
91+
$this->onWorkerStart[] = $callback;
9592
return $this;
9693
}
9794

98-
public function getNative(): Pool
95+
public function workerStop(callable $callback): self
9996
{
100-
return $this->pool;
97+
$this->onWorkerStop[] = $callback;
98+
return $this;
10199
}
102100
}

src/Queue/Adapter/Workerman.php

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,4 @@ public function workerStop(callable $callback): self
4747

4848
return $this;
4949
}
50-
51-
public function getNative(): Worker
52-
{
53-
return $this->worker;
54-
}
5550
}

src/Queue/Broker/AMQP.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe
111111
$channel->exchange_declare("{$queue->namespace}.failed", AMQPExchangeType::TOPIC, durable: true, auto_delete: false, arguments: new AMQPTable($this->exchangeArguments));
112112

113113
// 2. Declare the working queue and configure the DLX for receiving rejected messages.
114-
$channel->queue_declare($queue->name, durable: true, auto_delete: false, arguments: new AMQPTable(array_merge($this->queueArguments, ["x-dead-letter-exchange" => "{$queue->namespace}.failed"])));
114+
$channel->queue_declare($queue->name, durable: true, auto_delete: false, arguments: new AMQPTable(array_merge($this->queueArguments, ['x-dead-letter-exchange' => "{$queue->namespace}.failed"])));
115115
$channel->queue_bind($queue->name, $queue->namespace, routing_key: $queue->name);
116116

117117
// 3. Declare the dead-letter-queue and bind it to the DLX.
@@ -162,7 +162,7 @@ public function getQueueSize(Queue $queue, bool $failedJobs = false): int
162162
{
163163
$queueName = $queue->name;
164164
if ($failedJobs) {
165-
$queueName = $queueName . ".failed";
165+
$queueName = $queueName . '.failed';
166166
}
167167

168168
$client = new Client();

src/Queue/Broker/Pool.php

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,26 +30,36 @@ public function getQueueSize(Queue $queue, bool $failedJobs = false): int
3030
return $this->delegatePublish(__FUNCTION__, \func_get_args());
3131
}
3232

33-
public function consume(Queue $queue, callable $messageCallback, callable $successCallback, callable $errorCallback): void
34-
{
33+
public function consume(
34+
Queue $queue,
35+
callable $messageCallback,
36+
callable $successCallback,
37+
callable $errorCallback,
38+
): void {
3539
$this->delegateConsumer(__FUNCTION__, \func_get_args());
3640
}
3741

3842
public function close(): void
3943
{
40-
$this->delegateConsumer(__FUNCTION__, \func_get_args());
44+
// TODO: Implement closing all connections in the pool
4145
}
4246

4347
protected function delegatePublish(string $method, array $args): mixed
4448
{
45-
return $this->publisher?->use(function (Publisher $adapter) use ($method, $args) {
49+
return $this->publisher?->use(function (Publisher $adapter) use (
50+
$method,
51+
$args,
52+
) {
4653
return $adapter->$method(...$args);
4754
});
4855
}
4956

5057
protected function delegateConsumer(string $method, array $args): mixed
5158
{
52-
return $this->consumer?->use(function (Consumer $adapter) use ($method, $args) {
59+
return $this->consumer?->use(function (Consumer $adapter) use (
60+
$method,
61+
$args,
62+
) {
5363
return $adapter->$method(...$args);
5464
});
5565
}

src/Queue/Broker/Redis.php

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010

1111
class Redis implements Publisher, Consumer
1212
{
13+
private const int POP_TIMEOUT = 2;
14+
1315
private bool $closed = false;
1416

1517
public function __construct(private readonly Connection $connection)
@@ -22,7 +24,15 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe
2224
/**
2325
* Waiting for next Job.
2426
*/
25-
$nextMessage = $this->connection->rightPopArray("{$queue->namespace}.queue.{$queue->name}", 5);
27+
try {
28+
$nextMessage = $this->connection->rightPopArray("{$queue->namespace}.queue.{$queue->name}", self::POP_TIMEOUT);
29+
} catch (\RedisException $e) {
30+
if ($this->closed) {
31+
break;
32+
}
33+
34+
throw $e;
35+
}
2636

2737
if (!$nextMessage) {
2838
continue;
@@ -115,7 +125,7 @@ public function retry(Queue $queue, ?int $limit = null): void
115125
$processed = 0;
116126

117127
while (true) {
118-
$pid = $this->connection->rightPop("{$queue->namespace}.failed.{$queue->name}", 5);
128+
$pid = $this->connection->rightPop("{$queue->namespace}.failed.{$queue->name}", self::POP_TIMEOUT);
119129

120130
// No more jobs to retry
121131
if ($pid === false) {

src/Queue/Connection.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,5 @@ public function setArray(string $key, array $value): bool;
2525
public function increment(string $key): int;
2626
public function decrement(string $key): int;
2727
public function ping(): bool;
28+
public function close(): void;
2829
}

src/Queue/Connection/Redis.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,12 @@ public function ping(): bool
169169
}
170170
}
171171

172+
public function close(): void
173+
{
174+
$this->redis?->close();
175+
$this->redis = null;
176+
}
177+
172178
protected function getRedis(): \Redis
173179
{
174180
if ($this->redis) {

src/Queue/Connection/RedisCluster.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,12 @@ public function ping(): bool
166166
}
167167
}
168168

169+
public function close(): void
170+
{
171+
$this->redis?->close();
172+
$this->redis = null;
173+
}
174+
169175
protected function getRedis(): \RedisCluster
170176
{
171177
if ($this->redis) {

0 commit comments

Comments
 (0)