Skip to content

Commit b9616ba

Browse files
committed
add additional pipe types;
dbal provider; rename handler to pipe;
1 parent 565c95b commit b9616ba

17 files changed

+445
-51
lines changed

README.md

-5
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,3 @@
11
# DataFlow
22
Build a pipeline to flow your data through!
33

4-
## Roadmap
5-
1. ~~Add CSV provider.~~
6-
2. ~~Add CSV writer.~~
7-
3. Implement Emitter?
8-
4. Concurrency?

src/LazyPlumber.php

+43
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
<?php
2+
declare(strict_types=1);
3+
4+
namespace SlayerBirden\DataFlow;
5+
6+
/**
7+
* This plumber is laze. It delegates pouring to others using emitter;
8+
* Can be used for concurrency.
9+
*/
10+
class LazyPlumber
11+
{
12+
/**
13+
* @var ProviderInterface
14+
*/
15+
private $provider;
16+
/**
17+
* @var PipeLineInterface
18+
*/
19+
private $pipeLine;
20+
/**
21+
* @var EmitterInterface
22+
*/
23+
private $emitter;
24+
25+
public function __construct(
26+
ProviderInterface $provider,
27+
PipeLineInterface $pipeLine,
28+
EmitterInterface $emitter
29+
) {
30+
$this->provider = $provider;
31+
$this->pipeLine = $pipeLine;
32+
$this->emitter = $emitter;
33+
}
34+
35+
public function pour(): void
36+
{
37+
$provider = $this->provider->getCask();
38+
foreach ($provider as $dataBag) {
39+
$this->emitter->emit('pour', $dataBag, $this->pipeLine);
40+
}
41+
$this->emitter->emit('empty_cask');
42+
}
43+
}

src/Pipe/Copy.php

+40
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
<?php
2+
declare(strict_types=1);
3+
4+
namespace SlayerBirden\DataFlow\Pipe;
5+
6+
use SlayerBirden\DataFlow\DataBagInterface;
7+
use SlayerBirden\DataFlow\IdentificationTrait;
8+
use SlayerBirden\DataFlow\PipeInterface;
9+
10+
class Copy implements PipeInterface
11+
{
12+
use IdentificationTrait;
13+
/**
14+
* @var string
15+
*/
16+
private $id;
17+
/**
18+
* @var string
19+
*/
20+
private $from;
21+
/**
22+
* @var string
23+
*/
24+
private $to;
25+
26+
public function __construct(string $id, string $from, string $to)
27+
{
28+
$this->id = $id;
29+
$this->from = $from;
30+
$this->to = $to;
31+
}
32+
33+
public function pass(DataBagInterface $dataBag): DataBagInterface
34+
{
35+
$from = $dataBag[$this->from] ?? null;
36+
$dataBag[$this->to] = $from;
37+
38+
return $dataBag;
39+
}
40+
}

src/Pipe/Delete.php

+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
<?php
2+
declare(strict_types=1);
3+
4+
namespace SlayerBirden\DataFlow\Pipe;
5+
6+
use SlayerBirden\DataFlow\DataBagInterface;
7+
use SlayerBirden\DataFlow\IdentificationTrait;
8+
use SlayerBirden\DataFlow\PipeInterface;
9+
10+
class Delete implements PipeInterface
11+
{
12+
use IdentificationTrait;
13+
/**
14+
* @var string
15+
*/
16+
private $id;
17+
/**
18+
* @var string[]
19+
*/
20+
private $fieldNames;
21+
22+
public function __construct(string $id, string ...$fieldNames)
23+
{
24+
$this->id = $id;
25+
$this->fieldNames = $fieldNames;
26+
}
27+
28+
public function pass(DataBagInterface $dataBag): DataBagInterface
29+
{
30+
foreach ($this->fieldNames as $name) {
31+
if (isset($dataBag[$name])) {
32+
unset($dataBag[$name]);
33+
}
34+
}
35+
36+
return $dataBag;
37+
}
38+
}

