Skip to content

Commit 56bd895

Browse files
committed
Added the rabbit state
1 parent 38ce28e commit 56bd895

File tree

1 file changed

+82
-0
lines changed

1 file changed

+82
-0
lines changed

src/RabbitState.php

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
<?php declare(strict_types=1);
2+
3+
namespace Kiboko\Component\Flow\RabbitMQ;
4+
5+
use Bunny\Channel;
6+
use Bunny\Client;
7+
use Kiboko\Contract\Pipeline\StateInterface;
8+
9+
final class RabbitState implements StateInterface
10+
{
11+
private Client $connection;
12+
private Channel $channel;
13+
private array $metrics;
14+
15+
public function __construct(
16+
private string $host,
17+
private string $user,
18+
private string $password,
19+
private string $topic,
20+
private ?int $port = null,
21+
private ?string $vhost = null,
22+
private ?string $exchange = null,
23+
) {
24+
$this->connection = new Client([
25+
'host' => $this->host,
26+
'vhost' => $this->vhost,
27+
'port' => $this->port,
28+
'user' => $this->user,
29+
'password' => $this->password,
30+
]);
31+
32+
$this->connection->connect();
33+
$this->channel = $this->connection->channel();
34+
$this->channel->queueDeclare($this->topic);
35+
}
36+
37+
public function initialize(int $start = 0): void
38+
{
39+
$this->metrics = [
40+
'accept' => 0,
41+
'reject' => 0,
42+
];
43+
}
44+
45+
public function accept(int $step = 1): void
46+
{
47+
$this->metrics['accept'] += $step;
48+
49+
$this->channel->publish(
50+
\json_encode([
51+
'Line accepted : ' => $this->metrics['accept']
52+
]),
53+
[
54+
'content-type' => 'application/json',
55+
],
56+
$this->exchange,
57+
$this->topic
58+
);
59+
}
60+
61+
public function reject(int $step = 1): void
62+
{
63+
$this->metrics['reject'] += $step;
64+
65+
$this->channel->publish(
66+
\json_encode([
67+
'Line rejected : ' => $this->metrics['reject']
68+
]),
69+
[
70+
'content-type' => 'application/json',
71+
],
72+
$this->exchange,
73+
$this->topic
74+
);
75+
}
76+
77+
public function __destruct()
78+
{
79+
$this->channel->close();
80+
$this->connection->stop();
81+
}
82+
}

0 commit comments

Comments
 (0)