Skip to content

Commit e931eb5

Browse files
committed
Support Blocking a Client on Keyspace Event Notification
This change enhances user experience and consistency by allowing a module to block a client on keyspace event notifications. Consistency is improved by ensuring that reads after writes on the same connection yield expected results. For example, in ValkeySearch, mutations processed earlier on the same connection will be available for search. The implementation extends `VM_BlockClient` to support blocking clients on keyspace event notifications. Internal clients, LUA clients, clients issueing multi exec and those with the `deny_blocking` flag set are not blocked. Once blocked, a client’s reply is withheld until it is explicitly unblocked. Signed-off-by: yairgott <[email protected]>
1 parent 3f6581b commit e931eb5

25 files changed

+551
-173
lines changed

src/bitops.c

+4-4
Original file line numberDiff line numberDiff line change
@@ -669,7 +669,7 @@ void setbitCommand(client *c) {
669669
byteval |= ((on & 0x1) << bit);
670670
((uint8_t *)o->ptr)[byte] = byteval;
671671
signalModifiedKey(c, c->db, c->argv[1]);
672-
notifyKeyspaceEvent(NOTIFY_STRING, "setbit", c->argv[1], c->db->id);
672+
notifyKeyspaceEvent(c, NOTIFY_STRING, "setbit", c->argv[1], c->db->id);
673673
server.dirty++;
674674
}
675675

@@ -877,11 +877,11 @@ void bitopCommand(client *c) {
877877
if (maxlen) {
878878
o = createObject(OBJ_STRING, res);
879879
setKey(c, c->db, targetkey, &o, 0);
880-
notifyKeyspaceEvent(NOTIFY_STRING, "set", targetkey, c->db->id);
880+
notifyKeyspaceEvent(c, NOTIFY_STRING, "set", targetkey, c->db->id);
881881
server.dirty++;
882882
} else if (dbDelete(c->db, targetkey)) {
883883
signalModifiedKey(c, c->db, targetkey);
884-
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", targetkey, c->db->id);
884+
notifyKeyspaceEvent(c, NOTIFY_GENERIC, "del", targetkey, c->db->id);
885885
server.dirty++;
886886
}
887887
addReplyLongLong(c, maxlen); /* Return the output string length in bytes. */
@@ -1361,7 +1361,7 @@ void bitfieldGeneric(client *c, int flags) {
13611361

13621362
if (changes) {
13631363
signalModifiedKey(c, c->db, c->argv[1]);
1364-
notifyKeyspaceEvent(NOTIFY_STRING, "setbit", c->argv[1], c->db->id);
1364+
notifyKeyspaceEvent(c, NOTIFY_STRING, "setbit", c->argv[1], c->db->id);
13651365
server.dirty += changes;
13661366
}
13671367
zfree(ops);

src/cluster.c

+3-3
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ void restoreCommand(client *c) {
275275
robj *aux = server.lazyfree_lazy_server_del ? shared.unlink : shared.del;
276276
rewriteClientCommandVector(c, 2, aux, key);
277277
signalModifiedKey(c, c->db, key);
278-
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, c->db->id);
278+
notifyKeyspaceEvent(c, NOTIFY_GENERIC, "del", key, c->db->id);
279279
server.dirty++;
280280
}
281281
decrRefCount(obj);
@@ -297,7 +297,7 @@ void restoreCommand(client *c) {
297297
}
298298
objectSetLRUOrLFU(obj, lfu_freq, lru_idle, lru_clock, 1000);
299299
signalModifiedKey(c, c->db, key);
300-
notifyKeyspaceEvent(NOTIFY_GENERIC, "restore", key, c->db->id);
300+
notifyKeyspaceEvent(c, NOTIFY_GENERIC, "restore", key, c->db->id);
301301
addReply(c, shared.ok);
302302
server.dirty++;
303303
}
@@ -648,7 +648,7 @@ void migrateCommand(client *c) {
648648
/* No COPY option: remove the local key, signal the change. */
649649
dbDelete(c->db, kv[j]);
650650
signalModifiedKey(c, c->db, kv[j]);
651-
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", kv[j], c->db->id);
651+
notifyKeyspaceEvent(c, NOTIFY_GENERIC, "del", kv[j], c->db->id);
652652
server.dirty++;
653653

654654
/* Populate the argument vector to replace the old one. */

src/cluster_legacy.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -6453,7 +6453,7 @@ unsigned int delKeysInSlot(unsigned int hashslot) {
64536453
/* The keys are not actually logically deleted from the database, just moved to another node.
64546454
* The modules needs to know that these keys are no longer available locally, so just send the
64556455
* keyspace notification to the modules, but not to clients. */
6456-
moduleNotifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, server.db[0].id);
6456+
moduleNotifyKeyspaceEvent(NULL, NOTIFY_GENERIC, "del", key, server.db[0].id);
64576457
exitExecutionUnit();
64586458
postExecutionUnitOperations();
64596459
decrRefCount(key);

src/db.c

+10-10
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ robj *lookupKey(serverDb *db, robj *key, int flags) {
140140
if (!(flags & (LOOKUP_NOSTATS | LOOKUP_WRITE))) server.stat_keyspace_hits++;
141141
/* TODO: Use separate hits stats for WRITE */
142142
} else {
143-
if (!(flags & (LOOKUP_NONOTIFY | LOOKUP_WRITE))) notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id);
143+
if (!(flags & (LOOKUP_NONOTIFY | LOOKUP_WRITE))) notifyKeyspaceEvent(NULL, NOTIFY_KEY_MISS, "keymiss", key, db->id);
144144
if (!(flags & (LOOKUP_NOSTATS | LOOKUP_WRITE))) server.stat_keyspace_misses++;
145145
/* TODO: Use separate misses stats and notify event for WRITE */
146146
}
@@ -229,7 +229,7 @@ static void dbAddInternal(serverDb *db, robj *key, robj **valref, int update_if_
229229
initObjectLRUOrLFU(val);
230230
kvstoreHashtableAdd(db->keys, dict_index, val);
231231
signalKeyAsReady(db, key, val->type);
232-
notifyKeyspaceEvent(NOTIFY_NEW, "new", key, db->id);
232+
notifyKeyspaceEvent(NULL, NOTIFY_NEW, "new", key, db->id);
233233
*valref = val;
234234
}
235235