src/Handler/Filter.php src/Pipe/Filter.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
<?php
22
declare(strict_types=1);
33

4-
namespace SlayerBirden\DataFlow\Handler;
4+
namespace SlayerBirden\DataFlow\Pipe;
55

66
use SlayerBirden\DataFlow\DataBagInterface;
77
use SlayerBirden\DataFlow\Exception\FlowTerminationException;

src/Handler/FilterCallbackInterface.php src/Pipe/FilterCallbackInterface.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
<?php
22
declare(strict_types=1);
33

4-
namespace SlayerBirden\DataFlow\Handler;
4+
namespace SlayerBirden\DataFlow\Pipe;
55

66
use SlayerBirden\DataFlow\DataBagInterface;
77
use SlayerBirden\DataFlow\Exception\FlowTerminationException;

src/Handler/Mapper.php src/Pipe/Map.php

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
<?php
22
declare(strict_types=1);
33

4-
namespace SlayerBirden\DataFlow\Handler;
4+
namespace SlayerBirden\DataFlow\Pipe;
55

66
use SlayerBirden\DataFlow\DataBagInterface;
77
use SlayerBirden\DataFlow\PipeInterface;
88
use SlayerBirden\DataFlow\IdentificationTrait;
99

10-
class Mapper implements PipeInterface
10+
class Map implements PipeInterface
1111
{
1212
use IdentificationTrait;
1313
/**

src/Handler/MapperCallbackInterface.php src/Pipe/MapperCallbackInterface.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
<?php
22
declare(strict_types=1);
33

4-
namespace SlayerBirden\DataFlow\Handler;
4+
namespace SlayerBirden\DataFlow\Pipe;
55

66
use SlayerBirden\DataFlow\DataBagInterface;
77

src/Pipe/Swap.php

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
<?php
2+
declare(strict_types=1);
3+
4+
namespace SlayerBirden\DataFlow\Pipe;
5+
6+
use SlayerBirden\DataFlow\DataBagInterface;
7+
use SlayerBirden\DataFlow\IdentificationTrait;
8+
use SlayerBirden\DataFlow\PipeInterface;
9+
10+
class Swap implements PipeInterface
11+
{
12+
use IdentificationTrait;
13+
/**
14+
* @var string
15+
*/
16+
private $id;
17+
/**
18+
* @var string
19+
*/
20+
private $first;
21+
/**
22+
* @var string
23+
*/
24+
private $second;
25+
26+
public function __construct(string $id, string $first, string $second)
27+
{
28+
$this->id = $id;
29+
$this->first = $first;
30+
$this->second = $second;
31+
}
32+
33+
public function pass(DataBagInterface $dataBag): DataBagInterface
34+
{
35+
$first = $dataBag[$this->first] ?? null;
36+
$second = $dataBag[$this->second] ?? null;
37+
$dataBag[$this->first] = $second;
38+
$dataBag[$this->second] = $first;
39+
40+
return $dataBag;
41+
}
42+
}

src/PipelineBuilder.php

+38-11
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,13 @@
55

66
use Doctrine\DBAL\Connection;
77
use Doctrine\DBAL\Schema\Index;
8-
use SlayerBirden\DataFlow\Handler\Filter;
9-
use SlayerBirden\DataFlow\Handler\FilterCallbackInterface;
10-
use SlayerBirden\DataFlow\Handler\Mapper;
11-
use SlayerBirden\DataFlow\Handler\MapperCallbackInterface;
8+
use SlayerBirden\DataFlow\Pipe\Copy;
9+
use SlayerBirden\DataFlow\Pipe\Delete;
10+
use SlayerBirden\DataFlow\Pipe\Filter;
11+
use SlayerBirden\DataFlow\Pipe\FilterCallbackInterface;
12+
use SlayerBirden\DataFlow\Pipe\Map;
13+
use SlayerBirden\DataFlow\Pipe\MapperCallbackInterface;
14+
use SlayerBirden\DataFlow\Pipe\Swap;
1215
use SlayerBirden\DataFlow\Writer\ArrayWrite;
1316
use SlayerBirden\DataFlow\Writer\Dbal\AutoIncrementCallbackInterface;
1417
use SlayerBirden\DataFlow\Writer\Dbal\Write;
@@ -24,7 +27,7 @@ class PipelineBuilder implements PipelineBuilderInterface
2427
/**
2528
* @var WriterUtilityInterface
2629
*/
27-
private static $utility;
30+
private $utility;
2831

