Skip to content

Commit 206f7f6

Browse files
Pass null-terminated node ID for VM_RegisterClusterMessageReceiver and add test coverage (#1708)
* Pass `sender_id` as `NULL` terminated string as part of `ValkeyModuleClusterMessageReceiver` for ease of usage by the module(s). * Implement test coverage described in #1656 and ensures that nodes in the cluster module properly acknowledge a "DING" message by sending a "DONG" response. Closes #1656. --------- Signed-off-by: Nikhil Manglore <[email protected]>
1 parent cf19deb commit 206f7f6

File tree

4 files changed

+42
-2
lines changed

4 files changed

+42
-2
lines changed

src/cluster_legacy.c

+4-1
Original file line numberDiff line numberDiff line change
@@ -3045,7 +3045,10 @@ static void clusterProcessModulePacket(clusterMsgModule *module_data, clusterNod
30453045
uint32_t len = ntohl(module_data->len);
30463046
uint8_t type = module_data->type;
30473047
unsigned char *payload = module_data->bulk_data;
3048-
moduleCallClusterReceivers(sender->name, module_id, type, payload, len);
3048+
3049+
sds sender_name = sdsnewlen(sender->name, CLUSTER_NAMELEN);
3050+
moduleCallClusterReceivers(sender_name, module_id, type, payload, len);
3051+
sdsfree(sender_name);
30493052
}
30503053

30513054
static void clusterProcessLightPacket(clusterNode *sender, clusterLink *link, uint16_t type) {

src/module.c

+7-1
Original file line numberDiff line numberDiff line change
@@ -8950,7 +8950,13 @@ void moduleCallClusterReceivers(const char *sender_id,
89508950
* was already a registered callback, this will replace the callback function
89518951
* with the one provided, otherwise if the callback is set to NULL and there
89528952
* is already a callback for this function, the callback is unregistered
8953-
* (so this API call is also used in order to delete the receiver). */
8953+
* (so this API call is also used in order to delete the receiver).
8954+
*
8955+
* When a message of this type is received, the registered callback function
8956+
* will be invoked with details, including the 40-byte node ID of the sender.
8957+
*
8958+
* In Valkey 8.1 and later, the node ID is null-terminated. Prior to 8.1, it was
8959+
* not null-terminated */
89548960
void VM_RegisterClusterMessageReceiver(ValkeyModuleCtx *ctx,
89558961
uint8_t type,
89568962
ValkeyModuleClusterMessageReceiver callback) {

tests/modules/cluster.c

+12
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,15 @@ int PingallCommand(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
4646
return ValkeyModule_ReplyWithSimpleString(ctx, "OK");
4747
}
4848

49+
void DingReceiver(ValkeyModuleCtx *ctx, const char *sender_id, uint8_t type, const unsigned char *payload, uint32_t len) {
50+
ValkeyModule_Log(ctx, "notice", "DING (type %d) RECEIVED from %.*s: '%.*s'", type, VALKEYMODULE_NODE_ID_LEN, sender_id, (int)len, payload);
51+
ValkeyModule_SendClusterMessage(ctx, sender_id, MSGTYPE_DONG, "Message Received!", 17);
52+
}
53+
54+
void DongReceiver(ValkeyModuleCtx *ctx, const char *sender_id, uint8_t type, const unsigned char *payload, uint32_t len) {
55+
ValkeyModule_Log(ctx, "notice", "DONG (type %d) RECEIVED from %.*s: '%.*s'", type, VALKEYMODULE_NODE_ID_LEN, sender_id, (int)len, payload);
56+
}
57+
4958
int ValkeyModule_OnLoad(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
5059
VALKEYMODULE_NOT_USED(argv);
5160
VALKEYMODULE_NOT_USED(argc);
@@ -63,5 +72,8 @@ int ValkeyModule_OnLoad(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int arg
6372
if (ValkeyModule_CreateCommand(ctx, "test.cluster_shards", test_cluster_shards, "", 0, 0, 0) == VALKEYMODULE_ERR)
6473
return VALKEYMODULE_ERR;
6574

75+
/* Register our handlers for different message types. */
76+
ValkeyModule_RegisterClusterMessageReceiver(ctx, MSGTYPE_DING, DingReceiver);
77+
ValkeyModule_RegisterClusterMessageReceiver(ctx, MSGTYPE_DONG, DongReceiver);
6678
return VALKEYMODULE_OK;
6779
}

tests/unit/moduleapi/cluster.tcl

+19
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,25 @@ start_cluster 3 0 [list config_lines $modules] {
2222
fail "node 2 or node 3 didn't receive cluster module message"
2323
}
2424
}
25+
26+
$node1 CONFIG RESETSTAT
27+
$node2 CONFIG RESETSTAT
28+
$node3 CONFIG RESETSTAT
29+
30+
test "Cluster module message DING/DONG acknowledgment" {
31+
assert_equal OK [$node1 test.pingall]
32+
wait_for_condition 50 100 {
33+
[CI 0 cluster_stats_messages_module_received] eq 2 &&
34+
[CI 1 cluster_stats_messages_module_received] eq 1 &&
35+
[CI 2 cluster_stats_messages_module_received] eq 1
36+
} else {
37+
fail "node 2 or node 3 didn't receive DING messages or node 1 didn't receive DONG message"
38+
}
39+
40+
verify_log_message -1 "*DING (type 1) RECEIVED*Hey*" 0
41+
verify_log_message -2 "*DING (type 1) RECEIVED*Hey*" 0
42+
assert_equal 2 [count_log_message 0 "* <cluster> DONG (type 2) RECEIVED*"]
43+
}
2544
}
2645

2746
set testmodule_nokey [file normalize tests/modules/blockonbackground.so]

0 commit comments

Comments
 (0)