Skip to content

Commit fde2e1f

Browse files
yairgottmadolson
andcommitted
Update src/networking.c
Co-authored-by: Madelyn Olson <[email protected]> Signed-off-by: Yair Gottdenker <[email protected]> Signed-off-by: yairgott <[email protected]>
1 parent 0dccca7 commit fde2e1f

File tree

3 files changed

+57
-45
lines changed

3 files changed

+57
-45
lines changed

src/module.c

+6-3
Original file line numberDiff line numberDiff line change
@@ -8112,9 +8112,12 @@ int moduleTryServeClientBlockedOnKey(client *c, robj *key) {
81128112
* free_privdata: called in order to free the private data that is passed
81138113
* by ValkeyModule_UnblockClient() call.
81148114
*
8115-
* Note: ValkeyModule_UnblockClient should be called for every blocked client,
8116-
* even if client was killed, timed-out or disconnected. Failing to do so
8117-
* will result in memory leaks.
8115+
* Notes:
8116+
* 1. ValkeyModule_UnblockClient should be called for every blocked client,
8117+
* even if client was killed, timed-out or disconnected. Failing to do so
8118+
* will result in memory leaks.
8119+
* 2. Blocking the client on keyspace event notification is supported from
8120+
* version 8.1
81188121
*
81198122
* There are some cases where ValkeyModule_BlockClient() cannot be used:
81208123
*

src/networking.c

+3-4
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,7 @@ void putClientInPendingWriteQueue(client *c) {
305305
int prepareClientToWrite(client *c) {
306306
/* If it's the Lua client we always return ok without installing any
307307
* handler since there is no socket at all. */
308-
if (c->flag.script || c->flag.module || c->blocked_client_pending_reply != NULL) return C_OK;
308+
if (c->flag.script || c->flag.module) return C_OK;
309309

310310
/* If CLIENT_CLOSE_ASAP flag is set, we need not write anything. */
311311
if (c->flag.close_asap) return C_ERR;
@@ -326,9 +326,7 @@ int prepareClientToWrite(client *c) {
326326

327327
/* Schedule the client to write the output buffers to the socket, unless
328328
* it should already be setup to do so (it has already pending data). */
329-
if (!clientHasPendingReplies(c)) {
330-
putClientInPendingWriteQueue(c);
331-
}
329+
if (!clientHasPendingReplies(c)) putClientInPendingWriteQueue(c);
332330

333331
/* Authorize the caller to queue in the output buffer of this client. */
334332
return C_OK;
@@ -435,6 +433,7 @@ void _addReplyProtoToList(client *c, list *reply_list, const char *s, size_t len
435433
* least PROTO_REPLY_CHUNK_BYTES */
436434
size_t usable_size;
437435
size_t size = len < PROTO_REPLY_CHUNK_BYTES ? PROTO_REPLY_CHUNK_BYTES : len;
436+
if (c->blocked_client_pending_reply) size = len;
438437
tail = zmalloc_usable(size + sizeof(clientReplyBlock), &usable_size);
439438
/* take over the allocation's internal fragmentation */
440439
tail->size = usable_size - sizeof(clientReplyBlock);

tests/modules/block_keyspace_notification.c

+48-38
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@
33
#define _DEFAULT_SOURCE /* For usleep */
44

55
#include "valkeymodule.h"
6+
#include <pthread.h>
67
#include <stdio.h>
78
#include <string.h>
89
#include <strings.h>
9-
#include <pthread.h>
1010
#include <unistd.h>
1111

1212
#define EVENT_LOG_MAX_SIZE 1024
@@ -31,7 +31,7 @@ typedef struct BackgroundThreadData {
3131
} BackgroundThreadData;
3232

3333
static void *GenericEvent_BackgroundWork(void *arg) {
34-
BackgroundThreadData * data = (BackgroundThreadData *)arg;
34+
BackgroundThreadData *data = (BackgroundThreadData *)arg;
3535
// Sleep for 1 second
3636
sleep(1);
3737
pthread_mutex_lock(&event_log_mutex);
@@ -47,26 +47,34 @@ static void *GenericEvent_BackgroundWork(void *arg) {
4747
pthread_exit(NULL);
4848
}
4949

50-
static int KeySpace_NotificationGeneric(ValkeyModuleCtx *ctx, int type, const char *event, ValkeyModuleString *key){
50+
static int KeySpace_NotificationGeneric(ValkeyModuleCtx *ctx, int type,
51+
const char *event,
52+
ValkeyModuleString *key) {
5153
VALKEYMODULE_NOT_USED(ctx);
5254
VALKEYMODULE_NOT_USED(type);
5355
ValkeyModuleString *retained_key = ValkeyModule_HoldString(ctx, key);
54-
ValkeyModuleBlockedClient * bc = ValkeyModule_BlockClient(ctx, NULL, NULL, NULL, 0);
56+
ValkeyModuleBlockedClient *bc =
57+
ValkeyModule_BlockClient(ctx, NULL, NULL, NULL, 0);
5558
if (bc == NULL) {
56-
ValkeyModule_Log(ctx, VALKEYMODULE_LOGLEVEL_NOTICE, "Failed to block for event %s on %s!", event, ValkeyModule_StringPtrLen(key, NULL));
59+
ValkeyModule_Log(ctx, VALKEYMODULE_LOGLEVEL_NOTICE,
60+
"Failed to block for event %s on %s!", event,
61+
ValkeyModule_StringPtrLen(key, NULL));
5762
}
58-
BackgroundThreadData * data = ValkeyModule_Alloc(sizeof(BackgroundThreadData));
63+
BackgroundThreadData *data =
64+
ValkeyModule_Alloc(sizeof(BackgroundThreadData));
5965
data->bc = bc;
60-
KeyspaceEventData * event_data = ValkeyModule_Alloc(sizeof(KeyspaceEventData));
66+
KeyspaceEventData *event_data =
67+
ValkeyModule_Alloc(sizeof(KeyspaceEventData));
6168
event_data->key = retained_key;
6269
event_data->event = ValkeyModule_CreateString(ctx, event, strlen(event));
6370
data->event = event_data;
6471
pthread_t tid;
65-
pthread_create(&tid, NULL, GenericEvent_BackgroundWork, (void*)data);
72+
pthread_create(&tid, NULL, GenericEvent_BackgroundWork, (void *)data);
6673
return VALKEYMODULE_OK;
6774
}
6875

69-
static int cmdGetEvents(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
76+
static int cmdGetEvents(ValkeyModuleCtx *ctx, ValkeyModuleString **argv,
77+
int argc) {
7078
VALKEYMODULE_NOT_USED(argv);
7179
VALKEYMODULE_NOT_USED(argc);
7280
pthread_mutex_lock(&event_log_mutex);
@@ -82,7 +90,8 @@ static int cmdGetEvents(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int arg
8290
return VALKEYMODULE_OK;
8391
}
8492

85-
static int cmdClearEvents(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
93+
static int cmdClearEvents(ValkeyModuleCtx *ctx, ValkeyModuleString **argv,
94+
int argc) {
8695
VALKEYMODULE_NOT_USED(argv);
8796
VALKEYMODULE_NOT_USED(argc);
8897
pthread_mutex_lock(&event_log_mutex);
@@ -100,10 +109,12 @@ static int cmdClearEvents(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int a
100109

101110
/* This function must be present on each Valkey module. It is used in order to
102111
* register the commands into the Valkey server. */
103-
int ValkeyModule_OnLoad(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
112+
int ValkeyModule_OnLoad(ValkeyModuleCtx *ctx, ValkeyModuleString **argv,
113+
int argc) {
104114
VALKEYMODULE_NOT_USED(argv);
105115
VALKEYMODULE_NOT_USED(argc);
106-
if (ValkeyModule_Init(ctx,"testblockingkeyspacenotif",1,VALKEYMODULE_APIVER_1) == VALKEYMODULE_ERR){
116+
if (ValkeyModule_Init(ctx, "testblockingkeyspacenotif", 1,
117+
VALKEYMODULE_APIVER_1) == VALKEYMODULE_ERR) {
107118
return VALKEYMODULE_ERR;
108119
}
109120
event_log = ValkeyModule_Alloc(sizeof(KeyspaceEventLog));
@@ -113,31 +124,31 @@ int ValkeyModule_OnLoad(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int arg
113124
// VALKEYMODULE_NOTIFY_LOADED event are not supported we can not start
114125
return VALKEYMODULE_ERR;
115126
}
116-
if(ValkeyModule_SubscribeToKeyspaceEvents(ctx, VALKEYMODULE_NOTIFY_LOADED, KeySpace_NotificationGeneric) != VALKEYMODULE_OK){
117-
return VALKEYMODULE_ERR;
118-
}
119-
if(ValkeyModule_SubscribeToKeyspaceEvents(ctx, VALKEYMODULE_NOTIFY_GENERIC, KeySpace_NotificationGeneric) != VALKEYMODULE_OK){
120-
return VALKEYMODULE_ERR;
121-
}
122-
if(ValkeyModule_SubscribeToKeyspaceEvents(ctx, VALKEYMODULE_NOTIFY_EXPIRED, KeySpace_NotificationGeneric) != VALKEYMODULE_OK){
123-
return VALKEYMODULE_ERR;
124-
}
125-
if(ValkeyModule_SubscribeToKeyspaceEvents(ctx, VALKEYMODULE_NOTIFY_MODULE, KeySpace_NotificationGeneric) != VALKEYMODULE_OK){
126-
return VALKEYMODULE_ERR;
127-
}
128-
if(ValkeyModule_SubscribeToKeyspaceEvents(ctx, VALKEYMODULE_NOTIFY_KEY_MISS, KeySpace_NotificationGeneric) != VALKEYMODULE_OK){
129-
return VALKEYMODULE_ERR;
130-
}
131-
if(ValkeyModule_SubscribeToKeyspaceEvents(ctx, VALKEYMODULE_NOTIFY_STRING, KeySpace_NotificationGeneric) != VALKEYMODULE_OK){
132-
return VALKEYMODULE_ERR;
133-
}
134-
if(ValkeyModule_SubscribeToKeyspaceEvents(ctx, VALKEYMODULE_NOTIFY_HASH, KeySpace_NotificationGeneric) != VALKEYMODULE_OK){
135-
return VALKEYMODULE_ERR;
136-
}
137-
if (ValkeyModule_CreateCommand(ctx,"b_keyspace.events", cmdGetEvents,"",0,0,0) == VALKEYMODULE_ERR){
138-
return VALKEYMODULE_ERR;
139-
}
140-
if (ValkeyModule_CreateCommand(ctx,"b_keyspace.clear", cmdClearEvents,"",0,0,0) == VALKEYMODULE_ERR){
127+
if (ValkeyModule_SubscribeToKeyspaceEvents(ctx, VALKEYMODULE_NOTIFY_LOADED,
128+
KeySpace_NotificationGeneric) !=
129+
VALKEYMODULE_OK ||
130+
ValkeyModule_SubscribeToKeyspaceEvents(ctx, VALKEYMODULE_NOTIFY_GENERIC,
131+
KeySpace_NotificationGeneric) !=
132+
VALKEYMODULE_OK ||
133+
ValkeyModule_SubscribeToKeyspaceEvents(ctx, VALKEYMODULE_NOTIFY_EXPIRED,
134+
KeySpace_NotificationGeneric) !=
135+
VALKEYMODULE_OK ||
136+
ValkeyModule_SubscribeToKeyspaceEvents(ctx, VALKEYMODULE_NOTIFY_MODULE,
137+
KeySpace_NotificationGeneric) !=
138+
VALKEYMODULE_OK ||
139+
ValkeyModule_SubscribeToKeyspaceEvents(
140+
ctx, VALKEYMODULE_NOTIFY_KEY_MISS, KeySpace_NotificationGeneric) !=
141+
VALKEYMODULE_OK ||
142+
ValkeyModule_SubscribeToKeyspaceEvents(ctx, VALKEYMODULE_NOTIFY_STRING,
143+
KeySpace_NotificationGeneric) !=
144+
VALKEYMODULE_OK ||
145+
ValkeyModule_SubscribeToKeyspaceEvents(ctx, VALKEYMODULE_NOTIFY_HASH,
146+
KeySpace_NotificationGeneric) !=
147+
VALKEYMODULE_OK ||
148+
ValkeyModule_CreateCommand(ctx, "b_keyspace.events", cmdGetEvents, "",
149+
0, 0, 0) == VALKEYMODULE_ERR ||
150+
ValkeyModule_CreateCommand(ctx, "b_keyspace.clear", cmdClearEvents, "",
151+
0, 0, 0) == VALKEYMODULE_ERR) {
141152
return VALKEYMODULE_ERR;
142153
}
143154
return VALKEYMODULE_OK;
@@ -156,4 +167,3 @@ int ValkeyModule_OnUnload(ValkeyModuleCtx *ctx) {
156167
pthread_mutex_unlock(&event_log_mutex);
157168
return VALKEYMODULE_OK;
158169
}
159-

0 commit comments

Comments
 (0)