Skip to content

Commit 3ea13f5

Browse files
committed
Fixed the pipeline flush feature and added tests
1 parent 6f550d9 commit 3ea13f5

File tree

2 files changed

+155
-15
lines changed

2 files changed

+155
-15
lines changed

src/Pipeline.php

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public function feed(...$data): void
3636

3737
public function fork(callable ...$builders): ForkingInterface
3838
{
39+
error_log('The Pipeline::fork() method is not fully functional and has been deprecated for now. No replacement exists.', E_USER_DEPRECATED);
3940
$runner = $this->runner;
4041
$handlers = [];
4142
foreach ($builders as $builder) {
@@ -87,18 +88,14 @@ public function extract(ExtractorInterface $extractor): ExtractingInterface
8788
$extract = $extractor->extract();
8889
if (is_array($extract)) {
8990
$this->source->append(new \ArrayIterator($extract));
91+
} elseif ($extract instanceof \Iterator) {
92+
$this->source->append($extract);
9093
} elseif ($extract instanceof \Traversable) {
9194
$this->source->append(new \IteratorIterator($extract));
9295
} else {
9396
throw new \RuntimeException('Invalid data source, expecting array or Traversable.');
9497
}
9598

96-
if ($extractor instanceof FlushableInterface) {
97-
$this->source->append((function (FlushableInterface $flushable) {
98-
yield from $flushable->flush();
99-
})($extractor));
100-
}
101-
10299
return $this;
103100
}
104101

@@ -113,12 +110,17 @@ public function transform(TransformerInterface $transformer): TransformingInterf
113110
$iterator = new \AppendIterator();
114111

115112
$iterator->append(
116-
$main = $this->runner->run($this->subject, $transformer->transform())
113+
$this->runner->run($this->subject, $transformer->transform())
114+
);
115+
$iterator->append(
116+
$this->runner->run(
117+
new \ArrayIterator([null]),
118+
(function () use ($transformer): \Generator {
119+
yield;
120+
yield $transformer->flush();
121+
})()
122+
)
117123
);
118-
119-
$iterator->append((function (FlushableInterface $flushable) {
120-
yield from $flushable->flush();
121-
})($transformer));
122124
} else {
123125
$iterator = $this->runner->run($this->subject, $transformer->transform());
124126
}
@@ -139,12 +141,18 @@ public function load(LoaderInterface $loader): LoadingInterface
139141
$iterator = new \AppendIterator();
140142

141143
$iterator->append(
142-
$main = $this->runner->run($this->subject, $loader->load())
144+
$this->runner->run($this->subject, $loader->load())
143145
);
144146

145-
$iterator->append((function (FlushableInterface $flushable) {
146-
yield from $flushable->flush();
147-
})($loader));
147+
$iterator->append(
148+
$this->runner->run(
149+
new \ArrayIterator([null]),
150+
(function () use ($loader): \Generator {
151+
yield;
152+
yield $loader->flush();
153+
})()
154+
)
155+
);
148156
} else {
149157
$iterator = $this->runner->run($this->subject, $loader->load());
150158
}

