Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replication Flow Control – Prioritizing replication traffic in the replica #1838

Open
wants to merge 1 commit into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -2496,6 +2496,14 @@ static int updateReplBacklogSize(const char **err) {
return 1;
}

/* Apply callback for the "repl-flow-control-enabled" config.
 *
 * When flow control has just been switched off, the adaptive per-event read
 * counter is put back to a single read. When it is on, nothing needs to be
 * adjusted here. The 'err' output is never written.
 *
 * Always returns 1. */
static int updateReplFlowControl(const char **err) {
    UNUSED(err);

    if (server.repl_flow_control_enabled) {
        return 1;
    }

    /* Flow control is off: drop back to one read per I/O event. */
    server.repl_cur_reads_per_io_event = 1;
    return 1;
}

static int updateMaxmemory(const char **err) {
UNUSED(err);
if (server.maxmemory) {
Expand Down Expand Up @@ -3193,6 +3201,7 @@ standardConfig static_configs[] = {
createBoolConfig("cluster-slot-stats-enabled", NULL, MODIFIABLE_CONFIG, server.cluster_slot_stats_enabled, 0, NULL, NULL),
createBoolConfig("hide-user-data-from-log", NULL, MODIFIABLE_CONFIG, server.hide_user_data_from_log, 1, NULL, NULL),
createBoolConfig("import-mode", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.import_mode, 0, NULL, NULL),
createBoolConfig("repl-flow-control-enabled", NULL, MODIFIABLE_CONFIG, server.repl_flow_control_enabled, 1, NULL, updateReplFlowControl),

/* String Configs */
createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL),
Expand Down Expand Up @@ -3295,6 +3304,7 @@ standardConfig static_configs[] = {
createIntConfig("rdma-port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.rdma_ctx_config.port, 0, INTEGER_CONFIG, NULL, updateRdmaPort),
createIntConfig("rdma-rx-size", NULL, IMMUTABLE_CONFIG, 64 * 1024, 16 * 1024 * 1024, server.rdma_ctx_config.rx_size, 1024 * 1024, INTEGER_CONFIG, NULL, NULL),
createIntConfig("rdma-completion-vector", NULL, IMMUTABLE_CONFIG, -1, 1024, server.rdma_ctx_config.completion_vector, -1, INTEGER_CONFIG, NULL, NULL),
createIntConfig("repl-max-reads-per-io-event", NULL, MODIFIABLE_CONFIG, 1, INT_MAX, server.repl_max_reads_per_io_event, 25, INTEGER_CONFIG, NULL, NULL),

/* Unsigned int configs */
createUIntConfig("maxclients", NULL, MODIFIABLE_CONFIG, 1, UINT_MAX, server.maxclients, 10000, INTEGER_CONFIG, NULL, updateMaxclients),
Expand Down
59 changes: 54 additions & 5 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include <math.h>
#include <ctype.h>
#include <stdatomic.h>
#include <stdbool.h>

/* This struct is used to encapsulate filtering criteria for operations on clients
* such as identifying specific clients to kill or retrieve. Each field in the struct
Expand Down Expand Up @@ -3247,6 +3248,7 @@ void readToQueryBuf(client *c) {
if (c->nread <= 0) {
return;
}
c->qb_full_read = (size_t)c->nread == readlen ? 1 : 0;

sdsIncrLen(c->querybuf, c->nread);
qblen = sdslen(c->querybuf);
Expand All @@ -3264,19 +3266,66 @@ void readToQueryBuf(client *c) {
}
}

/* Replication flow control: decide whether another read should be issued for
 * client 'c' within the current I/O event.
 *
 * 'iteration' is the number of reads already performed for this event
 * (1 after the first read).
 *
 * Repeated reads are only granted when 'c' is flagged as the primary, is not
 * scheduled to be closed, and repl-flow-control-enabled is set. The number of
 * reads allowed per event (server.repl_cur_reads_per_io_event) adapts:
 *  - it grows by one, at most once every 100 ms and never beyond
 *    server.repl_max_reads_per_io_event, when the limit is reached and the
 *    last read still filled the buffer;
 *  - it shrinks by one, never below 1, when a read did not fill the buffer
 *    before the limit was reached.
 *
 * NOTE(review): reviewers asked whether the adaptive ramp-up is needed at all,
 * as opposed to simply repeating full reads up to the configured maximum. The
 * adaptive behaviour is kept here; simplifying it is still an open question.
 *
 * Returns 1 if another read should be attempted, 0 otherwise. */
int shouldRepeatRead(client *c, int iteration) {
    if (!c->flag.primary || c->flag.close_asap || !server.repl_flow_control_enabled) {
        return 0;
    }

    /* qb_full_read is only refreshed by a read that returned data. If the
     * latest read returned nothing (nread <= 0) the flag still describes an
     * older read, so it must not be taken as a sign of pending data. */
    bool full_read = c->nread > 0 && c->qb_full_read;

    /* Keep the adaptive limit inside [1, repl_max_reads_per_io_event]. It has
     * no explicit initial value, and the maximum can be lowered at runtime
     * while the current value is above it. */
    int limit = MIN(server.repl_cur_reads_per_io_event, server.repl_max_reads_per_io_event);
    limit = MAX(1, limit);
    server.repl_cur_reads_per_io_event = limit;

    if (iteration >= limit) {
        /* Limit reached for this event. If the primary is still filling the
         * buffer and the last increase is more than 100 ms old, allow one
         * more read per event from now on (gradual ramp-up). */
        if (full_read && server.mstime - server.repl_last_rate_update > 100) {
            server.repl_cur_reads_per_io_event = MIN(server.repl_max_reads_per_io_event, limit + 1);
            server.repl_last_rate_update = server.mstime;
        }
        return 0;
    }

    /* Still below the limit: keep reading while reads come back full. */
    if (full_read) {
        return 1;
    }

    /* A partial read means less replication pressure: lower the limit so
     * other clients get served sooner. */
    server.repl_cur_reads_per_io_event = MAX(1, limit - 1);
    return 0;
}


/* Read handler for a client connection.
 *
 * Reads into the client's query buffer and processes what was read. The
 * read/process step is repeated within the same I/O event for as long as
 * shouldRepeatRead() asks for it.
 *
 * Returns early, without reading, when postponeClientRead() reports that it
 * took the client, or when an I/O read or write for this client is not idle.
 * Also returns immediately when processInputBuffer() reports C_ERR. */
void readQueryFromClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    /* Check if we can send the client to be handled by the IO-thread */
    if (postponeClientRead(c)) return;

    if (c->io_write_state != CLIENT_IDLE || c->io_read_state != CLIENT_IDLE) return;

    bool repeat = false;
    int iter = 0;
    do {
        /* Exactly one read and one result-handling pass per iteration. */
        readToQueryBuf(c);
        if (handleReadResult(c) == C_OK) {
            if (processInputBuffer(c) == C_ERR) return;
        }
        iter++;

        /* NOTE(review): the repeat decision inspects 'c', so it is taken
         * before beforeNextClient(c) - presumably that call may release the
         * client; confirm against beforeNextClient(). */
        repeat = shouldRepeatRead(c, iter);
        beforeNextClient(c);
    } while (repeat);
}

/* An "Address String" is a colon separated ip:port pair.
Expand Down
1 change: 1 addition & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -2806,6 +2806,7 @@ void initServer(void) {
server.reply_buffer_peak_reset_time = REPLY_BUFFER_DEFAULT_PEAK_RESET_TIME;
server.reply_buffer_resizing_enabled = 1;
server.client_mem_usage_buckets = NULL;
server.repl_last_rate_update = 0;
resetReplicationBuffer();

/* Make sure the locale is set on startup based on the config file. */
Expand Down
7 changes: 7 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1181,6 +1181,7 @@ typedef struct client {
/* Input buffer and command parsing fields */
sds querybuf; /* Buffer we use to accumulate client queries. */
size_t qb_pos; /* The position we have read in querybuf. */
int qb_full_read; /* True if the last read returned the maximum allowed bytes */
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like a boolean variable (I checked the code, and the variable is only ever 1 or 0). How about renaming it to is_qb_full_read or similar? That would make it easier for others to read.

robj **argv; /* Arguments of current command. */
int argc; /* Num of arguments of current command. */
int argv_len; /* Size of argv array (may be more than argc) */
Expand Down Expand Up @@ -2145,6 +2146,12 @@ struct valkeyServer {
/* Local environment */
char *locale_collate;
char *debug_context; /* A free-form string that has no impact on server except being included in a crash report. */

/* Replication flow control */
int repl_flow_control_enabled; /* Enables adaptive flow control for replication reads on the replica */
int repl_cur_reads_per_io_event; /* Current allowed reads from the primary file descriptor per epoll I/O event */
int repl_max_reads_per_io_event; /* Maximum allowed reads from the primary file descriptor per I/O event */
mstime_t repl_last_rate_update; /* Timestamp of the last increase in replication reads per I/O event */
};

#define MAX_KEYS_BUFFER 256
Expand Down
19 changes: 19 additions & 0 deletions valkey.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2535,3 +2535,22 @@ jemalloc-bg-thread yes
# the empty string.
#
# availability-zone "zone-name"

################################## REPLICATION FLOW CONTROL ##################################

# Prioritizes replication traffic to reduce primary buffer overflows,
# reducing lag and the risk of full syncs. Allows the replica to
# consume data faster under high load.
#
# If enabled, the replica invokes multiple reads per I/O event when it
# detects replication pressure.
#
# Default: yes
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have reservations about the default value being yes, because of the lower performance.

# repl-flow-control-enabled yes

# Specifies the maximum number of replication reads allowed per I/O event.
# Higher values allow more replication data to be processed per event, reducing replication lag,
# but can throttle normal clients and increase their latency.
#
# Default: 25
# repl-max-reads-per-io-event 25
Loading