Skip to content

Commit 697394a

Browse files
initial commit
1 parent 4117cab commit 697394a

16 files changed

+152
-40
lines changed

README.md

Lines changed: 98 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,28 @@
1-
# Redis Cluster Pipeline Library
1+
# Redis Cluster Pipeline Library with <img src="./images/pw.jpg" alt="drawing" width="30"/>
22

3-
## Overview
3+
This is a custom wrapper around the [ioredis](https://www.npmjs.com/package/ioredis) that adds a new method `clusterPipeline` to handle Redis Cluster pipelines more efficiently.
44

5-
This is a custom wrapper library around the popular ioredis client that adds a new method called `clusterPipeline` to handle Redis Cluster pipelines more efficiently. This library was developed by the PhysicsWallah Private Limited tech team to solve issues with executing multiple Redis commands across Redis Cluster nodes.
5+
## Why This Library?
6+
7+
### Problem at PhysicsWallah
8+
9+
At PhysicsWallah, we needed an efficient way to handle Redis Cluster commands when multiple Redis nodes were involved. The standard Redis pipeline approach didn't take slot distribution into account, leading to inefficiencies and complexity. We developed this library to:
10+
11+
- Simplify the execution of multiple Redis commands across a Redis Cluster.
12+
- Maintain the order of results while executing commands in parallel.
13+
- Make our Redis Cluster usage more efficient and less error-prone.
614

7-
## Problem Statement
15+
## Limitations of ioredis in Clustered Redis Topology
816

9-
While using Redis Cluster with the ioredis client, handling multiple commands in a way that correctly distributes them across nodes while maintaining the order of execution was challenging. To address this, we developed this library that adds the `clusterPipeline` method, making it easy to manage Redis commands in a Redis Cluster environment.
17+
As stated in the official [ioredis documentation](https://github.com/redis/ioredis?tab=readme-ov-file#transaction-and-pipeline-in-cluster-mode):
1018

11-
This library is simply a wrapper around ioredis, extending it with an additional method that enables Redis Cluster commands to be executed in parallel, with the results returned in the correct order.
19+
> "All keys in a pipeline should belong to slots served by the same node, since ioredis sends all commands in a pipeline to the same node."
20+
21+
This means that when executing multiple commands in a pipeline, all keys must belong to the same Redis slot, as `ioredis` sends the pipeline commands to a single node. If the keys interact with different shards, pipeline commands would fail with error `All the keys in a pipeline command should belong to the same slot`
22+
23+
Due to this limitation, pipelining cannot be reliably used in Redis clusters with ioredis, leading to increased network latency as each command is executed individually instead of being batched together.
24+
25+
Our custom library resolves this issue by enabling efficient and reliable pipelining support in clustered Redis environments, improving performance and reducing latency.
1226

1327
## Features
1428

@@ -22,40 +36,40 @@ This library is simply a wrapper around ioredis, extending it with an additional
2236
To install the library, use npm or yarn to add it to your project.
2337

2438
```bash
25-
npm install @pw-tech/ioredis
39+
npm install @pw-tech/pw-redis
2640
```
2741

2842
Or with yarn:
2943

3044
```bash
31-
yarn add @pw-tech/ioredis
45+
yarn add @pw-tech/pw-redis
3246
```
3347

3448
## Usage
3549

3650
After installing, you can use the Cluster class just as you would with ioredis, but with the added functionality of the `clusterPipeline` method for Redis Cluster commands.
51+
```javascript
52+
clusterPipeline(commands: [string, ...any][]): Promise<[error: Error | null, result: unknown][] | null>
53+
```
3754

38-
### Example
55+
##### Parameters:
3956

40-
1. Basic Redis Usage (Without Clusters)
57+
- `commands`: An array of Redis commands, where each command is represented as an array with the command name as the first element, followed by its respective arguments.
58+
- Example: [['set', 'key1', 'value1'], ['get', 'key1']].
4159

42-
```javascript
43-
import { Redis } from '@pw-tech/ioredis';
60+
##### Returns:
4461

45-
const redis = new Redis({
46-
host: 'localhost',
47-
port: 6379,
48-
});
62+
- A Promise that resolves to an array of results for each command, in the same order as the input.
63+
- Example: [ ['OK'], ['value1'] ].
4964

50-
redis.set('key1', 'value1').then(() => redis.get('key1')).then(console.log);
51-
```
65+
#### Examples
5266

53-
2. Redis Cluster Usage (With `clusterPipeline`)
67+
1. Cluster Redis Usage
5468

55-
In case you are using a Redis Cluster, you can now use the `clusterPipeline` method to handle pipelines across multiple nodes.
69+
When working with Redis Cluster, you can use the clusterPipeline method to efficiently handle multiple commands across different nodes.
5670

5771
```javascript
58-
import { Cluster } from '@pw-tech/ioredis';
72+
import { Cluster } from '@pw-tech/pw-redis';
5973

6074
const clusterRedis = new Cluster({
6175
redisOptions: [{ host: 'localhost', port: 6379 }],
@@ -66,19 +80,40 @@ const commands = [
6680
['get', 'key1'],
6781
];
6882

69-
clusterRedis.clusterPipeline(commands).then(results => {
70-
console.log(results); // Output: [ ['OK'], ['value1'] ]
83+
clusterRedis.clusterPipeline(commands)
84+
```
85+
86+
And the output will be:
87+
88+
```javascript
89+
[[null,"OK"],[null,"value1"]]
90+
```
91+
92+
93+
2. Normal Usage
94+
95+
```javascript
96+
import { Redis } from '@pw-tech/pw-redis';
97+
98+
const redis = new Redis({
99+
host: 'localhost',
100+
port: 6379,
71101
});
102+
103+
redis.set('key1', 'value1').then(() => redis.get('key1')).then(console.log);
72104
```
73105

74106
## Explanation of `clusterPipeline`
75107

76-
`clusterPipeline` takes an array of Redis commands (e.g., [['set', 'key', 'value'], ['get', 'key']]).
77-
It automatically divides the commands across the Redis Cluster nodes based on the slot of the key.
78-
It executes the commands in parallel and ensures the results are returned in the same order as the original commands.
108+
The `clusterPipeline` method takes an array of Redis commands (e.g., [['set', 'key', 'value'], ['get', 'key']]) and automatically distributes them across the appropriate Redis Cluster nodes based on key slots. It executes the commands in parallel, ensuring that the results are returned in the same order as the input commands.
79109

80110
## How it Works
81111

112+
- **Slot Calculation**: Redis Cluster distributes keys across multiple nodes using hash slots. The library automatically determines the correct node for each key and groups commands accordingly, using the Redis `SHARDS` method to map and store the slot-to-node mapping in memory. In case of any slot changes in Redis, detected through error handling, the slot ranges are refreshed dynamically.
113+
- **Pipeline Execution**: Once the commands are grouped by node, they are executed in parallel, improving performance when processing large sets of commands.
114+
- **Result Handling**: The results from each node are collected and returned in the same order as the original set of commands, ensuring consistency and ease of use.
115+
116+
82117
- **Slot Calculation**: Redis Cluster splits keys across multiple nodes using slots. This library automatically calculates which node each key belongs to and groups the commands accordingly.
83118
- **Pipeline Execution**: Once the commands are grouped by node, they are executed in parallel on each node, ensuring better performance when dealing with large pipelines.
84119
- **Result Handling**: Results from each node are merged and returned in the same order as the original commands.
@@ -115,6 +150,43 @@ At PhysicsWallah Private Limited, we needed an efficient way to handle Redis Clu
115150

116151
By adding the `clusterPipeline` method, we ensure commands are distributed to the appropriate Redis node, executed in parallel, and results are returned in the correct order. This approach eliminates the need to manually manage slot calculations and node selection.
117152

153+
## Performance Improvement in Production
154+
155+
After implementing partial pipeline optimization, we observed significant API response time improvements in production. Some queries had to remain sequential due to business logic dependencies, but wherever possible, parallel and pipeline execution were combined.
156+
157+
### API 1
158+
159+
| Date | Avg Latency | P95 Latency | P99 Latency |
160+
|------------|-------------|-------------|-------------|
161+
| Without Pipeline | 52ms | 119ms | 264ms |
162+
| With Pipeline | 42ms (↓19%) | 101ms (↓15%)| 224ms (↓15%)|
163+
164+
#### Latency Comparison
165+
![API 1 Avg Latency Comparison](./images/api1_avg_latency.png)
166+
167+
![API 1 P95 Latency Comparison](./images/api1_p95_latency.png)
168+
169+
![API 1 P99 Latency Comparison](./images/api1_p99_latency.png)
170+
171+
### API 2
172+
173+
| Date | Avg Latency | P95 Latency | P99 Latency |
174+
|------------|-------------|-------------|-------------|
175+
| Without Pipeline | 36ms | 78ms | 234ms |
176+
| With Pipeline | 31ms (↓14%) | 57ms (↓27%) | 202ms (↓13%)|
177+
178+
#### Latency Comparison
179+
![API 2 Avg Latency Comparison](./images/api2_avg_latency.png)
180+
181+
![API 2 P95 Latency Comparison](./images/api2_p95_latency.png)
182+
183+
![API 2 P99 Latency Comparison](./images/api2_p99_latency.png)
184+
185+
## Key Learnings & Next Steps
186+
- **Pipeline execution improved response times** significantly in cluster mode.
187+
- **Sequential logic in APIs limits optimization**, we will continue identifying more queries for batching.
188+
- **Next Steps**: Expanding this approach across more APIs & optimizing Redis cluster configurations.
189+
118190
## License
119191

120192
This library is open-source and licensed under the MIT License.

dist/clusterPipeline.d.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
import Redis, { Cluster } from 'ioredis';
22
declare class ExtendedClusterRedis extends Cluster {
3-
clusterPipeline(commands: [string, ...any][]): Promise<any[]>;
3+
private nodeSlotRanges;
4+
private updateRedisClusterSlots;
5+
clusterPipeline(commands: [string, ...any][]): Promise<[error: Error | null, result: unknown][] | null>;
6+
private executePipelineForCluster;
47
private calculateSlot;
58
private hashCRC16;
69
private findNodeForSlot;

dist/clusterPipeline.js

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,13 @@ exports.Cluster = exports.Redis = void 0;
44
const ioredis_1 = require("ioredis");
55
exports.Redis = ioredis_1.default;
66
class ExtendedClusterRedis extends ioredis_1.Cluster {
7-
async clusterPipeline(commands) {
8-
const pipelinesByNode = {};
7+
constructor() {
8+
super(...arguments);
9+
this.nodeSlotRanges = [];
10+
}
11+
async updateRedisClusterSlots() {
912
const clusterSlots = await this.cluster('SHARDS');
10-
const nodeSlotRanges = clusterSlots.flatMap(([, slotRanges, , [node]]) => {
13+
this.nodeSlotRanges = clusterSlots.flatMap(([, slotRanges, , [node]]) => {
1114
// slotRanges can contain multiple start-end pairs
1215
const ranges = [];
1316
for (let i = 0; i < slotRanges.length; i += 2) {
@@ -19,12 +22,28 @@ class ExtendedClusterRedis extends ioredis_1.Cluster {
1922
}
2023
return ranges;
2124
});
25+
}
26+
async clusterPipeline(commands) {
27+
try {
28+
let res = await this.executePipelineForCluster(commands);
29+
return res;
30+
}
31+
catch (error) {
32+
if (error instanceof Error && error.message.includes("slots")) {
33+
await this.updateRedisClusterSlots();
34+
return this.executePipelineForCluster(commands);
35+
}
36+
throw error;
37+
}
38+
}
39+
async executePipelineForCluster(commands) {
40+
const pipelinesByNode = {};
2241
// Group commands by node
2342
for (let i = 0; i < commands.length; i++) {
2443
const command = commands[i];
2544
const key = command[1];
2645
const slot = this.calculateSlot(key);
27-
const node = this.findNodeForSlot(nodeSlotRanges, slot);
46+
const node = this.findNodeForSlot(this.nodeSlotRanges, slot);
2847
if (!pipelinesByNode[node]) {
2948
pipelinesByNode[node] = { commands: [], originalIndices: [] };
3049
}
@@ -33,7 +52,7 @@ class ExtendedClusterRedis extends ioredis_1.Cluster {
3352
}
3453
// Execute pipelines per node
3554
const results = [];
36-
for (const node in pipelinesByNode) {
55+
const promises = Object.keys(pipelinesByNode).map(async (node) => {
3756
const { commands, originalIndices } = pipelinesByNode[node];
3857
const pipeline = this.pipeline();
3958
commands.forEach(cmd => pipeline[cmd[0]](...cmd.slice(1)));
@@ -42,7 +61,8 @@ class ExtendedClusterRedis extends ioredis_1.Cluster {
4261
const originalIndex = originalIndices[localIndex];
4362
results[originalIndex] = result;
4463
});
45-
}
64+
});
65+
await Promise.all(promises);
4666
return results;
4767
}
4868
calculateSlot(key) {

images/Physics_wallah-2.webp

10.8 KB
Binary file not shown.

images/api1_avg_latency.png

422 KB
Loading

images/api1_p95_latency.png

250 KB
Loading

images/api1_p99_latency.png

290 KB
Loading

images/api1_tpm.png

352 KB
Loading

images/api2_avg_latency.png

414 KB
Loading

images/api2_p95_latency.png

464 KB
Loading

0 commit comments

Comments
 (0)