@@ -827,7 +827,7 @@ void delGenericCommand(client *c, int lazy) {
827827
int deleted = lazy ? dbAsyncDelete(c->db, c->argv[j]) : dbSyncDelete(c->db, c->argv[j]);
828828
if (deleted) {
829829
signalModifiedKey(c, c->db, c->argv[j]);
830-
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", c->argv[j], c->db->id);
830+
notifyKeyspaceEvent(c, NOTIFY_GENERIC, "del", c->argv[j], c->db->id);
831831
server.dirty++;
832832
numdel++;
833833
}
@@ -1409,8 +1409,8 @@ void renameGenericCommand(client *c, int nx) {
14091409
if (expire != -1) o = setExpire(c, c->db, c->argv[2], expire);
14101410
signalModifiedKey(c, c->db, c->argv[1]);
14111411
signalModifiedKey(c, c->db, c->argv[2]);
1412-
notifyKeyspaceEvent(NOTIFY_GENERIC, "rename_from", c->argv[1], c->db->id);
1413-
notifyKeyspaceEvent(NOTIFY_GENERIC, "rename_to", c->argv[2], c->db->id);
1412+
notifyKeyspaceEvent(c, NOTIFY_GENERIC, "rename_from", c->argv[1], c->db->id);
1413+
notifyKeyspaceEvent(c, NOTIFY_GENERIC, "rename_to", c->argv[2], c->db->id);
14141414
server.dirty++;
14151415
addReply(c, nx ? shared.cone : shared.ok);
14161416
}
@@ -1477,8 +1477,8 @@ void moveCommand(client *c) {
14771477
/* OK! key moved */
14781478
signalModifiedKey(c, src, c->argv[1]);
14791479
signalModifiedKey(c, dst, c->argv[1]);
1480-
notifyKeyspaceEvent(NOTIFY_GENERIC, "move_from", c->argv[1], src->id);
1481-
notifyKeyspaceEvent(NOTIFY_GENERIC, "move_to", c->argv[1], dst->id);
1480+
notifyKeyspaceEvent(c, NOTIFY_GENERIC, "move_from", c->argv[1], src->id);
1481+
notifyKeyspaceEvent(c, NOTIFY_GENERIC, "move_to", c->argv[1], dst->id);
14821482

14831483
server.dirty++;
14841484
addReply(c, shared.cone);
@@ -1577,7 +1577,7 @@ void copyCommand(client *c) {
15771577

15781578
/* OK! key copied */
15791579
signalModifiedKey(c, dst, c->argv[2]);
1580-
notifyKeyspaceEvent(NOTIFY_GENERIC, "copy_to", c->argv[2], dst->id);
1580+
notifyKeyspaceEvent(c, NOTIFY_GENERIC, "copy_to", c->argv[2], dst->id);
15811581

15821582
server.dirty++;
15831583
addReply(c, shared.cone);
@@ -1828,7 +1828,7 @@ void deleteExpiredKeyAndPropagateWithDictIndex(serverDb *db, robj *keyobj, int d
18281828
dbGenericDeleteWithDictIndex(db, keyobj, server.lazyfree_lazy_expire, DB_FLAG_KEY_EXPIRED, dict_index);
18291829
latencyEndMonitor(expire_latency);
18301830
latencyAddSampleIfNeeded("expire-del", expire_latency);
1831-
notifyKeyspaceEvent(NOTIFY_EXPIRED, "expired", keyobj, db->id);
1831+
notifyKeyspaceEvent(NULL, NOTIFY_EXPIRED, "expired", keyobj, db->id);
18321832
signalModifiedKey(NULL, db, keyobj);
18331833
propagateDeletion(db, keyobj, server.lazyfree_lazy_expire);
18341834
server.stat_expiredkeys++;
@@ -1850,7 +1850,7 @@ void deleteExpiredKeyFromOverwriteAndPropagate(client *c, robj *keyobj) {
18501850
robj *aux = server.lazyfree_lazy_expire ? shared.unlink : shared.del;
18511851
rewriteClientCommandVector(c, 2, aux, keyobj);
18521852
signalModifiedKey(c, c->db, keyobj);
1853-
notifyKeyspaceEvent(NOTIFY_EXPIRED, "expired", keyobj, c->db->id);
1853+
notifyKeyspaceEvent(c, NOTIFY_EXPIRED, "expired", keyobj, c->db->id);
18541854
server.stat_expiredkeys++;
18551855
}
18561856

src/evict.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -675,7 +675,7 @@ int performEvictions(void) {
675675
mem_freed += delta;
676676
server.stat_evictedkeys++;
677677
signalModifiedKey(NULL, db, keyobj);
678-
notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted", keyobj, db->id);
678+
notifyKeyspaceEvent(NULL, NOTIFY_EVICTED, "evicted", keyobj, db->id);
679679
propagateDeletion(db, keyobj, server.lazyfree_lazy_eviction);
680680
exitExecutionUnit();
681681
postExecutionUnitOperations();

src/expire.c

+4-5
Original file line numberDiff line numberDiff line change
@@ -682,6 +682,9 @@ void expireGenericCommand(client *c, long long basetime, int unit) {
682682
return;
683683
} else {
684684
obj = setExpire(c, c->db, key, when);
685+
signalModifiedKey(c, c->db, key);
686+
notifyKeyspaceEvent(c, NOTIFY_GENERIC, "expire", key, c->db->id);
687+
server.dirty++;
685688
addReply(c, shared.cone);
686689
/* Propagate as PEXPIREAT millisecond-timestamp
687690
* Only rewrite the command arg if not already PEXPIREAT */
@@ -695,10 +698,6 @@ void expireGenericCommand(client *c, long long basetime, int unit) {
695698
rewriteClientCommandArgument(c, 2, when_obj);
696699
decrRefCount(when_obj);
697700
}
698-
699-
signalModifiedKey(c, c->db, key);
700-
notifyKeyspaceEvent(NOTIFY_GENERIC, "expire", key, c->db->id);
701-
server.dirty++;
702701
return;
703702
}
704703
}
@@ -772,7 +771,7 @@ void persistCommand(client *c) {
772771
if (lookupKeyWrite(c->db, c->argv[1])) {
773772
if (removeExpire(c->db, c->argv[1])) {
774773
signalModifiedKey(c, c->db, c->argv[1]);
775-
notifyKeyspaceEvent(NOTIFY_GENERIC, "persist", c->argv[1], c->db->id);
774+
notifyKeyspaceEvent(c, NOTIFY_GENERIC, "persist", c->argv[1], c->db->id);
776775
addReply(c, shared.cone);
777776
server.dirty++;
778777
} else {

src/geo.c

+3-3
Original file line numberDiff line numberDiff line change
@@ -660,7 +660,7 @@ void georadiusGeneric(client *c, int srcKeyIndex, int flags) {
660660
/* store key is not NULL, try to delete it and return 0. */
661661
if (dbDelete(c->db, storekey)) {
662662
signalModifiedKey(c, c->db, storekey);
663-
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", storekey, c->db->id);
663+
notifyKeyspaceEvent(c, NOTIFY_GENERIC, "del", storekey, c->db->id);
664664
server.dirty++;
665665
}
666666
addReply(c, shared.czero);
@@ -781,12 +781,12 @@ void georadiusGeneric(client *c, int srcKeyIndex, int flags) {
781781
if (returned_items) {
782782
zsetConvertToListpackIfNeeded(zobj, maxelelen, totelelen);
783783
setKey(c, c->db, storekey, &zobj, 0);
784-
notifyKeyspaceEvent(NOTIFY_ZSET, flags & GEOSEARCH ? "geosearchstore" : "georadiusstore", storekey,
784+
notifyKeyspaceEvent(c, NOTIFY_ZSET, flags & GEOSEARCH ? "geosearchstore" : "georadiusstore", storekey,
785785
c->db->id);
786786
server.dirty += returned_items;
787787
} else if (dbDelete(c->db, storekey)) {
788788
signalModifiedKey(c, c->db, storekey);
789-
notifyKeyspaceEvent(NOTIFY_GENERIC, "del", storekey, c->db->id);
789+
notifyKeyspaceEvent(c, NOTIFY_GENERIC, "del", storekey, c->db->id);
790790
server.dirty++;
791791
}
792792
addReplyLongLong(c, returned_items);

src/hyperloglog.c

+2-2
Original file line numberDiff line numberDiff line change
@@ -1466,7 +1466,7 @@ void pfaddCommand(client *c) {
14661466
if (updated) {
14671467
HLL_INVALIDATE_CACHE(hdr);
14681468
signalModifiedKey(c, c->db, c->argv[1]);
1469-
notifyKeyspaceEvent(NOTIFY_STRING, "pfadd", c->argv[1], c->db->id);
1469+
notifyKeyspaceEvent(c, NOTIFY_STRING, "pfadd", c->argv[1], c->db->id);
14701470
server.dirty += updated;
14711471
}
14721472
addReply(c, updated ? shared.cone : shared.czero);
@@ -1642,7 +1642,7 @@ void pfmergeCommand(client *c) {
16421642
signalModifiedKey(c, c->db, c->argv[1]);
16431643
/* We generate a PFADD event for PFMERGE for semantical simplicity
16441644
* since in theory this is a mass-add of elements. */
1645-
notifyKeyspaceEvent(NOTIFY_STRING, "pfadd", c->argv[1], c->db->id);
1645+
notifyKeyspaceEvent(c, NOTIFY_STRING, "pfadd", c->argv[1], c->db->id);
16461646
server.dirty++;
16471647
addReply(c, shared.ok);
16481648
}

src/module.c

+58-11
Original file line numberDiff line numberDiff line change
@@ -198,8 +198,8 @@ typedef struct ValkeyModuleCtx ValkeyModuleCtx;
198198
#define VALKEYMODULE_CTX_NEW_CLIENT (1 << 7) /* Free client object when the \
199199
context is destroyed */
200200
#define VALKEYMODULE_CTX_CHANNELS_POS_REQUEST (1 << 8)
201-
#define VALKEYMODULE_CTX_COMMAND (1 << 9) /* Context created to serve a command from call() or AOF (which calls cmd->proc directly) */
202-
201+
#define VALKEYMODULE_CTX_COMMAND (1 << 9) /* Context created to serve a command from call() or AOF (which calls cmd->proc directly) */
202+
#define VALKEYMODULE_CTX_KEYSPACE_NOTIFICATION (1 << 10) /* Context created a keyspace notification event */
203203

204204
/* This represents a key opened with VM_OpenKey(). */
205205
struct ValkeyModuleKey {
@@ -7807,8 +7807,24 @@ ValkeyModuleBlockedClient *moduleBlockClient(ValkeyModuleCtx *ctx,
78077807
void *privdata,
78087808
int flags) {
78097809
client *c = ctx->client;
7810+
if (c->flag.blocked || getClientType(c) != CLIENT_TYPE_NORMAL || c->flag.deny_blocking) {
7811+
/* Early return if duplicate block attempt or client is not normal or
7812+
* client is set to deny blocking. */
7813+
return NULL;
7814+
}
7815+
7816+
if (ctx->flags & (VALKEYMODULE_CTX_TEMP_CLIENT | VALKEYMODULE_CTX_NEW_CLIENT)) {
7817+
/* Temporary clients can't be blocked */
7818+
return NULL;
7819+
}
7820+
int is_keyspace_notification = ctx->flags & (VALKEYMODULE_CTX_KEYSPACE_NOTIFICATION);
78107821
int islua = scriptIsRunning();
78117822
int ismulti = server.in_exec;
7823+
if ((islua || ismulti) && is_keyspace_notification) {
7824+
/* Avoid blocking within transactions when context initiated by
7825+
* keyspace notification. */
7826+
return NULL;
7827+
}
78127828
initClientBlockingState(c);
78137829

78147830
c->bstate->module_blocked_handle = zmalloc(sizeof(ValkeyModuleBlockedClient));
@@ -7864,6 +7880,11 @@ ValkeyModuleBlockedClient *moduleBlockClient(ValkeyModuleCtx *ctx,
78647880
c->bstate->timeout = timeout;
78657881
blockClient(c, BLOCKED_MODULE);
78667882
}
7883+
/* Defer response until after being unblocked for a context originated from
7884+
* keyspace notification events */
7885+
if (is_keyspace_notification) {
7886+
c->blocked_client_pending_reply = listCreate();
7887+
}
78677888
}
78687889
return bc;
78697890
}
@@ -8099,6 +8120,16 @@ int moduleTryServeClientBlockedOnKey(client *c, robj *key) {
80998120
*
81008121
* 1. If the client is a Lua script.
81018122
* 2. If the client is executing a MULTI block.
8123+
* 3. If the client is a temporary module client.
8124+
* 4. If the client is already blocked.
8125+
*
8126+
* In cases 1 and 2, a call to ValkeyModule_BlockClient() will **not** block the
8127+
* client, but instead produce a specific error reply. Note that if the
8128+
* BlockClient call originated from within a keyspace notification, no error
8129+
* reply is generated and nullptr is returned.
8130+
*
8131+
* In case 3 and 4, a call to ValkeyModule_BlockClient() are no-op and
8132+
* return nullptr.
81028133
*
81038134
* In these cases, a call to ValkeyModule_BlockClient() will **not** block the
81048135
* client, but instead produce a specific error reply.
@@ -8300,6 +8331,10 @@ int moduleClientIsBlockedOnKeys(client *c) {
83008331
* ValkeyModule_BlockClientOnKeys() is accessible from the timeout
83018332
* callback via VM_GetBlockedClientPrivateData). */
83028333
int VM_UnblockClient(ValkeyModuleBlockedClient *bc, void *privdata) {
8334+
if (!bc) {
8335+
/* No-op if the blocked client is null. */
8336+
return VALKEYMODULE_OK;
8337+
}
83038338
if (bc->blocked_on_keys) {
83048339
/* In theory the user should always pass the timeout handler as an
83058340
* argument, but better to be safe than sorry. */
@@ -8392,11 +8427,17 @@ void moduleHandleBlockedClients(void) {
83928427
moduleInvokeFreePrivDataCallback(c, bc);
83938428
}
83948429

8395-
/* It is possible that this blocked client object accumulated
8396-
* replies to send to the client in a thread safe context.
8397-
* We need to glue such replies to the client output buffer and
8398-
* free the temporary client we just used for the replies. */
8399-
if (c) AddReplyFromClient(c, bc->reply_client);
8430+
if (c) {
8431+
/* Replies which were added after the client is blocked by a module
8432+
* are accumulated separately. We need to transmit those replies
8433+
* to the client. */
8434+
AddReplyModuleUnBlockedClient(c);
8435+
/* It is possible that this blocked client object accumulated
8436+
* replies to send to the client in a thread safe context.
8437+
* We need to glue such replies to the client output buffer and
8438+
* free the temporary client we just used for the replies. */
8439+
AddReplyFromClient(c, bc->reply_client);
8440+
}
84008441
moduleReleaseTempClient(bc->reply_client);
84018442
moduleReleaseTempClient(bc->thread_safe_ctx_client);
84028443

@@ -8836,14 +8877,14 @@ int VM_GetNotifyKeyspaceEvents(void) {
88368877
/* Expose notifyKeyspaceEvent to modules */
88378878
int VM_NotifyKeyspaceEvent(ValkeyModuleCtx *ctx, int type, const char *event, ValkeyModuleString *key) {
88388879
if (!ctx || !ctx->client) return VALKEYMODULE_ERR;
8839-
notifyKeyspaceEvent(type, (char *)event, key, ctx->client->db->id);
8880+
notifyKeyspaceEvent(NULL, type, (char *)event, key, ctx->client->db->id);
88408881
return VALKEYMODULE_OK;
88418882
}
88428883

88438884
/* Dispatcher for keyspace notifications to module subscriber functions.
88448885
* This gets called only if at least one module requested to be notified on
88458886
* keyspace notifications */
8846-
void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid) {
8887+
void moduleNotifyKeyspaceEvent(client *c, int type, const char *event, robj *key, int dbid) {
88478888
/* Don't do anything if there aren't any subscribers */
88488889
if (listLength(moduleKeyspaceSubscribers) == 0) return;
88498890

@@ -8880,8 +8921,14 @@ void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid)
88808921
if ((sub->event_mask & type) &&
88818922
(sub->active == 0 || (sub->module->options & VALKEYMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS))) {
88828923
ValkeyModuleCtx ctx;
8883-
moduleCreateContext(&ctx, sub->module, VALKEYMODULE_CTX_TEMP_CLIENT);
8884-
selectDb(ctx.client, dbid);
8924+
if (c == NULL) {
8925+
moduleCreateContext(&ctx, sub->module, VALKEYMODULE_CTX_TEMP_CLIENT);
8926+
selectDb(ctx.client, dbid);
8927+
} else {
8928+
moduleCreateContext(&ctx, sub->module, VALKEYMODULE_CTX_NONE);
8929+
ctx.client = c;
8930+
}
8931+
ctx.flags |= VALKEYMODULE_CTX_KEYSPACE_NOTIFICATION;
88858932

88868933
/* mark the handler as active to avoid reentrant loops.
88878934
* If the subscriber performs an action triggering itself,

src/module.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ size_t moduleCount(void);
202202
void moduleAcquireGIL(void);
203203
int moduleTryAcquireGIL(void);
204204
void moduleReleaseGIL(void);
205-
void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid);
205+
void moduleNotifyKeyspaceEvent(client *c, int type, const char *event, robj *key, int dbid);
206206
void firePostExecutionUnitJobs(void);
207207
void moduleCallCommandFilters(client *c);
208208
void modulePostExecutionUnitOperations(void);

0 commit comments

Comments
 (0)