Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
5d84468
handle smigrating
nkaradzhov Nov 11, 2025
eab86c1
first approximation to handling smigrated
nkaradzhov Nov 13, 2025
fc06af6
deduplicate notifications based on sequence id
nkaradzhov Dec 2, 2025
44e4f02
add slotnumber to commands
nkaradzhov Jan 29, 2026
aa56883
add support for extracting commands from queue
nkaradzhov Dec 1, 2025
6e39e00
parse notification
nkaradzhov Dec 1, 2025
bb2a8cb
work on main algo
nkaradzhov Dec 2, 2025
71dbdc4
fix: handle string values in push message reply comparison
nkaradzhov Dec 2, 2025
3d55ca4
parse SMIGRATED according to new format
nkaradzhov Dec 2, 2025
a5d1b2c
comply with the new notification structure
nkaradzhov Dec 2, 2025
aa032d4
refine algo
nkaradzhov Dec 2, 2025
5e13dc4
handle pubSubNode replacement
nkaradzhov Dec 3, 2025
22b2050
tests: merge all `after` functions into one
nkaradzhov Dec 5, 2025
14fefb7
tests: add `testWithProxiedCluster()` function
nkaradzhov Jan 29, 2026
6d73f0e
Update index.ts
nkaradzhov Jan 29, 2026
3ee16fd
tests: add ProxyController for easier proxy comms
nkaradzhov Dec 5, 2025
bcbf3fb
fix: access private queue through _self proxy and guard client close …
PavelPashov Dec 10, 2025
013da8e
test(cluster): add fault injector infrastructure for hitless upgrade …
PavelPashov Dec 10, 2025
74c5144
feat(test-utils): add RE database management and test utilities
nkaradzhov Jan 29, 2026
11e8ae6
fix: fix command queue extraction and prepend logic
PavelPashov Dec 17, 2025
a7e76cf
test: add slot migration tests and refactor proxied fault injector
PavelPashov Dec 18, 2025
1a92c43
fix: wait for ALL ports while spawning proxied redis
nkaradzhov Jan 7, 2026
9ef9fce
fix: handle partial PubSubListeners in resubscribeAllPubSubListeners
nkaradzhov Jan 7, 2026
e19f5c1
refactor: maintenance tests and enhance fault injector client
nkaradzhov Jan 29, 2026
b547cb6
refactor: improve SMIGRATED push message parsing and add comprehensiv…
nkaradzhov Jan 29, 2026
4e8e571
refactor: #handleSmigrated: move source cleanup outside destinations …
nkaradzhov Jan 29, 2026
fd66749
refactor: add error handling to #handleSmigrated with try-catch-finally
nkaradzhov Jan 29, 2026
34835d1
refactor: replace hardcoded node ID 'asdff' with meaningful smigrated…
nkaradzhov Jan 29, 2026
e3589d7
fix: merge conflict residuals
nkaradzhov Jan 30, 2026
b9d332e
refactor: remove extra db deletion
nkaradzhov Feb 2, 2026
41af3ef
test: iterate over all trigger requirements and improve test naming
nkaradzhov Feb 3, 2026
85cde4f
uncomment tests
nkaradzhov Feb 3, 2026
d0a40e7
test: refactor test naming to use single baseTestName variable with i…
nkaradzhov Feb 3, 2026
e4de041
remove debug logs
nkaradzhov Feb 4, 2026
27d153f
fix: prevent PubSub subscription loss during cluster maintenance
nkaradzhov Feb 4, 2026
65c4323
Fix PubSub test hangs by awaiting publish batches
nkaradzhov Feb 4, 2026
491f5d6
Fix slot migration hangs during SMIGRATED handling
nkaradzhov Feb 4, 2026
5b573c9
improve FI debug logs
nkaradzhov Feb 6, 2026
e23cff1
implement unrelaxation
nkaradzhov Feb 6, 2026
1852290
chore: delete temp arch files
nkaradzhov Feb 10, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ node_modules/
dump.rdb
documentation/
tsconfig.tsbuildinfo
*.log
34 changes: 17 additions & 17 deletions packages/client/lib/RESP/decoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ export class Decoder {
this.#next = undefined;
return this.#decodeTypeValue(type, chunk);
}

#decodeTypeValue(type, chunk) {
switch (type) {
case RESP_TYPES.NULL:
Expand Down Expand Up @@ -128,7 +128,7 @@ export class Decoder {
chunk
)
);

case RESP_TYPES.DOUBLE:
return this.#handleDecodedValue(
this.onReply,
Expand All @@ -137,7 +137,7 @@ export class Decoder {
chunk
)
);

