Skip to content

Commit 3a7603e

Browse files
Merge pull request #49 from utopia-php/revert-45-PLA-2762-2
Revert "feat: add workerStop handling"
2 parents 3c1daeb + e8aa0fb commit 3a7603e

File tree

3 files changed

+77
-61
lines changed

3 files changed

+77
-61
lines changed

src/Queue/Adapter/Swoole.php

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
namespace Utopia\Queue\Adapter;
44

5-
use Swoole\Constant;
65
use Swoole\Process\Pool;
76
use Utopia\Queue\Adapter;
87
use Utopia\Queue\Consumer;
@@ -11,9 +10,6 @@ class Swoole extends Adapter
1110
{
1211
protected Pool $pool;
1312

14-
/** @var callable */
15-
private $onStop;
16-
1713
public function __construct(Consumer $consumer, int $workerNum, string $queue, string $namespace = 'utopia-queue')
1814
{
1915
parent::__construct($workerNum, $queue, $namespace);
@@ -31,16 +27,13 @@ public function start(): self
3127

3228
public function stop(): self
3329
{
34-
if ($this->onStop) {
35-
call_user_func($this->onStop);
36-
}
3730
$this->pool->shutdown();
3831
return $this;
3932
}
4033

4134
public function workerStart(callable $callback): self
4235
{
43-
$this->pool->on(Constant::EVENT_WORKER_START, function (Pool $pool, string $workerId) use ($callback) {
36+
$this->pool->on('WorkerStart', function (Pool $pool, string $workerId) use ($callback) {
4437
call_user_func($callback, $workerId);
4538
});
4639

@@ -49,8 +42,7 @@ public function workerStart(callable $callback): self
4942

5043
public function workerStop(callable $callback): self
5144
{
52-
$this->onStop = $callback;
53-
$this->pool->on(Constant::EVENT_WORKER_STOP, function (Pool $pool, string $workerId) use ($callback) {
45+
$this->pool->on('WorkerStart', function (Pool $pool, string $workerId) use ($callback) {
5446
call_user_func($callback, $workerId);
5547
});
5648

src/Queue/Broker/AMQP.php

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,6 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe
131131

132132
public function close(): void
133133
{
134-
$this->channel?->stopConsume();
135134
$this->channel?->getConnection()?->close();
136135
}
137136

src/Queue/Server.php

Lines changed: 75 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -217,19 +217,45 @@ public function start(): self
217217
call_user_func_array($this->workerStartHook->getAction(), $this->getArguments($this->workerStartHook));
218218
}
219219

220-
$this->adapter->consumer->consume(
221-
$this->adapter->queue,
222-
function (Message $message) {
223-
$receivedAtTimestamp = microtime(true);
224-
Console::info("[Job] Received Job ({$message->getPid()}).");
225-
try {
226-
$waitDuration = microtime(true) - $message->getTimestamp();
227-
$this->jobWaitTime->record($waitDuration);
228-
229-
$this->resources = [];
230-
self::setResource('message', fn () => $message);
220+
while (true) {
221+
$this->adapter->consumer->consume(
222+
$this->adapter->queue,
223+
function (Message $message) {
224+
$receivedAtTimestamp = microtime(true);
225+
Console::info("[Job] Received Job ({$message->getPid()}).");
226+
try {
227+
$waitDuration = microtime(true) - $message->getTimestamp();
228+
$this->jobWaitTime->record($waitDuration);
229+
230+
$this->resources = [];
231+
self::setResource('message', fn () => $message);
232+
if ($this->job->getHook()) {
233+
foreach ($this->initHooks as $hook) { // Global init hooks
234+
if (in_array('*', $hook->getGroups())) {
235+
$arguments = $this->getArguments($hook, $message->getPayload());
236+
\call_user_func_array($hook->getAction(), $arguments);
237+
}
238+
}
239+
}
240+
241+
foreach ($this->job->getGroups() as $group) {
242+
foreach ($this->initHooks as $hook) { // Group init hooks
243+
if (in_array($group, $hook->getGroups())) {
244+
$arguments = $this->getArguments($hook, $message->getPayload());
245+
\call_user_func_array($hook->getAction(), $arguments);
246+
}
247+
}
248+
}
249+
250+
return \call_user_func_array($this->job->getAction(), $this->getArguments($this->job, $message->getPayload()));
251+
} finally {
252+
$processDuration = microtime(true) - $receivedAtTimestamp;
253+
$this->processDuration->record($processDuration);
254+
}
255+
},
256+
function (Message $message) {
231257
if ($this->job->getHook()) {
232-
foreach ($this->initHooks as $hook) { // Global init hooks
258+
foreach ($this->shutdownHooks as $hook) { // Global init hooks
233259
if (in_array('*', $hook->getGroups())) {
234260
$arguments = $this->getArguments($hook, $message->getPayload());
235261
\call_user_func_array($hook->getAction(), $arguments);
@@ -238,55 +264,29 @@ function (Message $message) {
238264
}
239265

240266
foreach ($this->job->getGroups() as $group) {
241-
foreach ($this->initHooks as $hook) { // Group init hooks
267+
foreach ($this->shutdownHooks as $hook) { // Group init hooks
242268
if (in_array($group, $hook->getGroups())) {
243269
$arguments = $this->getArguments($hook, $message->getPayload());
244270
\call_user_func_array($hook->getAction(), $arguments);
245271
}
246272
}
247273
}
274+
Console::success("[Job] ({$message->getPid()}) successfully run.");
275+
},
276+
function (?Message $message, Throwable $th) {
277+
Console::error("[Job] ({$message?->getPid()}) failed to run.");
278+
Console::error("[Job] ({$message?->getPid()}) {$th->getMessage()}");
248279

249-
return \call_user_func_array($this->job->getAction(), $this->getArguments($this->job, $message->getPayload()));
250-
} finally {
251-
$processDuration = microtime(true) - $receivedAtTimestamp;
252-
$this->processDuration->record($processDuration);
253-
}
254-
},
255-
function (Message $message) {
256-
if ($this->job->getHook()) {
257-
foreach ($this->shutdownHooks as $hook) { // Global init hooks
258-
if (in_array('*', $hook->getGroups())) {
259-
$arguments = $this->getArguments($hook, $message->getPayload());
260-
\call_user_func_array($hook->getAction(), $arguments);
261-
}
262-
}
263-
}
280+
self::setResource('error', fn () => $th);
264281

265-
foreach ($this->job->getGroups() as $group) {
266-
foreach ($this->shutdownHooks as $hook) { // Group init hooks
267-
if (in_array($group, $hook->getGroups())) {
268-
$arguments = $this->getArguments($hook, $message->getPayload());
269-
\call_user_func_array($hook->getAction(), $arguments);
270-
}
282+
foreach ($this->errorHooks as $hook) {
283+
($hook->getAction())(...$this->getArguments($hook));
271284
}
272-
}
273-
Console::success("[Job] ({$message->getPid()}) successfully run.");
274-
},
275-
function (?Message $message, Throwable $th) {
276-
Console::error("[Job] ({$message?->getPid()}) failed to run.");
277-
Console::error("[Job] ({$message?->getPid()}) {$th->getMessage()}");
278-
279-
self::setResource('error', fn () => $th);
280-
281-
foreach ($this->errorHooks as $hook) {
282-
($hook->getAction())(...$this->getArguments($hook));
283-
}
284-
},
285-
);
285+
},
286+
);
287+
}
286288
});
287289

288-
$this->adapter->workerStop(fn () => $this->adapter->consumer->close());
289-
290290
$this->adapter->start();
291291
} catch (Throwable $error) {
292292
self::setResource('error', fn () => $error);
@@ -318,6 +318,31 @@ public function getWorkerStart(): Hook
318318
return $this->workerStartHook;
319319
}
320320

321+
/**
322+
* Is called when a Worker stops.
323+
* @param callable|null $callback
324+
* @return self
325+
* @throws Exception
326+
*/
327+
public function workerStop(?callable $callback = null): self
328+
{
329+
try {
330+
$this->adapter->workerStop(function (string $workerId) use ($callback) {
331+
Console::success("[Worker] Worker {$workerId} is ready!");
332+
if (!is_null($callback)) {
333+
call_user_func($callback);
334+
}
335+
});
336+
} catch (Throwable $error) {
337+
self::setResource('error', fn () => $error);
338+
foreach ($this->errorHooks as $hook) {
339+
call_user_func_array($hook->getAction(), $this->getArguments($hook));
340+
}
341+
}
342+
343+
return $this;
344+
}
345+
321346
/**
322347
* Get Arguments
323348
*

0 commit comments

Comments
 (0)