2932
private $pipesCount = 0;
3033
/**
@@ -56,17 +59,41 @@ public function map(string $field, MapperCallbackInterface $callback, ?string $i
5659
if (!$id) {
5760
$id = 'mapper' . $this->pipesCount++ . '-' . $field;
5861
}
59-
return $this->addSection(new Mapper($id, $field, $callback));
62+
return $this->addSection(new Map($id, $field, $callback));
6063
}
6164

6265
public function filter(FilterCallbackInterface $callback, ?string $id = null): PipelineBuilder
6366
{
6467
if (!$id) {
65-
$id = 'filter' . $this->pipesCount++ . '-' . md5(get_class($callback));
68+
$id = 'filter' . $this->pipesCount++ . '-' . get_class($callback);
6669
}
6770
return $this->addSection(new Filter($id, $callback));
6871
}
6972

73+
public function delete(array $names, ?string $id = null): PipelineBuilder
74+
{
75+
if (!$id) {
76+
$id = 'delete' . $this->pipesCount++ . '-' . json_encode($names);
77+
}
78+
return $this->addSection(new Delete($id, ...$names));
79+
}
80+
81+
public function swap(string $first, string $second, ?string $id = null): PipelineBuilder
82+
{
83+
if (!$id) {
84+
$id = 'swap' . $this->pipesCount++ . '-' . $first . '-' . $second;
85+
}
86+
return $this->addSection(new Swap($id, $first, $second));
87+
}
88+
89+
public function cp(string $from, string $to, ?string $id = null): PipelineBuilder
90+
{
91+
if (!$id) {
92+
$id = 'copy' . $this->pipesCount++ . '-' . $from . '-' . $to;
93+
}
94+
return $this->addSection(new Copy($id, $from, $to));
95+
}
96+
7097
public function dbalWrite(
7198
string $table,
7299
Connection $connection,
@@ -102,10 +129,10 @@ public function getPipeline(): PipeLineInterface
102129
return $this->pipeline;
103130
}
104131

105-
public static function getDbalUtility(Connection $connection): WriterUtilityInterface
132+
public function getDbalUtility(Connection $connection): WriterUtilityInterface
106133
{
107-
if (self::$utility === null) {
108-
self::$utility = new class($connection) implements WriterUtilityInterface
134+
if ($this->utility === null) {
135+
$this->utility = new class($connection) implements WriterUtilityInterface
109136
{
110137
/**
111138
* @var Connection
@@ -139,6 +166,6 @@ public function getUniqueKeys(string $table): array
139166
}
140167
};
141168
}
142-
return self::$utility;
169+
return $this->utility;
143170
}
144171
}

src/Provider/Csv.php

+2-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
namespace SlayerBirden\DataFlow\Provider;
55

6+
use SlayerBirden\DataFlow\Data\SimpleBag;
67
use SlayerBirden\DataFlow\IdentificationTrait;
78
use SlayerBirden\DataFlow\Provider\Exception\FileDoesNotExist;
89
use SlayerBirden\DataFlow\Provider\Exception\HeaderInvalid;
@@ -82,7 +83,7 @@ public function getCask(): \Generator
8283
)
8384
);
8485
}
85-
yield array_combine($this->header, $row);
86+
yield new SimpleBag(array_combine($this->header, $row));
8687
$this->file->next();
8788
}
8889
}

0 commit comments

Comments
 (0)