diff --git a/src/Queue/Broker/AMQP.php b/src/Queue/Broker/AMQP.php index 0bd745e..130587d 100644 --- a/src/Queue/Broker/AMQP.php +++ b/src/Queue/Broker/AMQP.php @@ -14,6 +14,8 @@ use Utopia\Queue\Message; use Utopia\Queue\Publisher; use Utopia\Queue\Queue; +use Utopia\Queue\Result\Commit; +use Utopia\Queue\Result\NoCommit; class AMQP implements Publisher, Consumer { @@ -81,8 +83,12 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe $nextMessage['timestamp'] = (int)$nextMessage['timestamp']; $message = new Message($nextMessage); - $messageCallback($message); - $amqpMessage->ack(); + $result = $messageCallback($message); + match (true) { + $result instanceof Commit => $amqpMessage->ack(true), + $result instanceof NoCommit => null, + default => $amqpMessage->ack() + }; $successCallback($message); } catch (Retryable $e) { $amqpMessage->nack(requeue: true); diff --git a/src/Queue/Consumer.php b/src/Queue/Consumer.php index 8c55c6b..7f130cd 100644 --- a/src/Queue/Consumer.php +++ b/src/Queue/Consumer.php @@ -2,11 +2,14 @@ namespace Utopia\Queue; +use Utopia\Queue\Result\Commit; +use Utopia\Queue\Result\NoCommit; + interface Consumer { /** * @param Queue $queue - * @param callable(Message $message): void $messageCallback + * @param callable(Message $message): Commit|NoCommit|mixed $messageCallback * @param callable(Message $message): void $successCallback * @param callable(Message $message, \Throwable $th): void $errorCallback * @return void diff --git a/src/Queue/Result/Commit.php b/src/Queue/Result/Commit.php new file mode 100644 index 0000000..9c0a93f --- /dev/null +++ b/src/Queue/Result/Commit.php @@ -0,0 +1,7 @@ +job->getAction(), $this->getArguments($this->job, $message->getPayload())); + return \call_user_func_array($this->job->getAction(), $this->getArguments($this->job, $message->getPayload())); } finally { $processDuration = microtime(true) - $receivedAtTimestamp; $this->processDuration->record($processDuration);