Skip to content

Commit c2b7844

Browse files
committed
Add Redis pipeline support
1 parent 8d5d3b2 commit c2b7844

File tree

9 files changed

+378
-53
lines changed

9 files changed

+378
-53
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1010

1111
- Shorthand for Reducer functions
1212
- Add **Deprecated** section in `CHANGELOG.md`
13+
- Add Redis pipeline support
1314

1415
### Changed
1516

README.md

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,53 @@ $query = $queryBuilder
8484
// Doe ~John @age:[(17 +inf] -@address:[40.589247 -74.044502 40.000000 km]
8585
```
8686

87+
### Pipeline
88+
89+
```php
90+
use MacFJA\RediSearch\Aggregate;
91+
use MacFJA\RediSearch\Aggregate\Reducer;
92+
use MacFJA\RediSearch\Pipeline;
93+
use MacFJA\RediSearch\Search;
94+
use MacFJA\RediSearch\Suggestions;
95+
use Predis\Client;
96+
97+
$client = new Client(/* ... */);
98+
$suggestion = new Suggestions($client);
99+
$pipeline = new Pipeline($client);
100+
$query = '@age:[(17 +inf] %john%';
101+
$search = new Search($client);
102+
$aggregate = new Aggregate($client);
103+
104+
$pipeline
105+
->addPipeable(
106+
$search->withIndex('people')->withQuery($query)
107+
)
108+
->addPipeable(
109+
$aggregate
110+
->withIndexName('people')
111+
->withQuery($query)
112+
->addGroupBy([], [
113+
Reducer::average('age', 'avg'),
114+
Reducer::maximum('age', 'oldest')
115+
])
116+
)
117+
->addPipeable(
118+
$aggregate
119+
->withIndexName('people')
120+
->withQuery($query)
121+
->addGroupBy(['lastname'], [Reducer::count('count')])
122+
)
123+
->addItem(
124+
$suggestion->pipeableGet('john', true)
125+
);
126+
$result = $pipeline->executePipeline();
127+
128+
// $result[0] is the search result
129+
// $result[1] is the first aggregation result
130+
// $result[2] is the second aggregation result
131+
// $result[3] is the suggestion result
132+
```
133+
87134
## Similar projects
88135

89136
- [ethanhann/redisearch-php](https://packagist.org/packages/ethanhann/redisearch-php) - Abandoned

src/Aggregate.php

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,12 @@
3434
use MacFJA\RediSearch\Aggregate\SortBy;
3535
use MacFJA\RediSearch\Helper\DataHelper;
3636
use MacFJA\RediSearch\Helper\PaginatedResult;
37+
use MacFJA\RediSearch\Helper\PipelineItem;
3738
use MacFJA\RediSearch\Helper\RedisHelper;
3839
use Predis\Client;
3940
use Throwable;
4041

41-
class Aggregate implements Builder
42+
class Aggregate implements Builder, Pipeable
4243
{
4344
/** @var Client */
4445
private $redis;
@@ -228,20 +229,33 @@ public function aggregate(): array
228229
*/
229230
public function execute()
230231
{
231-
$rawResult = $this->redis->executeRaw($this->buildQuery());
232+
$request = $this->asPipelineItem();
233+
$rawResult = $this->redis->executeCommand($request->getCommand());
232234

233-
$totalCount = array_shift($rawResult);
234-
assert(is_int($totalCount));
235-
236-
$items = array_map(function (array $document) {
237-
return new Result(RedisHelper::getPairs($document));
238-
}, $rawResult);
239-
240-
$result = new class($totalCount, $items, 0, 0) extends PaginatedResult {
241-
};
242-
$this->reset();
235+
return $request->transform($rawResult);
236+
}
243237

244-
return $result;
238+
public function asPipelineItem(): PipelineItem
239+
{
240+
try {
241+
return PipelineItem::createFromRaw(
242+
$this->buildQuery(),
243+
static function ($rawResult, array $context): PaginatedResult {
244+
$totalCount = array_shift($rawResult);
245+
assert(is_int($totalCount));
246+
247+
$items = array_map(function (array $document) {
248+
return new Result(RedisHelper::getPairs($document));
249+
}, $rawResult);
250+
251+
return new class($totalCount, $items, $context['offset'] ?? 0, $context['limit'] ?? 0) extends PaginatedResult {
252+
};
253+
},
254+
['offset' => $this->resultOffset, 'limit' => $this->resultLimit]
255+
);
256+
} finally {
257+
$this->reset();
258+
}
245259
}
246260

247261
/**

src/Helper/PipelineItem.php

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
/*
6+
* Copyright MacFJA
7+
*
8+
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
9+
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
10+
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
11+
* permit persons to whom the Software is furnished to do so, subject to the following conditions:
12+
*
13+
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
14+
* Software.
15+
*
16+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
17+
* WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
18+
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19+
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
20+
*/
21+
22+
namespace MacFJA\RediSearch\Helper;
23+
24+
use function call_user_func;
25+
use Predis\Command\CommandInterface;
26+
use Predis\Command\RawCommand;
27+
28+
class PipelineItem
29+
{
30+
/** @var CommandInterface */
31+
private $command;
32+
33+
/** @var array<mixed> */
34+
private $context;
35+
36+
/** @var callable */
37+
private $transformer;
38+
39+
/**
40+
* @param array<mixed> $context
41+
*/
42+
public function __construct(CommandInterface $command, callable $transformer, array $context = [])
43+
{
44+
$this->command = $command;
45+
$this->context = $context;
46+
$this->transformer = $transformer;
47+
}
48+
49+
/**
50+
* @param array<mixed> $params
51+
* @param array<mixed> $context
52+
*/
53+
public static function createFromRaw(array $params, callable $transformer, array $context = []): PipelineItem
54+
{
55+
return new self(new RawCommand($params), $transformer, $context);
56+
}
57+
58+
public function getCommand(): CommandInterface
59+
{
60+
return $this->command;
61+
}
62+
63+
/**
64+
* @return array<mixed>
65+
*/
66+
public function getContext(): array
67+
{
68+
return $this->context;
69+
}
70+
71+
public function getTransformer(): callable
72+
{
73+
return $this->transformer;
74+
}
75+
76+
/**
77+
* @param mixed $rawResult
78+
*
79+
* @return mixed
80+
*/
81+
public function transform($rawResult)
82+
{
83+
return call_user_func($this->transformer, $rawResult, $this->context);
84+
}
85+
}

src/Pipeable.php

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
/*
6+
* Copyright MacFJA
7+
*
8+
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
9+
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
10+
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
11+
* permit persons to whom the Software is furnished to do so, subject to the following conditions:
12+
*
13+
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
14+
* Software.
15+
*
16+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
17+
* WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
18+
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19+
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
20+
*/
21+
22+
namespace MacFJA\RediSearch;
23+
24+
use MacFJA\RediSearch\Helper\PipelineItem;
25+
26+
interface Pipeable
27+
{
28+
public function asPipelineItem(): PipelineItem;
29+
}

src/Pipeline.php

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
/*
6+
* Copyright MacFJA
7+
*
8+
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
9+
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
10+
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
11+
* permit persons to whom the Software is furnished to do so, subject to the following conditions:
12+
*
13+
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
14+
* Software.
15+
*
16+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
17+
* WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
18+
* COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19+
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
20+
*/
21+
22+
namespace MacFJA\RediSearch;
23+
24+
use function array_keys;
25+
use function array_map;
26+
use function assert;
27+
use function is_array;
28+
use MacFJA\RediSearch\Helper\PipelineItem;
29+
use Predis\Client;
30+
31+
class Pipeline
32+
{
33+
/** @var array<PipelineItem> */
34+
private $pipeline;
35+
36+
/** @var Client */
37+
private $redis;
38+
39+
public function __construct(Client $redis)
40+
{
41+
$this->redis = $redis;
42+
$this->pipeline = [];
43+
}
44+
45+
public function addItem(PipelineItem $pipelineItem): self
46+
{
47+
$this->pipeline[] = $pipelineItem;
48+
49+
return $this;
50+
}
51+
52+
public function addPipeable(Pipeable $pipeable): self
53+
{
54+
$this->pipeline[] = $pipeable->asPipelineItem();
55+
56+
return $this;
57+
}
58+
59+
/**
60+
* @return array<mixed>
61+
*/
62+
public function executePipeline(): array
63+
{
64+
$rawReplies = $this->redis->pipeline(function (\Predis\Pipeline\Pipeline $pipe) {
65+
foreach ($this->pipeline as $item) {
66+
$pipe->executeCommand($item->getCommand());
67+
}
68+
});
69+
assert(is_array($rawReplies));
70+
71+
try {
72+
return array_map(function ($reply, $index) {
73+
return $this->pipeline[$index]->transform($reply);
74+
}, $rawReplies, array_keys($rawReplies));
75+
} finally {
76+
$this->pipeline = [];
77+
}
78+
}
79+
}

0 commit comments

Comments
 (0)