Skip to content

Commit 128605d

Browse files
committed
Added a StateManager
1 parent 9af890d commit 128605d

File tree

2 files changed

+133
-117
lines changed

2 files changed

+133
-117
lines changed

src/State.php

Lines changed: 46 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -2,157 +2,86 @@
22

33
namespace Kiboko\Component\Flow\RabbitMQ;
44

5-
use Bunny\Channel;
65
use Bunny\Client;
76
use Kiboko\Contract\Pipeline\StateInterface;
87

98
final class State implements StateInterface
109
{
11-
private Channel $channel;
12-
private array $metrics = [];
10+
private int $acceptMetric = 0;
11+
private int $rejectMetric = 0;
12+
private int $errorMetric = 0;
1313

1414
public function __construct(
15-
private Client $connection,
16-
private string $pipelineId,
15+
private StateManager $manager,
1716
private string $stepCode,
1817
private string $stepLabel,
19-
private string $topic,
20-
private ?int $messageLimit = 1,
21-
private ?string $exchange = null,
2218
) {
23-
$this->channel = $this->connection->channel();
24-
}
25-
26-
public static function withoutAuthentication(
27-
string $pipelineId,
28-
string $stepCode,
29-
string $stepLabel,
30-
string $host,
31-
string $vhost,
32-
string $topic,
33-
?int $messageLimit = 1,
34-
?string $exchange = null,
35-
?int $port = null,
36-
): self {
37-
$connection = new Client([
38-
'host' => $host,
39-
'port' => $port,
40-
'vhost' => $vhost,
41-
'user' => 'guest',
42-
'password' => 'guest',
43-
]);
44-
$connection->connect();
45-
46-
return new self($connection, pipelineId: $pipelineId, stepCode: $stepCode, stepLabel: $stepLabel, topic: $topic, messageLimit: $messageLimit, exchange: $exchange);
47-
}
48-
49-
public static function withAuthentication(
50-
string $pipelineId,
51-
string $stepCode,
52-
string $stepLabel,
53-
string $host,
54-
string $vhost,
55-
string $topic,
56-
?string $user,
57-
?string $password,
58-
?int $messageLimit = 1,
59-
?string $exchange = null,
60-
?int $port = null,
61-
): self {
62-
$connection = new Client([
63-
'host' => $host,
64-
'port' => $port,
65-
'vhost' => $vhost,
66-
'user' => $user,
67-
'password' => $password,
68-
]);
69-
$connection->connect();
70-
71-
return new self($connection, pipelineId: $pipelineId, stepCode: $stepCode, stepLabel: $stepLabel, topic: $topic, messageLimit: $messageLimit, exchange: $exchange);
7219
}
7320

7421
public function initialize(): void
7522
{
76-
$this->metrics = [
77-
'accept' => 0,
78-
'reject' => 0,
79-
'error' => 0,
80-
];
81-
82-
$this->channel->queueDeclare(
83-
queue: $this->topic,
84-
passive: false,
85-
durable: true,
86-
exclusive: false,
87-
autoDelete: true,
88-
);
23+
$this->acceptMetric = 0;
24+
$this->rejectMetric = 0;
25+
$this->errorMetric = 0;
8926
}
9027

9128
public function accept(int $step = 1): void
9229
{
93-
$this->metrics['accept'] += $step;
30+
$this->acceptMetric += $step;
9431

95-
if ($this->metrics['accept'] === $this->messageLimit) {
96-
$this->sendUpdate();
97-
98-
$this->metrics['accept'] = 0;
99-
$this->metrics['reject'] = 0;
100-
$this->metrics['error'] = 0;
101-
}
32+
$this->manager->trySend($step);
10233
}
10334

10435
public function reject(int $step = 1): void
10536
{
106-
$this->metrics['reject'] += $step;
37+
$this->rejectMetric += $step;
38+
39+
$this->manager->trySend($step);
10740
}
10841

10942
public function error(int $step = 1): void
11043
{
111-
$this->metrics['error'] += $step;
44+
$this->errorMetric += $step;
45+
46+
$this->manager->trySend($step);
11247
}
11348

11449
public function teardown(): void
11550
{
116-
$this->sendUpdate();
117-
118-
$this->channel->close();
119-
$this->connection->stop();
51+
$this->manager->teardown($this);
12052
}
12153

122-
private function sendUpdate(): void
54+
public function toArray(): array
12355
{
124-
$date = new \DateTime();
56+
return [
57+
'code' => $this->stepCode,
58+
'label' => $this->stepLabel ?: $this->stepCode,
59+
'metrics' => iterator_to_array($this->walkMetrics()),
60+
];
61+
}
12562

126-
$this->channel->publish(
127-
\json_encode([
128-
'id' => $this->pipelineId,
129-
'date' => ['date' => $date->format('c'), 'tz' => $date->getTimezone()->getName()],
130-
'stepsUpdates' => [
131-
[
132-
'code' => $this->stepCode,
133-
'label' => $this->stepLabel ?: $this->stepCode,
134-
'metrics' => [
135-
[
136-
'code' => 'accept',
137-
'value' => $this->metrics['accept']
138-
],
139-
[
140-
'code' => 'reject',
141-
'value' => $this->metrics['reject']
142-
],
143-
[
144-
'code' => 'error',
145-
'value' => $this->metrics['error']
146-
]
147-
]
148-
]
149-
]
150-
]),
151-
[
152-
'content-type' => 'application/json',
153-
],
154-
$this->exchange,
155-
$this->topic
156-
);
63+
private function walkMetrics(): \Generator
64+
{
65+
if ($this->acceptMetric > 0) {
66+
yield [
67+
'code' => 'accept',
68+
'value' => $this->acceptMetric,
69+
];
70+
$this->acceptMetric = 0;
71+
}
72+
if ($this->rejectMetric > 0) {
73+
yield [
74+
'code' => 'reject',
75+
'value' => $this->rejectMetric,
76+
];
77+
$this->rejectMetric = 0;
78+
}
79+
if ($this->errorMetric > 0) {
80+
yield [
81+
'code' => 'error',
82+
'value' => $this->errorMetric,
83+
];
84+
$this->errorMetric = 0;
85+
}
15786
}
15887
}

