Skip to content

Commit 71f7e25

Browse files
committed
Implemented rejections and states
1 parent 3ea13f5 commit 71f7e25

File tree

6 files changed

+112
-108
lines changed

6 files changed

+112
-108
lines changed

composer.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
"php": "^8.0",
1919
"psr/log": "^1.1@dev",
2020
"php-etl/bucket": "^0.1.0",
21-
"php-etl/pipeline-contracts": "^0.1.0",
21+
"php-etl/pipeline-contracts": "^0.2.0",
2222
"php-etl/bucket-contracts": "^0.1.0"
2323
},
2424
"require-dev": {
@@ -45,7 +45,7 @@
4545
},
4646
"extra": {
4747
"branch-alias": {
48-
"dev-master": "0.2.x-dev"
48+
"dev-master": "0.3.x-dev"
4949
}
5050
}
5151
}

composer.lock

Lines changed: 7 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/Pipeline.php

Lines changed: 74 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -2,21 +2,21 @@
22

33
namespace Kiboko\Component\Pipeline;
44

5-
use Kiboko\Component\Bucket\AcceptanceAppendableResultBucket;
65
use Kiboko\Contract\Pipeline\ExtractingInterface;
76
use Kiboko\Contract\Pipeline\ExtractorInterface;
87
use Kiboko\Contract\Pipeline\FlushableInterface;
9-
use Kiboko\Contract\Pipeline\ForkingInterface;
108
use Kiboko\Contract\Pipeline\LoaderInterface;
119
use Kiboko\Contract\Pipeline\LoadingInterface;
1210
use Kiboko\Contract\Pipeline\PipelineInterface;
1311
use Kiboko\Contract\Pipeline\PipelineRunnerInterface;
12+
use Kiboko\Contract\Pipeline\RejectionInterface;
1413
use Kiboko\Contract\Pipeline\RunnableInterface;
14+
use Kiboko\Contract\Pipeline\StateInterface;
1515
use Kiboko\Contract\Pipeline\TransformerInterface;
1616
use Kiboko\Contract\Pipeline\TransformingInterface;
1717
use Kiboko\Contract\Pipeline\WalkableInterface;
1818

19-
class Pipeline implements PipelineInterface, ForkingInterface, WalkableInterface, RunnableInterface
19+
class Pipeline implements PipelineInterface, WalkableInterface, RunnableInterface
2020
{
2121
private \AppendIterator $source;
2222
private iterable $subject;
@@ -34,114 +34,107 @@ public function feed(...$data): void
3434
$this->source->append(new \ArrayIterator($data));
3535
}
3636

37-
public function fork(callable ...$builders): ForkingInterface
37+
private function passThroughCoroutine(): \Generator
3838
{
39-
error_log('The Pipeline::fork() method is not fully functional and has been deprecated for now. No replacement exists.', E_USER_DEPRECATED);
40-
$runner = $this->runner;
41-
$handlers = [];
42-
foreach ($builders as $builder) {
43-
$handlers[] = $handler = new class(new Pipeline($runner)) implements \IteratorAggregate {
44-
/** @var \Iterator */
45-
public $consumer;
46-
47-
public function __construct(public PipelineInterface $pipeline)
48-
{
49-
$this->consumer = $pipeline->walk();
50-
$this->consumer->rewind();
51-
}
52-
53-
public function getIterator()
54-
{
55-
return $this->consumer;
56-
}
57-
};
58-
59-
$builder($handler->pipeline);
60-
}
61-
62-
$this->subject = $this->runner->run($this->subject, (function (array $handlers) {
63-
$line = yield;
64-
65-
while (true) {
66-
$bucket = new AcceptanceAppendableResultBucket();
67-
68-
/** @var \Iterator $handler */
69-
foreach ($handlers as $handler) {
70-
$handler->pipeline->feed($line);
71-
$bucket->append(new \NoRewindIterator($handler));
72-
}
73-
74-
$line = yield $bucket;
75-
}
76-
})($handlers));
77-
78-
return $this;
39+
$line = yield;
40+
while ($line = yield $line);
7941
}
8042

