Skip to content

Commit c5bc23f

Browse files
committed
Updated the state interface
1 parent 894c749 commit c5bc23f

File tree

2 files changed

+90
-33
lines changed

2 files changed

+90
-33
lines changed

src/Rejection.php

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,38 +8,64 @@
88

99
final class Rejection implements RejectionInterface
1010
{
11-
private Client $connection;
1211
private Channel $channel;
1312

1413
public function __construct(
15-
string $host,
16-
string $vhost,
14+
Client $connection,
15+
private string $stepUuid,
1716
private string $topic,
18-
?string $user = 'guest',
19-
?string $password = 'guest',
2017
private ?string $exchange = null,
21-
?int $port = null,
2218
) {
23-
$this->connection = new Client([
19+
$this->channel = $connection->channel();
20+
$this->channel->queueDeclare(
21+
queue: $this->topic,
22+
passive: false,
23+
durable: true,
24+
exclusive: false,
25+
autoDelete: true,
26+
);
27+
}
28+
29+
public static function withoutAuthentication(
30+
string $stepUuid,
31+
string $host,
32+
string $vhost,
33+
string $topic,
34+
?string $exchange = null,
35+
?int $port = null,
36+
): self {
37+
$connection = new Client([
2438
'host' => $host,
2539
'port' => $port,
2640
'vhost' => $vhost,
27-
'user' => $user,
28-
'password' => $password,
41+
'user' => 'guest',
42+
'password' => 'guest',
2943
]);
30-
$this->connection->connect();
44+
$connection->connect();
3145

32-
$this->channel = $this->connection->channel();
46+
return new self($connection, stepUuid: $stepUuid, topic: $topic, exchange: $exchange);
3347
}
3448

35-
public static function withoutAuthentication(
49+
public static function withAuthentication(
50+
string $stepUuid,
3651
string $host,
3752
string $vhost,
3853
string $topic,
54+
?string $user,
55+
?string $password,
3956
?string $exchange = null,
4057
?int $port = null,
4158
): self {
42-
return new self(host: $host, vhost: $vhost, topic: $topic, exchange: $exchange, port: $port);
59+
$connection = new Client([
60+
'host' => $host,
61+
'port' => $port,
62+
'vhost' => $vhost,
63+
'user' => $user,
64+
'password' => $password,
65+
]);
66+
$connection->connect();
67+
68+
return new self($connection, stepUuid: $stepUuid, topic: $topic, exchange: $exchange);
4369
}
4470

4571
public function reject(object|array $rejection, ?\Throwable $exception = null): void
@@ -48,6 +74,7 @@ public function reject(object|array $rejection, ?\Throwable $exception = null):
4874
\json_encode([
4975
'item' => $rejection,
5076
'exception' => $exception,
77+
'step' => $this->stepUuid,
5178
]),
5279
[
5380
'content-type' => 'application/json',
Lines changed: 50 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,40 +6,70 @@
66
use Bunny\Client;
77
use Kiboko\Contract\Pipeline\StateInterface;
88

9-
final class RabbitState implements StateInterface
9+
final class State implements StateInterface
1010
{
11-
private Client $connection;
1211
private Channel $channel;
13-
private \DateTimeInterface $date;
14-
private string $stepUuid;
1512

1613
public function __construct(
17-
private string $host,
18-
private string $user,
19-
private string $password,
14+
Client $connection,
15+
private string $stepUuid,
2016
private string $topic,
21-
private ?int $port = null,
22-
private ?string $vhost = null,
2317
private ?string $exchange = null,
2418
) {
25-
$this->date = new \DateTime();
26-
$this->stepUuid = $this->generateRandomUuid();
19+
$this->channel = $connection->channel();
20+
}
21+
22+
public static function withoutAuthentication(
23+
string $stepUuid,
24+
string $host,
25+
string $vhost,
26+
string $topic,
27+
?string $exchange = null,
28+
?int $port = null,
29+
): self {
30+
$connection = new Client([
31+
'host' => $host,
32+
'port' => $port,
33+
'vhost' => $vhost,
34+
'user' => 'guest',
35+
'password' => 'guest',
36+
]);
37+
$connection->connect();
2738

28-
$this->connection = new Client([
29-
'host' => $this->host,
30-
'vhost' => $this->vhost,
31-
'port' => $this->port,
32-
'user' => $this->user,
33-
'password' => $this->password,
39+
return new self($connection, stepUuid: $stepUuid, topic: $topic, exchange: $exchange);
40+
}
41+
42+
public static function withAuthentication(
43+
string $stepUuid,
44+
string $host,
45+
string $vhost,
46+
string $topic,
47+
?string $user,
48+
?string $password,
49+
?string $exchange = null,
50+
?int $port = null,
51+
): self {
52+
$connection = new Client([
53+
'host' => $host,
54+
'port' => $port,
55+
'vhost' => $vhost,
56+
'user' => $user,
57+
'password' => $password,
3458
]);
59+
$connection->connect();
3560

36-
$this->connection->connect();
37-
$this->channel = $this->connection->channel();
38-
$this->channel->queueDeclare($this->topic);
61+
return new self($connection, stepUuid: $stepUuid, topic: $topic, exchange: $exchange);
3962
}
4063

4164
public function initialize(int $start = 0): void
4265
{
66+
$this->channel->queueDeclare(
67+
queue: $this->topic,
68+
passive: false,
69+
durable: true,
70+
exclusive: false,
71+
autoDelete: true,
72+
);
4373
}
4474

4575
public function accept(int $step = 1): void

0 commit comments

Comments
 (0)