src/StateManager.php

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
<?php
2+
3+
namespace Kiboko\Component\Flow\RabbitMQ;
4+
5+
use Bunny\Channel;
6+
use Bunny\Client;
7+
8+
class StateManager
9+
{
10+
private array $steps = [];
11+
private array $tearedDown = [];
12+
private int $messageCount = 0;
13+
private int $lineCount = 0;
14+
private Channel $channel;
15+
16+
public function __construct(
17+
private Client $connection,
18+
private string $pipelineId,
19+
private string $topic,
20+
private int $lineThreshold = 1000,
21+
private ?string $exchange = null,
22+
) {
23+
$this->channel = $this->connection->channel();
24+
25+
$this->channel->queueDeclare(
26+
queue: $this->topic,
27+
passive: false,
28+
durable: true,
29+
exclusive: false,
30+
autoDelete: true,
31+
);
32+
}
33+
34+
public function __destruct()
35+
{
36+
$this->channel->close();
37+
}
38+
39+
public function stepState(
40+
string $stepCode,
41+
string $stepLabel,
42+
): State {
43+
return $this->steps[] = new State($this, $stepCode, $stepLabel);
44+
}
45+
46+
public function trySend($count): void
47+
{
48+
$this->lineCount += $count;
49+
50+
if ($this->lineCount >= $this->lineThreshold) {
51+
$this->sendUpdate();
52+
$this->lineCount = 0;
53+
}
54+
}
55+
56+
public function teardown(State $step): void
57+
{
58+
$this->tearedDown[] = $step;
59+
60+
if (count($this->steps) <= count($this->tearedDown)) {
61+
$this->sendUpdate();
62+
$this->lineCount = 0;
63+
64+
$this->steps = [];
65+
$this->tearedDown = [];
66+
}
67+
}
68+
69+
private function sendUpdate(): void
70+
{
71+
$date = new \DateTimeImmutable();
72+
73+
$this->channel->publish(
74+
\json_encode([
75+
'messageNumber' => ++$this->messageCount,
76+
'id' => $this->pipelineId,
77+
'date' => ['date' => $date->format('c'), 'tz' => $date->getTimezone()->getName()],
78+
'stepsUpdates' => array_map(fn (State $step) => $step->toArray(), $this->steps),
79+
]),
80+
[
81+
'content-type' => 'application/json',
82+
],
83+
$this->exchange,
84+
$this->topic
85+
);
86+
}
87+
}

0 commit comments

Comments
 (0)