Skip to content

Commit e846d9e

Browse files
committed
Prioritize replication reads to reduce full syncs
Adds Replication flow control (repl-flow-control) to adjust replication read frequency based on buffer pressure. Helps replicas keep up with replication data and reduces primary buffer utilization and overflows. - Dynamic replication read scaling based on buffer pressure. - Reduces full syncs by increasing replication reads when needed. - Improves replication responsiveness, reduces data staleness. - Trade-offs: Slightly higher client latency due to replication prioritization. Replication was handled like a normal client. Under high load in the replica, replication lag increased, making data stale and caused primary buffer overflows, triggering full syncs and high CPU/memory/I/O usage. - Fewer full syncs from buffer overruns. - Lower replication lag, fresher data on replicas. - More stable primary buffer usage, less swapping. - Slightly higher client latency due to replication prioritization. Signed-off-by: xbasel <[email protected]>
1 parent bcd2f95 commit e846d9e

File tree

5 files changed

+91
-5
lines changed

5 files changed

+91
-5
lines changed

src/config.c

+10
Original file line numberDiff line numberDiff line change
@@ -2496,6 +2496,14 @@ static int updateReplBacklogSize(const char **err) {
24962496
return 1;
24972497
}
24982498

2499+
static int updateReplFlowControl(const char **err) {
2500+
UNUSED(err);
2501+
if (server.repl_flow_control_enabled == 0) {
2502+
server.repl_cur_reads_per_io_event = 1;
2503+
}
2504+
return 1;
2505+
}
2506+
24992507
static int updateMaxmemory(const char **err) {
25002508
UNUSED(err);
25012509
if (server.maxmemory) {
@@ -3193,6 +3201,7 @@ standardConfig static_configs[] = {
31933201
createBoolConfig("cluster-slot-stats-enabled", NULL, MODIFIABLE_CONFIG, server.cluster_slot_stats_enabled, 0, NULL, NULL),
31943202
createBoolConfig("hide-user-data-from-log", NULL, MODIFIABLE_CONFIG, server.hide_user_data_from_log, 1, NULL, NULL),
31953203
createBoolConfig("import-mode", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.import_mode, 0, NULL, NULL),
3204+
createBoolConfig("repl-flow-control-enabled", NULL, MODIFIABLE_CONFIG, server.repl_flow_control_enabled, 1, NULL, updateReplFlowControl),
31963205

31973206
/* String Configs */
31983207
createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL),
@@ -3295,6 +3304,7 @@ standardConfig static_configs[] = {
32953304
createIntConfig("rdma-port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.rdma_ctx_config.port, 0, INTEGER_CONFIG, NULL, updateRdmaPort),
32963305
createIntConfig("rdma-rx-size", NULL, IMMUTABLE_CONFIG, 64 * 1024, 16 * 1024 * 1024, server.rdma_ctx_config.rx_size, 1024 * 1024, INTEGER_CONFIG, NULL, NULL),
32973306
createIntConfig("rdma-completion-vector", NULL, IMMUTABLE_CONFIG, -1, 1024, server.rdma_ctx_config.completion_vector, -1, INTEGER_CONFIG, NULL, NULL),
3307+
createIntConfig("repl-max-reads-per-io-event", NULL, MODIFIABLE_CONFIG, 1, INT_MAX, server.repl_max_reads_per_io_event, 25, INTEGER_CONFIG, NULL, NULL),
32983308

32993309
/* Unsigned int configs */
33003310
createUIntConfig("maxclients", NULL, MODIFIABLE_CONFIG, 1, UINT_MAX, server.maxclients, 10000, INTEGER_CONFIG, NULL, updateMaxclients),

src/networking.c

+54-5
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
#include <math.h>
4444
#include <ctype.h>
4545
#include <stdatomic.h>
46+
#include <stdbool.h>
4647

4748
/* This struct is used to encapsulate filtering criteria for operations on clients
4849
* such as identifying specific clients to kill or retrieve. Each field in the struct
@@ -3247,6 +3248,7 @@ void readToQueryBuf(client *c) {
32473248
if (c->nread <= 0) {
32483249
return;
32493250
}
3251+
c->qb_full_read = (size_t)c->nread == readlen ? 1 : 0;
32503252

32513253
sdsIncrLen(c->querybuf, c->nread);
32523254
qblen = sdslen(c->querybuf);
@@ -3264,19 +3266,66 @@ void readToQueryBuf(client *c) {
32643266
}
32653267
}
32663268

3269+
/**
3270+
* This function is designed to prioritize replication flow.
3271+
* Determines whether the replica should continue reading from the primary.
3272+
* It dynamically adjusts the read rate based on buffer utilization
3273+
* and ensures replication reads are not overly aggressive.
3274+
*
3275+
* @return 1 if another read should be attempted, 0 otherwise.
3276+
*/
3277+
int shouldRepeatRead(client *c, int iteration) {
3278+
// If the client is not a primary replica, is closing, or flow control is disabled, no more reads.
3279+
if (!(c->flag.primary) || c->flag.close_asap || !server.repl_flow_control_enabled) {
3280+
return 0;
3281+
}
3282+
3283+
bool is_last_iteration = iteration >= server.repl_cur_reads_per_io_event;
3284+
3285+
if (is_last_iteration) {
3286+
/* If the last read filled the buffer AND enough time has passed since the last increase:
3287+
* - Increase the read rate, up to a max limit.
3288+
* - This ensures a gradual ramp-up instead of an overly aggressive approach. */
3289+
if (c->qb_full_read && server.mstime - server.repl_last_rate_update > 100) {
3290+
server.repl_cur_reads_per_io_event = MIN(server.repl_max_reads_per_io_event,
3291+
server.repl_cur_reads_per_io_event + 1);
3292+
server.repl_last_rate_update = server.mstime; // Update the last increase timestamp.
3293+
}
3294+
} else {
3295+
/* If the last read completely filled the buffer, continue reading. */
3296+
if (c->qb_full_read) {
3297+
return 1;
3298+
}
3299+
3300+
/* If the buffer was NOT fully filled, it indicates less replication pressure.
3301+
* Reduce the read rate to avoid excessive polling and free up resources for other clients. */
3302+
server.repl_cur_reads_per_io_event = MAX(1, server.repl_cur_reads_per_io_event - 1);
3303+
}
3304+
3305+
/* Stop reading for now (if we reached this point, conditions to continue were not met). */
3306+
return 0;
3307+
}
3308+
3309+
32673310
void readQueryFromClient(connection *conn) {
32683311
client *c = connGetPrivateData(conn);
32693312
/* Check if we can send the client to be handled by the IO-thread */
32703313
if (postponeClientRead(c)) return;
32713314

32723315
if (c->io_write_state != CLIENT_IDLE || c->io_read_state != CLIENT_IDLE) return;
32733316

3274-
readToQueryBuf(c);
3317+
bool shouldRepeat = false;
3318+
int iter = 0;
3319+
do {
3320+
readToQueryBuf(c);
32753321

3276-
if (handleReadResult(c) == C_OK) {
3277-
if (processInputBuffer(c) == C_ERR) return;
3278-
}
3279-
beforeNextClient(c);
3322+
if (handleReadResult(c) == C_OK) {
3323+
if (processInputBuffer(c) == C_ERR) return;
3324+
}
3325+
iter++;
3326+
shouldRepeat = shouldRepeatRead(c, iter);
3327+
beforeNextClient(c);
3328+
} while (shouldRepeat);
32803329
}
32813330

32823331
/* An "Address String" is a colon separated ip:port pair.

src/server.c

+1
Original file line numberDiff line numberDiff line change
@@ -2806,6 +2806,7 @@ void initServer(void) {
28062806
server.reply_buffer_peak_reset_time = REPLY_BUFFER_DEFAULT_PEAK_RESET_TIME;
28072807
server.reply_buffer_resizing_enabled = 1;
28082808
server.client_mem_usage_buckets = NULL;
2809+
server.repl_last_rate_update = 0;
28092810
resetReplicationBuffer();
28102811

28112812
/* Make sure the locale is set on startup based on the config file. */

src/server.h

+7
Original file line numberDiff line numberDiff line change
@@ -1181,6 +1181,7 @@ typedef struct client {
11811181
/* Input buffer and command parsing fields */
11821182
sds querybuf; /* Buffer we use to accumulate client queries. */
11831183
size_t qb_pos; /* The position we have read in querybuf. */
1184+
int qb_full_read; /* True if the last read returned the maximum allowed bytes */
11841185
robj **argv; /* Arguments of current command. */
11851186
int argc; /* Num of arguments of current command. */
11861187
int argv_len; /* Size of argv array (may be more than argc) */
@@ -2145,6 +2146,12 @@ struct valkeyServer {
21452146
/* Local environment */
21462147
char *locale_collate;
21472148
char *debug_context; /* A free-form string that has no impact on server except being included in a crash report. */
2149+
2150+
/* Replication flow control */
2151+
int repl_flow_control_enabled; /* Enables adaptive flow control for replication reads on the replica */
2152+
int repl_cur_reads_per_io_event; /* Current allowed reads from the primary file descriptor per epoll I/O event */
2153+
int repl_max_reads_per_io_event; /* Maximum allowed reads from the primary file descriptor per I/O event */
2154+
mstime_t repl_last_rate_update; /* Timestamp of the last increase in replication reads per I/O event */
21482155
};
21492156

21502157
#define MAX_KEYS_BUFFER 256

valkey.conf

+19
Original file line numberDiff line numberDiff line change
@@ -2535,3 +2535,22 @@ jemalloc-bg-thread yes
25352535
# the empty string.
25362536
#
25372537
# availability-zone "zone-name"
2538+
2539+
################################## REPLICATION FLOW CONTROL ##################################
2540+
2541+
# Prioritizes replication traffic to reduce primary buffer overflows,
2542+
# reducing lag and the risk of full syncs. Allows the replica to
2543+
# consume data faster under high load.
2544+
#
2545+
# If enabled, the replica invokes multiple reads per I/O event when it
2546+
# detects replication pressure.
2547+
#
2548+
# Default: yes
2549+
# repl-flow-control-enabled yes
2550+
2551+
# Specifies the maximum number of replication reads allowed per I/O event.
2552+
# Higher values allow more replication data to be processed per event, reducing replication lag,
2553+
# but can throttle normal clients and increase their latency.
2554+
#
2555+
# Default: 25
2556+
# repl-max-reads-per-io-event 25

0 commit comments

Comments
 (0)