diff --git a/.github/workflows/phpstan.yml b/.github/workflows/phpstan.yml new file mode 100644 index 0000000..a8874e4 --- /dev/null +++ b/.github/workflows/phpstan.yml @@ -0,0 +1,20 @@ +name: "CodeQL" + +on: [ pull_request ] +jobs: + lint: + name: CodeQL + runs-on: ubuntu-latest + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + with: + fetch-depth: 2 + + - run: git checkout HEAD^2 + + - name: Run CodeQL + run: | + docker run --rm -v $PWD:/app composer sh -c \ + "composer install --profile --ignore-platform-reqs && composer check" \ No newline at end of file diff --git a/.github/workflows/linter.yml b/.github/workflows/pint.yml similarity index 92% rename from .github/workflows/linter.yml rename to .github/workflows/pint.yml index 9e898b6..26a4907 100644 --- a/.github/workflows/linter.yml +++ b/.github/workflows/pint.yml @@ -9,7 +9,7 @@ jobs: steps: - name: Checkout repository - uses: actions/checkout@v3 + uses: actions/checkout@v4 with: fetch-depth: 2 diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml new file mode 100644 index 0000000..6391332 --- /dev/null +++ b/.github/workflows/tests.yml @@ -0,0 +1,35 @@ +name: "Tests" + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +on: [pull_request] + +jobs: + adapter_test: + name: Adapter Tests + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + adapter: + [ + AMQP, + SwooleRedisCluster, + Swoole, + Workerman, + ] + + steps: + - name: checkout + uses: actions/checkout@v4 + + - name: Load and Start Services + run: | + docker compose build + docker compose up -d + sleep 10 + + - name: Run Tests + run: docker compose exec -T tests vendor/bin/phpunit /usr/local/src/tests/Queue/E2E/Adapter/${{matrix.adapter}}Test.php --debug diff --git a/Dockerfile b/Dockerfile index 741f3d3..94354a4 100644 --- a/Dockerfile +++ b/Dockerfile @@ -19,4 +19,4 @@ COPY . . COPY --from=composer /usr/local/src/vendor /usr/local/src/vendor -CMD ["sleep","3600"] +CMD ["tail", "-f", "/dev/null"] diff --git a/composer.json b/composer.json index fc6b7ed..6252380 100644 --- a/composer.json +++ b/composer.json @@ -19,7 +19,7 @@ }, "scripts":{ "test": "phpunit", - "analyse": "vendor/bin/phpstan analyse", + "check": "vendor/bin/phpstan analyse", "format": "vendor/bin/pint", "lint": "vendor/bin/pint --test" }, diff --git a/docker-compose.yml b/docker-compose.yml index 09c2320..5db1285 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,14 +1,10 @@ -version: '3.1' - services: tests: container_name: tests build: . - command: - - vendor/bin/phpunit - - tests volumes: - - ./:/usr/local/src + - ./src:/usr/local/src/src + - ./tests:/usr/local/src/tests depends_on: - swoole - swoole-amqp @@ -20,7 +16,8 @@ services: build: ./tests/Queue/servers/Swoole/. command: php /usr/src/code/tests/Queue/servers/Swoole/worker.php volumes: - - ./:/usr/src/code + - ./src:/usr/local/src/src + - ./tests:/usr/local/src/tests depends_on: - redis @@ -29,7 +26,8 @@ services: build: ./tests/Queue/servers/SwooleRedisCluster/. command: php /usr/src/code/tests/Queue/servers/SwooleRedisCluster/worker.php volumes: - - ./:/usr/src/code + - ./src:/usr/local/src/src + - ./tests:/usr/local/src/tests depends_on: redis-cluster-0: condition: service_healthy @@ -39,7 +37,8 @@ services: build: ./tests/Queue/servers/AMQP/. command: php /usr/src/code/tests/Queue/servers/AMQP/worker.php volumes: - - ./:/usr/src/code + - ./src:/usr/local/src/src + - ./tests:/usr/local/src/tests depends_on: amqp: condition: service_healthy @@ -49,7 +48,8 @@ services: build: ./tests/Queue/servers/Workerman/. command: php /usr/src/code/tests/Queue/servers/Workerman/worker.php start volumes: - - ./:/usr/src/code + - ./src:/usr/local/src/src + - ./tests:/usr/local/src/tests depends_on: - redis @@ -115,5 +115,5 @@ services: RABBITMQ_DEFAULT_PASS: amqp RABBITMQ_DEFAULT_VHOST: "/" healthcheck: - test: [ "CMD", "rabbitmqctl", "node_health_check"] + test: [ "CMD", "rabbitmqctl", "node_health_check" ] start_interval: 1s \ No newline at end of file diff --git a/src/Queue/Broker/AMQP.php b/src/Queue/Broker/AMQP.php index 130587d..0f78354 100644 --- a/src/Queue/Broker/AMQP.php +++ b/src/Queue/Broker/AMQP.php @@ -76,7 +76,7 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe try { $nextMessage = json_decode($amqpMessage->getBody(), associative: true) ?? false; if (!$nextMessage) { - $amqpMessage->nack(requeue: false); + $amqpMessage->nack(); return; } @@ -92,10 +92,10 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe $successCallback($message); } catch (Retryable $e) { $amqpMessage->nack(requeue: true); - $errorCallback($message, $e); + $errorCallback($message ?? null, $e); } catch (\Throwable $th) { $amqpMessage->nack(requeue: false); - $errorCallback($message, $th); + $errorCallback($message ?? null, $th); } }; @@ -149,7 +149,7 @@ public function enqueue(Queue $queue, array $payload): bool return true; } - public function retry(Queue $queue, int $limit = null): void + public function retry(Queue $queue, ?int $limit = null): void { // This is a no-op for AMQP } diff --git a/src/Queue/Broker/Redis.php b/src/Queue/Broker/Redis.php index 34ee22e..0e3beb7 100644 --- a/src/Queue/Broker/Redis.php +++ b/src/Queue/Broker/Redis.php @@ -109,7 +109,7 @@ public function enqueue(Queue $queue, array $payload): bool * Take all jobs from the failed queue and re-enqueue them. * @param int|null $limit The amount of jobs to retry */ - public function retry(Queue $queue, int $limit = null): void + public function retry(Queue $queue, ?int $limit = null): void { $start = \time(); $processed = 0; diff --git a/src/Queue/Connection/RedisCluster.php b/src/Queue/Connection/RedisCluster.php index 3df0eb7..50a2b46 100644 --- a/src/Queue/Connection/RedisCluster.php +++ b/src/Queue/Connection/RedisCluster.php @@ -110,7 +110,8 @@ public function remove(string $key): bool public function move(string $queue, string $destination): bool { - return $this->getRedis()->move($queue, $destination); + // Move is not supported for Redis Cluster + return false; } public function setArray(string $key, array $value): bool @@ -160,7 +161,7 @@ public function ping(): bool } return true; - } catch (Exception $e) { + } catch (\Throwable) { return false; } } diff --git a/src/Queue/Consumer.php b/src/Queue/Consumer.php index 7f130cd..c71ce6e 100644 --- a/src/Queue/Consumer.php +++ b/src/Queue/Consumer.php @@ -9,15 +9,20 @@ interface Consumer { /** * @param Queue $queue - * @param callable(Message $message): Commit|NoCommit|mixed $messageCallback + * @param callable(Message $message): (Commit|NoCommit|mixed) $messageCallback * @param callable(Message $message): void $successCallback * @param callable(Message $message, \Throwable $th): void $errorCallback * @return void */ - public function consume(Queue $queue, callable $messageCallback, callable $successCallback, callable $errorCallback): void; + public function consume( + Queue $queue, + callable $messageCallback, + callable $successCallback, + callable $errorCallback + ): void; /** - * Closes the consumer and free's any underlying resources. + * Closes the consumer and frees any underlying resources. */ public function close(): void; } diff --git a/src/Queue/Publisher.php b/src/Queue/Publisher.php index da07481..1778656 100644 --- a/src/Queue/Publisher.php +++ b/src/Queue/Publisher.php @@ -20,7 +20,7 @@ public function enqueue(Queue $queue, array $payload): bool; * @param int|null $limit * @return void */ - public function retry(Queue $queue, int $limit = null): void; + public function retry(Queue $queue, ?int $limit = null): void; /** * Returns the amount of pending messages in the queue. diff --git a/src/Queue/Server.php b/src/Queue/Server.php index a8ab1f8..a465676 100644 --- a/src/Queue/Server.php +++ b/src/Queue/Server.php @@ -50,8 +50,6 @@ class Server /** * Hook that is called when worker starts - * - * @var Hook */ protected Hook $workerStartHook; @@ -273,16 +271,16 @@ function (Message $message) { } } } - Console::success("[Job] ({$message->getPid()}) successfully run."); }, - function (Message $message, Throwable $th) { - Console::error("[Job] ({$message->getPid()}) failed to run."); - Console::error("[Job] ({$message->getPid()}) {$th->getMessage()}"); + function (?Message $message, Throwable $th) { + Console::error("[Job] ({$message?->getPid()}) failed to run."); + Console::error("[Job] ({$message?->getPid()}) {$th->getMessage()}"); self::setResource('error', fn () => $th); + foreach ($this->errorHooks as $hook) { - call_user_func_array($hook->getAction(), $this->getArguments($hook)); + ($hook->getAction())(...$this->getArguments($hook)); } }, ); @@ -322,10 +320,11 @@ public function getWorkerStart(): Hook /** * Is called when a Worker stops. - * @param callable $callback + * @param callable|null $callback * @return self + * @throws Exception */ - public function workerStop(callable $callback = null): self + public function workerStop(?callable $callback = null): self { try { $this->adapter->workerStop(function (string $workerId) use ($callback) { diff --git a/tests/Queue/E2E/Adapter/Base.php b/tests/Queue/E2E/Adapter/Base.php index 3181486..e507d0d 100644 --- a/tests/Queue/E2E/Adapter/Base.php +++ b/tests/Queue/E2E/Adapter/Base.php @@ -72,7 +72,7 @@ public function testEvents(): void sleep(1); } - protected function testConcurrency(): void + public function testConcurrency(): void { run(function () { $publisher = $this->getPublisher(); @@ -93,23 +93,34 @@ public function testRetry(): void { $publisher = $this->getPublisher(); - $publisher->enqueue($this->getQueue(), [ + $published = $publisher->enqueue($this->getQueue(), [ 'type' => 'test_exception', 'id' => 1 ]); - $publisher->enqueue($this->getQueue(), [ + + $this->assertTrue($published); + + $published = $publisher->enqueue($this->getQueue(), [ 'type' => 'test_exception', 'id' => 2 ]); - $publisher->enqueue($this->getQueue(), [ + + $this->assertTrue($published); + + $published = $publisher->enqueue($this->getQueue(), [ 'type' => 'test_exception', 'id' => 3 ]); - $publisher->enqueue($this->getQueue(), [ + + $this->assertTrue($published); + + $published = $publisher->enqueue($this->getQueue(), [ 'type' => 'test_exception', 'id' => 4 ]); + $this->assertTrue($published); + sleep(1); $publisher->retry($this->getQueue()); sleep(1);