unit/PipelineTest.php

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
<?php declare(strict_types=1);
2+
3+
namespace unit\Kiboko\Component\ETL\Pipeline;
4+
5+
use Kiboko\Component\Bucket\AcceptanceResultBucket;
6+
use Kiboko\Component\Pipeline\Pipeline;
7+
use Kiboko\Component\Pipeline\PipelineRunner;
8+
use Kiboko\Contract\Bucket\ResultBucketInterface;
9+
use Kiboko\Contract\Pipeline\ExtractorInterface;
10+
use Kiboko\Contract\Pipeline\FlushableInterface;
11+
use Kiboko\Contract\Pipeline\LoaderInterface;
12+
use Kiboko\Contract\Pipeline\TransformerInterface;
13+
14+
final class PipelineTest extends IterableTestCase
15+
{
16+
public function testExtractorWithoutFlush()
17+
{
18+
$pipeline = new Pipeline(new PipelineRunner(null));
19+
20+
$pipeline->extract(new class implements ExtractorInterface {
21+
public function extract(): iterable
22+
{
23+
yield 'lorem';
24+
yield 'ipsum';
25+
yield 'dolor';
26+
}
27+
});
28+
29+
$this->assertIteration(
30+
new \ArrayIterator(['lorem', 'ipsum', 'dolor']),
31+
$pipeline->walk()
32+
);
33+
}
34+
35+
public function testTransformerWithoutFlush()
36+
{
37+
$pipeline = new Pipeline(new PipelineRunner(null));
38+
39+
$pipeline->feed('lorem', 'ipsum', 'dolor');
40+
41+
$pipeline->transform(new class implements TransformerInterface {
42+
public function transform(): \Generator
43+
{
44+
$line = yield;
45+
$line = yield new AcceptanceResultBucket(str_rot13($line));
46+
$line = yield new AcceptanceResultBucket(str_rot13($line));
47+
yield new AcceptanceResultBucket(str_rot13($line));
48+
}
49+
});
50+
51+
$this->assertIteration(
52+
new \ArrayIterator(['yberz', 'vcfhz', 'qbybe']),
53+
$pipeline->walk()
54+
);
55+
}
56+
57+
public function testTransformerWithFlush()
58+
{
59+
$pipeline = new Pipeline(new PipelineRunner(null));
60+
61+
$pipeline->feed('lorem', 'ipsum', 'dolor');
62+
63+
$pipeline->transform(new class implements TransformerInterface, FlushableInterface {
64+
public function transform(): \Generator
65+
{
66+
$line = yield;
67+
$line = yield new AcceptanceResultBucket(str_rot13($line));
68+
$line = yield new AcceptanceResultBucket(str_rot13($line));
69+
yield new AcceptanceResultBucket(str_rot13($line));
70+
}
71+
72+
public function flush(): ResultBucketInterface
73+
{
74+
return new AcceptanceResultBucket(str_rot13('sit amet'));
75+
}
76+
});
77+
78+
$this->assertIteration(
79+
new \ArrayIterator(['yberz', 'vcfhz', 'qbybe', 'fvg nzrg']),
80+
$pipeline->walk()
81+
);
82+
}
83+
84+
public function testLoaderWithoutFlush()
85+
{
86+
$pipeline = new Pipeline(new PipelineRunner(null));
87+
88+
$pipeline->feed('lorem', 'ipsum', 'dolor');
89+
90+
$pipeline->load(new class implements LoaderInterface {
91+
public function load(): \Generator
92+
{
93+
$line = yield;
94+
$line = yield new AcceptanceResultBucket(str_rot13($line));
95+
$line = yield new AcceptanceResultBucket(str_rot13($line));
96+
yield new AcceptanceResultBucket(str_rot13($line));
97+
}
98+
});
99+
100+
$this->assertIteration(
101+
new \ArrayIterator(['yberz', 'vcfhz', 'qbybe']),
102+
$pipeline->walk()
103+
);
104+
}
105+
106+
public function testLoaderWithFlush()
107+
{
108+
$pipeline = new Pipeline(new PipelineRunner(null));
109+
110+
$pipeline->feed('lorem', 'ipsum', 'dolor');
111+
112+
$pipeline->load(new class implements LoaderInterface, FlushableInterface {
113+
public function load(): \Generator
114+
{
115+
$line = yield;
116+
$line = yield new AcceptanceResultBucket(str_rot13($line));
117+
$line = yield new AcceptanceResultBucket(str_rot13($line));
118+
yield new AcceptanceResultBucket(str_rot13($line));
119+
}
120+
121+
public function flush(): ResultBucketInterface
122+
{
123+
return new AcceptanceResultBucket(str_rot13('sit amet'));
124+
}
125+
});
126+
127+
$this->assertIteration(
128+
new \ArrayIterator(['yberz', 'vcfhz', 'qbybe', 'fvg nzrg']),
129+
$pipeline->walk()
130+
);
131+
}
132+
}

0 commit comments

Comments
 (0)