case RESP_TYPES.SIMPLE_STRING:
return this.#handleDecodedValue(
this.onReply,
Expand All @@ -146,7 +146,7 @@ export class Decoder {
chunk
)
);

case RESP_TYPES.BLOB_STRING:
return this.#handleDecodedValue(
this.onReply,
Expand All @@ -170,7 +170,7 @@ export class Decoder {
this.onErrorReply,
this.#decodeSimpleError(chunk)
);

case RESP_TYPES.BLOB_ERROR:
return this.#handleDecodedValue(
this.onErrorReply,
Expand All @@ -188,7 +188,7 @@ export class Decoder {
this.onReply,
this.#decodeSet(this.getTypeMapping(), chunk)
);

case RESP_TYPES.MAP:
return this.#handleDecodedValue(
this.onReply,
Expand Down Expand Up @@ -421,17 +421,17 @@ export class Decoder {
return this.#cursor === chunk.length ?
this.#decodeDoubleExponent.bind(this, d) :
this.#decodeDoubleExponent(d, chunk);

case ASCII['\r']:
this.#cursor = cursor + 2; // skip \r\n
return isNegative ? -double : double;
}

if (decimalIndex < Decoder.#DOUBLE_DECIMAL_MULTIPLIERS.length) {
double += (byte - ASCII['0']) * Decoder.#DOUBLE_DECIMAL_MULTIPLIERS[decimalIndex++];
}
} while (++cursor < chunk.length);

this.#cursor = cursor;
return this.#decodeDoubleDecimal.bind(this, isNegative, decimalIndex, double);
}
Expand Down Expand Up @@ -613,7 +613,7 @@ export class Decoder {
}

