Skip to content

Commit fd0fd1a

Browse files
committed
implement qps control using token bucket algorithm
Signed-off-by: artikell <[email protected]>
1 parent 206f7f6 commit fd0fd1a

File tree

1 file changed

+58
-0
lines changed

1 file changed

+58
-0
lines changed

src/valkey-benchmark.c

+58
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,11 @@ static struct config {
134134
pthread_mutex_t liveclients_mutex;
135135
pthread_mutex_t is_updating_slots_mutex;
136136
int resp3; /* use RESP3 */
137+
int qps;
138+
pthread_mutex_t token_bucket_mutex;
139+
long long last_time;
140+
long long time_per_token;
141+
long long time_per_burst;
137142
} config;
138143

139144
typedef struct _client {
@@ -220,6 +225,12 @@ static long long mstime(void) {
220225
return ustime() / 1000;
221226
}
222227

228+
static long long nstime(void) {
229+
struct timespec ts;
230+
clock_gettime(CLOCK_MONOTONIC, &ts);
231+
return (long long)ts.tv_sec * 1000000000LL + ts.tv_nsec;
232+
}
233+
223234
static uint64_t dictSdsHash(const void *key) {
224235
return dictGenHashFunction((unsigned char *)key, sdslen((char *)key));
225236
}
@@ -433,6 +444,32 @@ static void setClusterKeyHashTag(client c) {
433444
}
434445
}
435446

447+
static long long acquireTokenOrWait(int tokens) {
448+
if (config.num_threads) pthread_mutex_lock(&(config.token_bucket_mutex));
449+
450+
long long last_time_ = config.last_time;
451+
long long time_per_token_ = config.time_per_token;
452+
long long time_per_burst_ = config.time_per_burst;
453+
long long new_time = 0;
454+
455+
long long now_since_epoch = nstime();
456+
long long min_time = now_since_epoch - time_per_burst_;
457+
if (min_time > last_time_) {
458+
new_time = min_time + (time_per_token_ * tokens);
459+
} else {
460+
new_time = last_time_ + (time_per_token_ * tokens);
461+
}
462+
463+
long long delay_time = 0;
464+
if (new_time > now_since_epoch) {
465+
delay_time = new_time - now_since_epoch;
466+
} else {
467+
config.last_time = new_time;
468+
}
469+
if (config.num_threads) pthread_mutex_unlock(&(config.token_bucket_mutex));
470+
return delay_time;
471+
}
472+
436473
static void clientDone(client c) {
437474
int requests_finished = atomic_load_explicit(&config.requests_finished, memory_order_relaxed);
438475
if (requests_finished >= config.requests) {
@@ -568,6 +605,18 @@ static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
568605
UNUSED(fd);
569606
UNUSED(mask);
570607

608+
/* Acquire a token from the token bucket. */
609+
if (config.qps > 0) {
610+
do {
611+
long long delay = acquireTokenOrWait(config.pipeline);
612+
if (delay > 1000) {
613+
usleep(delay / 1000);
614+
} else {
615+
break;
616+
}
617+
} while (1);
618+
}
619+
571620
/* Initialize request when nothing was written. */
572621
if (c->written == 0) {
573622
/* Enforce upper bound to number of requests. */
@@ -974,6 +1023,11 @@ static void benchmark(const char *title, char *cmd, int len) {
9741023

9751024
if (config.num_threads) initBenchmarkThreads();
9761025

1026+
if (config.qps > 0) {
1027+
config.time_per_token = 1000000000 / config.qps;
1028+
config.time_per_burst = config.time_per_token * config.qps;
1029+
}
1030+
9771031
int thread_id = config.num_threads > 0 ? 0 : -1;
9781032
c = createClient(cmd, len, NULL, thread_id);
9791033
createMissingClients(c);
@@ -1339,6 +1393,9 @@ int parseOptions(int argc, char **argv) {
13391393
} else if (!strcmp(argv[i], "--user")) {
13401394
if (lastarg) goto invalid;
13411395
config.conn_info.user = sdsnew(argv[++i]);
1396+
} else if (!strcmp(argv[i], "--qps")) {
1397+
if (lastarg) goto invalid;
1398+
config.qps = atoi(argv[++i]);
13421399
} else if (!strcmp(argv[i], "-u") && !lastarg) {
13431400
parseRedisUri(argv[++i], "redis-benchmark", &config.conn_info, &config.tls);
13441401
if (config.conn_info.hostport < 0 || config.conn_info.hostport > 65535) {
@@ -1719,6 +1776,7 @@ int main(int argc, char **argv) {
17191776
config.num_threads = 0;
17201777
config.threads = NULL;
17211778
config.cluster_mode = 0;
1779+
config.qps = 0;
17221780
config.read_from_replica = FROM_PRIMARY_ONLY;
17231781
config.cluster_node_count = 0;
17241782
config.cluster_nodes = NULL;

0 commit comments

Comments
 (0)