Skip to content

Commit 35b4fa1

Browse files
Initial commit
0 parents  commit 35b4fa1

File tree

11 files changed

+729
-0
lines changed

11 files changed

+729
-0
lines changed

.gitignore

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
/.idea
2+
/.vscode
3+
.env
4+
/coverage/
5+
/node_modules/

README.md

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
# Redis Cluster Pipeline Library
2+
3+
## Overview
4+
5+
This is a custom wrapper library around the popular ioredis client that adds a new method called `clusterPipeline` to handle Redis Cluster pipelines more efficiently. This library was developed by the PhysicsWallah Private Limited tech team to solve issues with executing multiple Redis commands across Redis Cluster nodes.
6+
7+
## Problem Statement
8+
9+
While using Redis Cluster with the ioredis client, handling multiple commands in a way that correctly distributes them across nodes while maintaining the order of execution was challenging. To address this, we developed this library that adds the `clusterPipeline` method, making it easy to manage Redis commands in a Redis Cluster environment.
10+
11+
This library is simply a wrapper around ioredis, extending it with an additional method that enables Redis Cluster commands to be executed in parallel, with the results returned in the correct order.
12+
13+
## Features
14+
15+
- **Efficient Redis Cluster Pipeline**: Adds support for executing commands across multiple Redis Cluster nodes using `clusterPipeline`.
16+
- **Seamless Integration**: Works with the existing ioredis client without requiring any changes to your existing Redis configuration.
17+
- **Cluster-Aware**: Automatically handles Redis commands based on slots, distributing them across the appropriate Redis Cluster node.
18+
- **Backward Compatible**: All existing ioredis functionality is preserved; the `clusterPipeline` method is simply added to the Cluster class.
19+
20+
## Installation
21+
22+
To install the library, use npm or yarn to add it to your project.
23+
24+
```bash
25+
npm install @pw-tech/ioredis
26+
```
27+
28+
Or with yarn:
29+
30+
```bash
31+
yarn add @pw-tech/ioredis
32+
```
33+
34+
## Usage
35+
36+
After installing, you can use the Cluster class just as you would with ioredis, but with the added functionality of the `clusterPipeline` method for Redis Cluster commands.
37+
38+
### Example
39+
40+
1. Basic Redis Usage (Without Clusters)
41+
42+
```javascript
43+
import { Redis } from '@pw-tech/ioredis';
44+
45+
const redis = new Redis({
46+
host: 'localhost',
47+
port: 6379,
48+
});
49+
50+
redis.set('key1', 'value1').then(() => redis.get('key1')).then(console.log);
51+
```
52+
53+
2. Redis Cluster Usage (With `clusterPipeline`)
54+
55+
In case you are using a Redis Cluster, you can now use the `clusterPipeline` method to handle pipelines across multiple nodes.
56+
57+
```javascript
58+
import { Cluster } from '@pw-tech/ioredis';
59+
60+
const clusterRedis = new Cluster({
61+
redisOptions: [{ host: 'localhost', port: 6379 }],
62+
});
63+
64+
const commands = [
65+
['set', 'key1', 'value1'],
66+
['get', 'key1'],
67+
];
68+
69+
clusterRedis.clusterPipeline(commands).then(results => {
70+
console.log(results); // Output: [ ['OK'], ['value1'] ]
71+
});
72+
```
73+
74+
## Explanation of `clusterPipeline`
75+
76+
`clusterPipeline` takes an array of Redis commands (e.g., [['set', 'key', 'value'], ['get', 'key']]).
77+
It automatically divides the commands across the Redis Cluster nodes based on the slot of the key.
78+
It executes the commands in parallel and ensures the results are returned in the same order as the original commands.
79+
80+
## How it Works
81+
82+
- **Slot Calculation**: Redis Cluster splits keys across multiple nodes using slots. This library automatically calculates which node each key belongs to and groups the commands accordingly.
83+
- **Pipeline Execution**: Once the commands are grouped by node, they are executed in parallel on each node, ensuring better performance when dealing with large pipelines.
84+
- **Result Handling**: Results from each node are merged and returned in the same order as the original commands.
85+
86+
## API Documentation
87+
88+
`clusterPipeline(commands: [string, ...any][]): Promise<any[]>`
89+
90+
### Parameters:
91+
92+
- `commands`: An array of Redis commands, where each command is an array starting with the command name and followed by its arguments.
93+
- Example: [['set', 'key1', 'value1'], ['get', 'key1']].
94+
95+
### Returns:
96+
97+
- A Promise that resolves to an array of command results in the same order as they were provided.
98+
- Example: [ ['OK'], ['value1'] ].
99+
100+
## Redis and Cluster
101+
102+
All the normal ioredis Redis and Cluster functionality remains available. This library only adds the `clusterPipeline` method to Cluster.
103+
104+
## Why This Library?
105+
106+
### Problem at PhysicsWallah Private Limited
107+
108+
At PhysicsWallah Private Limited, we needed an efficient way to handle Redis Cluster commands when multiple Redis nodes were involved. The standard Redis pipeline approach didn’t take slot distribution into account, leading to inefficiencies and complexity. We developed this library to:
109+
110+
- Simplify the execution of multiple Redis commands across a Redis Cluster.
111+
- Maintain the order of results while executing commands in parallel.
112+
- Make our Redis Cluster usage more efficient and less error-prone.
113+
114+
### How We Solve It
115+
116+
By adding the `clusterPipeline` method, we ensure commands are distributed to the appropriate Redis node, executed in parallel, and results are returned in the correct order. This approach eliminates the need to manually manage slot calculations and node selection.
117+
118+
## License
119+
120+
This library is open-source and licensed under the MIT License.
121+
122+
## Contributing
123+
124+
If you have any improvements or bug fixes, feel free to submit a pull request or open an issue.