81-
/**
82-
* @param ExtractorInterface $extractor
83-
*
84-
* @return $this
85-
*/
86-
public function extract(ExtractorInterface $extractor): ExtractingInterface
87-
{
43+
public function extract(
44+
ExtractorInterface $extractor,
45+
RejectionInterface $rejection,
46+
StateInterface $state,
47+
): ExtractingInterface {
8848
$extract = $extractor->extract();
8949
if (is_array($extract)) {
90-
$this->source->append(new \ArrayIterator($extract));
50+
$this->source->append(
51+
$this->runner->run(
52+
new \ArrayIterator($extract),
53+
$this->passThroughCoroutine(),
54+
$rejection,
55+
$state
56+
)
57+
);
9158
} elseif ($extract instanceof \Iterator) {
92-
$this->source->append($extract);
59+
$this->source->append(
60+
$this->runner->run(
61+
$extract,
62+
$this->passThroughCoroutine(),
63+
$rejection,
64+
$state
65+
)
66+
);
9367
} elseif ($extract instanceof \Traversable) {
94-
$this->source->append(new \IteratorIterator($extract));
68+
$this->source->append(
69+
$this->runner->run(
70+
new \IteratorIterator($extract),
71+
$this->passThroughCoroutine(),
72+
$rejection,
73+
$state
74+
)
75+
);
9576
} else {
9677
throw new \RuntimeException('Invalid data source, expecting array or Traversable.');
9778
}
9879

9980
return $this;
10081
}
10182

102-
/**
103-
* @param TransformerInterface $transformer
104-
*
105-
* @return $this
106-
*/
107-
public function transform(TransformerInterface $transformer): TransformingInterface
108-
{
83+
public function transform(
84+
TransformerInterface $transformer,
85+
RejectionInterface $rejection,
86+
StateInterface $state,
87+
): TransformingInterface {
10988
if ($transformer instanceof FlushableInterface) {
11089
$iterator = new \AppendIterator();
11190

11291
$iterator->append(
113-
$this->runner->run($this->subject, $transformer->transform())
92+
$this->runner->run(
93+
$this->subject,
94+
$transformer->transform(),
95+
$rejection,
96+
$state,
97+
)
11498
);
11599
$iterator->append(
116100
$this->runner->run(
117101
new \ArrayIterator([null]),
118102
(function () use ($transformer): \Generator {
119103
yield;
120104
yield $transformer->flush();
121-
})()
105+
})(),
106+
$rejection,
107+
$state,
122108
)
123109
);
124110
} else {
125-
$iterator = $this->runner->run($this->subject, $transformer->transform());
111+
$iterator = $this->runner->run(
112+
$this->subject,
113+
$transformer->transform(),
114+
$rejection,
115+
$state,
116+
);
126117
}
127118

128119
$this->subject = new \NoRewindIterator($iterator);
129120

130121
return $this;
131122
}
132123

133-
/**
134-
* @param LoaderInterface $loader
135-
*
136-
* @return $this
137-
*/
138-
public function load(LoaderInterface $loader): LoadingInterface
139-
{
124+
public function load(
125+
LoaderInterface $loader,
126+
RejectionInterface $rejection,
127+
StateInterface $state,
128+
): LoadingInterface {
140129
if ($loader instanceof FlushableInterface) {
141130
$iterator = new \AppendIterator();
142131

143132
$iterator->append(
144-
$this->runner->run($this->subject, $loader->load())
133+
$this->runner->run(
134+
$this->subject, $loader->load(),
135+
$rejection,
136+
$state,
137+
)
145138
);
146139

147140
$iterator->append(
@@ -150,29 +143,30 @@ public function load(LoaderInterface $loader): LoadingInterface
150143
(function () use ($loader): \Generator {
151144
yield;
152145
yield $loader->flush();
153-
})()
146+
})(),
147+
$rejection,
148+
$state,
154149
)
155150
);
156151
} else {
157-
$iterator = $this->runner->run($this->subject, $loader->load());
152+
$iterator = $this->runner->run(
153+
$this->subject,
154+
$loader->load(),
155+
$rejection,
156+
$state,
157+
);
158158
}
159159