#decodeVerbatimStringFormat(stringLength, chunk) {
const formatCb = this.#decodeStringWithLength.bind(this, 3, 1, String);
const formatCb = this.#decodeStringWithLength.bind(this, 3, 1, String);
return this.#cursor >= chunk.length ?
this.#continueDecodeVerbatimStringFormat.bind(this, stringLength, formatCb) :
this.#continueDecodeVerbatimStringFormat(stringLength, formatCb, chunk);
Expand Down Expand Up @@ -689,13 +689,13 @@ export class Decoder {

case RESP_TYPES.BIG_NUMBER:
return this.#decodeBigNumber(typeMapping[RESP_TYPES.BIG_NUMBER], chunk);

case RESP_TYPES.DOUBLE:
return this.#decodeDouble(typeMapping[RESP_TYPES.DOUBLE], chunk);

case RESP_TYPES.SIMPLE_STRING:
return this.#decodeSimpleString(typeMapping[RESP_TYPES.SIMPLE_STRING], chunk);

case RESP_TYPES.BLOB_STRING:
return this.#decodeBlobString(typeMapping[RESP_TYPES.BLOB_STRING], chunk);

Expand All @@ -704,7 +704,7 @@ export class Decoder {

case RESP_TYPES.SIMPLE_ERROR:
return this.#decodeSimpleError(chunk);

case RESP_TYPES.BLOB_ERROR:
return this.#decodeBlobError(chunk);

Expand All @@ -713,7 +713,7 @@ export class Decoder {

case RESP_TYPES.SET:
return this.#decodeSet(typeMapping, chunk);

case RESP_TYPES.MAP:
return this.#decodeMap(typeMapping, chunk);

Expand Down Expand Up @@ -997,7 +997,7 @@ export class Decoder {
// decode simple string map key as string (and not as buffer)
case RESP_TYPES.SIMPLE_STRING:
return this.#decodeSimpleString(String, chunk);

// decode blob string map key as string (and not as buffer)
case RESP_TYPES.BLOB_STRING:
return this.#decodeBlobString(String, chunk);
Expand Down Expand Up @@ -1028,7 +1028,7 @@ export class Decoder {
this.#decodeNestedType.bind(this, typeMapping),
typeMapping
);
}
}

const value = this.#decodeNestedType(typeMapping, chunk);
if (typeof value === 'function') {
Expand Down
88 changes: 85 additions & 3 deletions packages/client/lib/client/commands-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ export interface CommandOptions<T = TypeMapping> {
* Timeout for the command in milliseconds
*/
timeout?: number;
/**
* @internal
* The slot the command is targeted to (if any)
*/
slotNumber?: number;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to keep slotNumber in the CommandOptions or refactor it?

}

export interface CommandToWrite extends CommandWaitingForReply {
Expand All @@ -33,6 +38,7 @@ export interface CommandToWrite extends CommandWaitingForReply {
listener: () => unknown;
originalTimeout: number | undefined;
} | undefined;
slotNumber?: number
}

interface CommandWaitingForReply {
Expand Down Expand Up @@ -186,14 +192,34 @@ export default class RedisCommandsQueue {
this.#pushHandlers.push(handler);
}

async waitForInflightCommandsToComplete(): Promise<void> {
async waitForInflightCommandsToComplete(options?: { timeoutMs?: number, flushOnTimeout?: boolean }): Promise<void> {
// In-flight commands already completed
if(this.#waitingForReply.length === 0) {
return
};
// Otherwise wait for in-flight commands to fire `empty` event
return new Promise(resolve => {
this.#waitingForReply.events.on('empty', resolve)
const onEmpty = () => {
if (timeoutId) clearTimeout(timeoutId);
resolve();
};

let timeoutId: ReturnType<typeof setTimeout> | undefined;
const timeoutMs = options?.timeoutMs;
if (timeoutMs !== undefined && timeoutMs > 0) {
timeoutId = setTimeout(() => {
this.#waitingForReply.events.off('empty', onEmpty);
const pendingCount = this.#waitingForReply.length;
dbgMaintenance(`waitForInflightCommandsToComplete timed out after ${timeoutMs}ms with ${pendingCount} commands still waiting`);
if (options?.flushOnTimeout && pendingCount > 0) {
dbgMaintenance(`Flushing ${pendingCount} commands that timed out waiting for reply`);
this.#flushWaitingForReply(new TimeoutError());
}
resolve(); // Resolve instead of reject - we don't want to fail the migration
}, timeoutMs);
}

this.#waitingForReply.events.once('empty', onEmpty);
});
}

Expand All @@ -219,6 +245,7 @@ export default class RedisCommandsQueue {
channelsCounter: undefined,
typeMapping: options?.typeMapping
};
value.slotNumber = options?.slotNumber

// If #maintenanceCommandTimeout was explicitly set, we should
// use it instead of the timeout provided by the command
Expand Down Expand Up @@ -283,7 +310,8 @@ export default class RedisCommandsQueue {
if (Array.isArray(reply)) {
if (this.#onPush(reply)) return;

if (PONG.equals(reply[0] as Buffer)) {
const firstElement = typeof reply[0] === 'string' ? Buffer.from(reply[0]) : reply[0];
if (PONG.equals(firstElement as Buffer)) {
const { resolve, typeMapping } = this.#waitingForReply.shift()!,
buffer = ((reply[1] as Buffer).length === 0 ? reply[0] : reply[1]) as Buffer;
resolve(typeMapping?.[RESP_TYPES.SIMPLE_STRING] === Buffer ? buffer : buffer.toString());
Expand Down Expand Up @@ -342,6 +370,10 @@ export default class RedisCommandsQueue {
return this.#pubSub.removeAllListeners();
}

removeShardedPubSubListenersForSlots(slots: Set<number>) {
return this.#pubSub.removeShardedPubSubListenersForSlots(slots);
}

resubscribe(chainId?: symbol) {
const commands = this.#pubSub.resubscribe();
if (!commands.length) return;
Expand Down Expand Up @@ -541,4 +573,54 @@ export default class RedisCommandsQueue {
this.#waitingForReply.length === 0
);
}

/**
*
* Extracts commands for the given slots from the toWrite queue.
* Some commands dont have "slotNumber", which means they are not designated to particular slot/node.
* We ignore those.
*/
extractCommandsForSlots(slots: Set<number>): CommandToWrite[] {
const result: CommandToWrite[] = [];
let current = this.#toWrite.head;
while(current !== undefined) {
if(current.value.slotNumber !== undefined && slots.has(current.value.slotNumber)) {
result.push(current.value);
const toRemove = current;
current = current.next;
this.#toWrite.remove(toRemove);
} else {
// Move to next node even if we don't extract this command
current = current.next;
}
}
return result;
}

/**
* Gets all commands from the write queue without removing them.
*/
extractAllCommands(): CommandToWrite[] {
const result: CommandToWrite[] = [];
let current = this.#toWrite.head;
while(current) {
result.push(current.value);
this.#toWrite.remove(current);
current = current.next;
}
return result;
}

/**
* Prepends commands to the write queue in reverse.
*/
prependCommandsToWrite(commands: CommandToWrite[]) {
if (!commands.length) {
return;
}

for (let i = commands.length - 1; i >= 0; i--) {
this.#toWrite.unshift(commands[i]);
}
}
}
Loading
Loading