dist/clusterPipeline.d.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
import Redis, { Cluster } from 'ioredis';
2+
declare class ExtendedClusterRedis extends Cluster {
3+
clusterPipeline(commands: [string, ...any][]): Promise<any[]>;
4+
private calculateSlot;
5+
private hashCRC16;
6+
private findNodeForSlot;
7+
}
8+
export { Redis, ExtendedClusterRedis as Cluster };

dist/clusterPipeline.js

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
"use strict";
2+
Object.defineProperty(exports, "__esModule", { value: true });
3+
exports.Cluster = exports.Redis = void 0;
4+
const ioredis_1 = require("ioredis");
5+
exports.Redis = ioredis_1.default;
6+
class ExtendedClusterRedis extends ioredis_1.Cluster {
7+
async clusterPipeline(commands) {
8+
const pipelinesByNode = {};
9+
const clusterSlots = await this.cluster('SHARDS');
10+
const nodeSlotRanges = clusterSlots.flatMap(([, slotRanges, , [node]]) => {
11+
// slotRanges can contain multiple start-end pairs
12+
const ranges = [];
13+
for (let i = 0; i < slotRanges.length; i += 2) {
14+
ranges.push({
15+
node: node[1],
16+
startSlot: slotRanges[i],
17+
endSlot: slotRanges[i + 1],
18+
});
19+
}
20+
return ranges;
21+
});
22+
// Group commands by node
23+
for (let i = 0; i < commands.length; i++) {
24+
const command = commands[i];
25+
const key = command[1];
26+
const slot = this.calculateSlot(key);
27+
const node = this.findNodeForSlot(nodeSlotRanges, slot);
28+
if (!pipelinesByNode[node]) {
29+
pipelinesByNode[node] = { commands: [], originalIndices: [] };
30+
}
31+
pipelinesByNode[node].commands.push(command);
32+
pipelinesByNode[node].originalIndices.push(i);
33+
}
34+
// Execute pipelines per node
35+
const results = [];
36+
for (const node in pipelinesByNode) {
37+
const { commands, originalIndices } = pipelinesByNode[node];
38+
const pipeline = this.pipeline();
39+
commands.forEach(cmd => pipeline[cmd[0]](...cmd.slice(1)));
40+
const nodeResult = await pipeline.exec();
41+
nodeResult.forEach((result, localIndex) => {
42+
const originalIndex = originalIndices[localIndex];
43+
results[originalIndex] = result;
44+
});
45+
}
46+
return results;
47+
}
48+
calculateSlot(key) {
49+
if (key == null) {
50+
return 0;
51+
}
52+
key = String(key);
53+
// Handle hash tag (keys inside {})
54+
const hashTagMatch = key.match(/\{(.+?)\}/);
55+
if (hashTagMatch) {
56+
key = hashTagMatch[1];
57+
}
58+
if (key.trim().length === 0) {
59+
return 0;
60+
}
61+
return this.hashCRC16(key) % 16384;
62+
}
63+
hashCRC16(str) {
64+
let crc = 0;
65+
const polynomial = 0x1021;
66+
const buffer = Buffer.from(str, 'utf8');
67+
for (let byte of buffer) {
68+
crc ^= byte << 8;
69+
for (let i = 0; i < 8; i++) {
70+
if (crc & 0x8000) {
71+
crc = (crc << 1) ^ polynomial;
72+
}
73+
else {
74+
crc <<= 1;
75+
}
76+
crc &= 0xFFFF;
77+
}
78+
}
79+
return crc;
80+
}
81+
findNodeForSlot(nodeSlotRanges, slot) {
82+
const matchingRange = nodeSlotRanges.find(range => slot >= range.startSlot && slot <= range.endSlot);
83+
return matchingRange ? matchingRange.node : null;
84+
}
85+
}
86+
exports.Cluster = ExtendedClusterRedis;

dist/index.d.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
export * from './clusterPipeline';

dist/index.js

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
"use strict";
2+
var __createBinding = (this && this.__createBinding) || (Object.create ? (function(o, m, k, k2) {
3+
if (k2 === undefined) k2 = k;
4+
var desc = Object.getOwnPropertyDescriptor(m, k);
5+
if (!desc || ("get" in desc ? !m.__esModule : desc.writable || desc.configurable)) {
6+
desc = { enumerable: true, get: function() { return m[k]; } };
7+
}
8+
Object.defineProperty(o, k2, desc);
9+
}) : (function(o, m, k, k2) {
10+
if (k2 === undefined) k2 = k;
11+
o[k2] = m[k];
12+
}));
13+
var __exportStar = (this && this.__exportStar) || function(m, exports) {
14+
for (var p in m) if (p !== "default" && !Object.prototype.hasOwnProperty.call(exports, p)) __createBinding(exports, m, p);
15+
};
16+
Object.defineProperty(exports, "__esModule", { value: true });
17+
__exportStar(require("./clusterPipeline"), exports);

0 commit comments

Comments
 (0)