Skip to content

Commit 80dd3ed

Browse files
committed
Updated code to be compatible with php-etl/pipeline-contracts:0.5
1 parent 8932e8c commit 80dd3ed

File tree

3 files changed

+36
-29
lines changed

3 files changed

+36
-29
lines changed

src/Rejection.php

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
use Bunny\Channel;
88
use Bunny\Client;
99
use Kiboko\Contract\Pipeline\RejectionInterface;
10+
use Kiboko\Contract\Pipeline\StepCodeInterface;
1011

1112
final readonly class Rejection implements RejectionInterface
1213
{
@@ -70,7 +71,23 @@ public static function withAuthentication(
7071
return new self($connection, stepUuid: $stepUuid, topic: $topic, exchange: $exchange);
7172
}
7273

73-
public function reject(object|array $rejection, ?\Throwable $exception = null): void
74+
public function reject(StepCodeInterface $step, object|array $rejection, ?\Throwable $exception = null): void
75+
{
76+
$this->channel->publish(
77+
json_encode([
78+
'item' => $rejection,
79+
'exception' => $exception,
80+
'step' => $this->stepUuid,
81+
], \JSON_THROW_ON_ERROR),
82+
[
83+
'content-type' => 'application/json',
84+
],
85+
$this->exchange,
86+
$this->topic,
87+
);
88+
}
89+
90+
public function rejectWithReason(StepCodeInterface $step, object|array $rejection, string $reason, ?\Throwable $exception = null): void
7491
{
7592
$this->channel->publish(
7693
json_encode([

src/State.php

Lines changed: 13 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@
44

55
namespace Kiboko\Component\Flow\RabbitMQ;
66

7-
use Kiboko\Contract\Pipeline\StateInterface;
7+
use Kiboko\Contract\Pipeline\StepCodeInterface;
8+
use Kiboko\Contract\Pipeline\StepStateInterface;
89

9-
final class State implements StateInterface
10+
final class State implements StepStateInterface
1011
{
12+
private array $steps = [];
1113
private int $acceptMetric = 0;
1214
private int $rejectMetric = 0;
1315
private int $errorMetric = 0;
@@ -19,37 +21,25 @@ public function __construct(
1921
) {
2022
}
2123

22-
public function initialize(int $start = 0): void
24+
public function accept(int $count = 1): void
2325
{
24-
$this->acceptMetric = 0;
25-
$this->rejectMetric = 0;
26-
$this->errorMetric = 0;
27-
}
28-
29-
public function accept(int $step = 1): void
30-
{
31-
$this->acceptMetric += $step;
26+
$this->acceptMetric += $count;
3227

33-
$this->manager->trySend($step);
28+
$this->manager->trySend($this->stepCode);
3429
}
3530

36-
public function reject(int $step = 1): void
31+
public function reject(int $count = 1): void
3732
{
38-
$this->rejectMetric += $step;
33+
$this->rejectMetric += $count;
3934

40-
$this->manager->trySend($step);
35+
$this->manager->trySend($this->stepCode);
4136
}
4237

43-
public function error(int $step = 1): void
38+
public function error(int $count = 1): void
4439
{
45-
$this->errorMetric += $step;
40+
$this->errorMetric += $count;
4641

47-
$this->manager->trySend($step);
48-
}
49-
50-
public function teardown(): void
51-
{
52-
$this->manager->teardown($this);
42+
$this->manager->trySend($this->stepCode);
5343
}
5444

5545
public function toArray(): array

src/StateManager.php

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99

1010
class StateManager
1111
{
12+
/** @var list<State> */
13+
private array $states = [];
1214
private array $steps = [];
1315
private array $tearedDown = [];
1416
private int $messageCount = 0;
@@ -37,11 +39,9 @@ public function __destruct()
3739
$this->channel->close();
3840
}
3941

40-
public function stepState(
41-
string $stepCode,
42-
string $stepLabel,
43-
): State {
44-
return $this->steps[] = new State($this, $stepCode, $stepLabel);
42+
public function stepState(string $stepCode, string $stepLabel): State
43+
{
44+
return $this->steps[$stepCode] = new State($this, $stepCode, $stepLabel);
4545
}
4646

4747
public function trySend($count): void

0 commit comments

Comments
 (0)