160160
$this->subject = new \NoRewindIterator($iterator);
161161

162162
return $this;
163163
}
164164

165-
/**
166-
* @return \Iterator
167-
*/
168165
public function walk(): \Iterator
169166
{
170167
yield from $this->subject;
171168
}
172169

173-
/**
174-
* @return int
175-
*/
176170
public function run(): int
177171
{
178172
return iterator_count($this->walk());

src/PipelineRunner.php

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
use Kiboko\Contract\Bucket\RejectionResultBucketInterface;
77
use Kiboko\Contract\Bucket\ResultBucketInterface;
88
use Kiboko\Contract\Pipeline\PipelineRunnerInterface;
9+
use Kiboko\Contract\Pipeline\RejectionInterface;
10+
use Kiboko\Contract\Pipeline\StateInterface;
911
use Psr\Log\LoggerInterface;
1012
use Psr\Log\LogLevel;
1113
use Psr\Log\NullLogger;
@@ -19,14 +21,14 @@ public function __construct(?LoggerInterface $logger, private string $rejectionL
1921
$this->logger = $logger ?? new NullLogger();
2022
}
2123

22-
/**
23-
* @param \Iterator $source
24-
* @param \Generator $coroutine
25-
*
26-
* @return \Iterator
27-
*/
28-
public function run(\Iterator $source, \Generator $coroutine): \Iterator
29-
{
24+
public function run(
25+
\Iterator $source,
26+
\Generator $coroutine,
27+
RejectionInterface $rejection,
28+
StateInterface $state,
29+
): \Iterator {
30+
$state->initialize();
31+
3032
$wrapper = new GeneratorWrapper();
3133
$wrapper->rewind($source, $coroutine);
3234

@@ -46,12 +48,15 @@ public function run(\Iterator $source, \Generator $coroutine): \Iterator
4648
}
4749

4850
if ($bucket instanceof RejectionResultBucketInterface) {
49-
foreach ($bucket as $rejection) {
51+
foreach ($bucket->walkRejection() as $line) {
52+
$rejection->reject($line);
53+
$state->reject();
54+
5055
$this->logger->log(
5156
$this->rejectionLevel,
5257
'Some data was rejected from the pipeline',
5358
[
54-
'line' => $rejection
59+
'line' => $line
5560
]
5661
);
5762
}
@@ -71,6 +76,7 @@ public function run(\Iterator $source, \Generator $coroutine): \Iterator
7176

7277
if ($bucket instanceof AcceptanceResultBucketInterface) {
7378
yield from $bucket->walkAcceptance();
79+
$state->accept();
7480
}
7581

7682
$wrapper->next($source);

unit/PipelineRunnerTest.php

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
use Kiboko\Component\Bucket\AcceptanceResultBucket;
66
use Kiboko\Component\Bucket\EmptyResultBucket;
77
use Kiboko\Component\Pipeline\PipelineRunner;
8+
use Kiboko\Contract\Pipeline\NullRejection;
9+
use Kiboko\Contract\Pipeline\NullState;
810
use Psr\Log\NullLogger;
911

1012
class PipelineRunnerTest extends IterableTestCase
@@ -112,7 +114,7 @@ public function testRun(\Iterator $source, callable $callback, array $expected)
112114
{
113115
$run = new PipelineRunner(new NullLogger());
114116

115-
$it = $run->run($source, $callback());
117+
$it = $run->run($source, $callback(), new NullRejection(), new NullState());
116118

117119
$this->assertIteration(new \ArrayIterator($expected), $it);
118120
}

0 commit comments

Comments
 (0)