Skip to content

Commit f9aad1a

Browse files
committed
Adding support for sharing memory between the module and the engine
Sharing memory between the module and the engine reduces memory overhead by eliminating redundant copies of stored entries in the module. This is particularly beneficial for search workloads that require indexing large volumes of stored data.

Shared SDS, a new data type, facilitates module-engine memory sharing with thread-safe intrusive reference counting. It preserves SDS semantics and structure while adding ref-counting and a free callback for statistics tracking.

New module APIs:
- VM_CreateSharedSDS: Creates a new Shared SDS.
- VM_SharedSDSPtrLen: Retrieves the raw buffer pointer and length of a Shared SDS.
- VM_ReleaseSharedSDS: Decreases the Shared SDS ref-count by 1.

Extended module APIs:
- VM_HashSet: Now supports setting a Shared SDS in the hash.
- VM_HashGet: Retrieves a Shared SDS and increments its ref-count by 1.
1 parent 3f6581b commit f9aad1a

File tree

6 files changed

+221
-24
lines changed

6 files changed

+221
-24
lines changed

src/module.c

+104-16
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ struct AutoMemEntry {
122122
#define VALKEYMODULE_AM_FREED 3 /* Explicitly freed by user already. */
123123
#define VALKEYMODULE_AM_DICT 4
124124
#define VALKEYMODULE_AM_INFO 5
125+
#define VALKEYMODULE_AM_SHARED_SDS 6
125126

126127
/* The pool allocator block. Modules can allocate memory via this special
127128
* allocator that will automatically release it all once the callback returns.
@@ -472,6 +473,7 @@ typedef int (*ValkeyModuleConfigSetBoolFunc)(const char *name, int val, void *pr
472473
typedef int (*ValkeyModuleConfigSetEnumFunc)(const char *name, int val, void *privdata, ValkeyModuleString **err);
473474
/* Apply signature, identical to valkeymodule.h */
474475
typedef int (*ValkeyModuleConfigApplyFunc)(ValkeyModuleCtx *ctx, void *privdata, ValkeyModuleString **err);
476+
typedef void *(*ValkeyModuleSharedSDSAllocFunc)(size_t len, size_t *alloc);
475477

476478
/* Struct representing a module config. These are stored in a list in the module struct */
477479
struct ModuleConfig {
@@ -515,6 +517,7 @@ static void zsetKeyReset(ValkeyModuleKey *key);
515517
static void moduleInitKeyTypeSpecific(ValkeyModuleKey *key);
516518
void VM_FreeDict(ValkeyModuleCtx *ctx, ValkeyModuleDict *d);
517519
void VM_FreeServerInfo(ValkeyModuleCtx *ctx, ValkeyModuleServerInfoData *data);
520+
void VM_ReleaseSharedSDS(ValkeyModuleSharedSDS *shared_sds);
518521

519522
/* Helpers for VM_SetCommandInfo. */
520523
static int moduleValidateCommandInfo(const ValkeyModuleCommandInfo *info);
@@ -2682,6 +2685,7 @@ void autoMemoryCollect(ValkeyModuleCtx *ctx) {
26822685
case VALKEYMODULE_AM_KEY: VM_CloseKey(ptr); break;
26832686
case VALKEYMODULE_AM_DICT: VM_FreeDict(NULL, ptr); break;
26842687
case VALKEYMODULE_AM_INFO: VM_FreeServerInfo(NULL, ptr); break;
2688+
case VALKEYMODULE_AM_SHARED_SDS: VM_ReleaseSharedSDS(ptr); break;
26852689
}
26862690
}
26872691
ctx->flags |= VALKEYMODULE_CTX_AUTO_MEMORY;
@@ -5259,6 +5263,7 @@ int VM_ZsetRangePrev(ValkeyModuleKey *key) {
52595263
* are created.
52605264
* VALKEYMODULE_HASH_CFIELDS: The field names passed are null terminated C
52615265
* strings instead of ValkeyModuleString objects.
5266+
* VALKEYMODULE_HASH_SHAREBLE_VALUES: The passed values are ValkeyModuleSharedSDS objects.
52625267
* VALKEYMODULE_HASH_COUNT_ALL: Include the number of inserted fields in the
52635268
* returned number, in addition to the number of
52645269
* updated and deleted fields. (Added in Redis OSS
@@ -5298,7 +5303,7 @@ int VM_ZsetRangePrev(ValkeyModuleKey *key) {
52985303
int VM_HashSet(ValkeyModuleKey *key, int flags, ...) {
52995304
va_list ap;
53005305
if (!key || (flags & ~(VALKEYMODULE_HASH_NX | VALKEYMODULE_HASH_XX | VALKEYMODULE_HASH_CFIELDS |
5301-
VALKEYMODULE_HASH_COUNT_ALL))) {
5306+
VALKEYMODULE_HASH_COUNT_ALL | VALKEYMODULE_HASH_SHAREBLE_VALUES))) {
53025307
errno = EINVAL;
53035308
return 0;
53045309
} else if (key->value && key->value->type != OBJ_HASH) {
@@ -5313,7 +5318,7 @@ int VM_HashSet(ValkeyModuleKey *key, int flags, ...) {
53135318
int count = 0;
53145319
va_start(ap, flags);
53155320
while (1) {
5316-
ValkeyModuleString *field, *value;
5321+
ValkeyModuleString *field, *value = NULL;
53175322
/* Get the field and value objects. */
53185323
if (flags & VALKEYMODULE_HASH_CFIELDS) {
53195324
char *cfield = va_arg(ap, char *);
@@ -5347,9 +5352,21 @@ int VM_HashSet(ValkeyModuleKey *key, int flags, ...) {
53475352
* to avoid a useless copy. */
53485353
if (flags & VALKEYMODULE_HASH_CFIELDS) low_flags |= HASH_SET_TAKE_FIELD;
53495354

5350-
robj *argv[2] = {field, value};
5351-
hashTypeTryConversion(key->value, argv, 0, 1);
5352-
int updated = hashTypeSet(key->value, field->ptr, value->ptr, low_flags);
5355+
char *value_sds;
5356+
if (flags & VALKEYMODULE_HASH_SHAREBLE_VALUES) {
5357+
if (key->value->encoding == OBJ_ENCODING_LISTPACK) {
5358+
/* Convert to hashtable encoding, as list pack encoding performs a deep copy
5359+
* of the buffer, breaking ref-counting semantics. */
5360+
hashTypeConvert(key->value, OBJ_ENCODING_HASHTABLE);
5361+
}
5362+
value_sds = ((ValkeyModuleSharedSDS *)value)->buf;
5363+
} else {
5364+
value_sds = value->ptr;
5365+
robj *argv[2] = {field, value};
5366+
hashTypeTryConversion(key->value, argv, 0, 1);
5367+
}
5368+
5369+
int updated = hashTypeSet(key->value, field->ptr, value_sds, low_flags);
53535370
count += (flags & VALKEYMODULE_HASH_COUNT_ALL) ? 1 : updated;
53545371

53555372
/* If CFIELDS is active, SDS string ownership is now of hashTypeSet(),
@@ -5383,6 +5400,8 @@ int VM_HashSet(ValkeyModuleKey *key, int flags, ...) {
53835400
*
53845401
* VALKEYMODULE_HASH_CFIELDS: field names as null terminated C strings.
53855402
*
5403+
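* VALKEYMODULE_HASH_SHAREBLE_VALUES: the values are returned as ValkeyModuleSharedSDS
* objects, so each field is paired with a ValkeyModuleSharedSDS pointer to pointer.
* A returned object has its ref-count incremented by 1. The pointer is set to NULL
* when the field does not exist or its value is not stored as a shared SDS.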
5404+
*
53865405
* VALKEYMODULE_HASH_EXISTS: instead of setting the value of the field
53875406
* expecting a ValkeyModuleString pointer to pointer, the function just
53885407
* reports if the field exists or not and expects an integer pointer
@@ -5412,7 +5431,7 @@ int VM_HashGet(ValkeyModuleKey *key, int flags, ...) {
54125431

54135432
va_start(ap, flags);
54145433
while (1) {
5415-
ValkeyModuleString *field, **valueptr;
5434+
ValkeyModuleString *field;
54165435
int *existsptr;
54175436
/* Get the field object and the value pointer to pointer. */
54185437
if (flags & VALKEYMODULE_HASH_CFIELDS) {
@@ -5432,17 +5451,32 @@ int VM_HashGet(ValkeyModuleKey *key, int flags, ...) {
54325451
else
54335452
*existsptr = 0;
54345453
} else {
5435-
valueptr = va_arg(ap, ValkeyModuleString **);
5436-
if (key->value) {
5437-
*valueptr = hashTypeGetValueObject(key->value, field->ptr);
5438-
if (*valueptr) {
5439-
robj *decoded = getDecodedObject(*valueptr);
5440-
decrRefCount(*valueptr);
5441-
*valueptr = decoded;
5442-
}
5443-
if (*valueptr) autoMemoryAdd(key->ctx, VALKEYMODULE_AM_STRING, *valueptr);
5444-
} else {
5454+
if (!key->value) {
5455+
ValkeyModuleString **valueptr = va_arg(ap, ValkeyModuleString **);
54455456
*valueptr = NULL;
5457+
} else {
5458+
if (flags & VALKEYMODULE_HASH_SHAREBLE_VALUES) {
5459+
ValkeyModuleSharedSDS **valueptr = va_arg(ap, ValkeyModuleSharedSDS **);
5460+
*valueptr = NULL;
5461+
/* shared SDS is supported only with hashtable encoding */
5462+
if (key->value->encoding == OBJ_ENCODING_HASHTABLE) {
5463+
sds value_sds = hashTypeGetFromHashTable(key->value, field->ptr);
5464+
if (value_sds && sdsType(value_sds) == SDS_TYPE_32_SHARED) {
5465+
*valueptr = (ValkeyModuleSharedSDS *)(value_sds - sdsHdrSize(sdsType(value_sds)));
5466+
sdsRetain(*valueptr);
5467+
autoMemoryAdd(key->ctx, VALKEYMODULE_AM_SHARED_SDS, *valueptr);
5468+
}
5469+
}
5470+
} else {
5471+
ValkeyModuleString **valueptr = va_arg(ap, ValkeyModuleString **);
5472+
*valueptr = hashTypeGetValueObject(key->value, field->ptr);
5473+
if (*valueptr) {
5474+
robj *decoded = getDecodedObject(*valueptr);
5475+
decrRefCount(*valueptr);
5476+
*valueptr = decoded;
5477+
}
5478+
if (*valueptr) autoMemoryAdd(key->ctx, VALKEYMODULE_AM_STRING, *valueptr);
5479+
}
54465480
}
54475481
}
54485482

@@ -13256,6 +13290,57 @@ ValkeyModuleScriptingEngineExecutionState VM_GetFunctionExecutionState(
1325613290
return ret == SCRIPT_CONTINUE ? VMSE_STATE_EXECUTING : VMSE_STATE_KILLED;
1325713291
}
1325813292

13293+
/* --------------------------------------------------------------------------
13294+
* ## Shared SDS APIs
13295+
* -------------------------------------------------------------------------- */
13296+
13297+
/* Create a new module shared SDS object. The newly created SDS object's intrusive
13298+
* reference count is initialized to 1.
13299+
* The caller is responsible for invoking `ValkeyModule_ReleaseSharedSDS` when the
13300+
* object is no longer needed to ensure proper cleanup.
13301+
*
13302+
* Parameters:
13303+
* - `len`: Specifies the size of the allocated SDS buffer.
13304+
* - `allocfn`: A custom memory allocation function, allowing fine-grained control
13305+
* over the allocation strategy.
13306+
* - `freecbfn`: A callback function triggered on deallocation. Note that this does
13307+
* not free the object itself but is primarily used for statistical tracking.
13308+
*
13309+
* Returns:
13310+
* - A pointer to the created shared SDS object.
13311+
*/
13312+
ValkeyModuleSharedSDS *VM_CreateSharedSDS(size_t len, ValkeyModuleSharedSDSAllocFunc allocfn, ValkeyModuleSharedSDSFreeCBFunc freecbfn) {
13313+
size_t alloc;
13314+
void *buf = allocfn(len + sizeof(ValkeyModuleSharedSDS) + 1, &alloc);
13315+
return sdsInitShared((char *)buf, len, alloc, freecbfn);
13316+
}
13317+
13318+
/* Retrieves the pointer to the shared SDS buffer along with its length.
13319+
*
13320+
* Parameters:
13321+
* - `shared_sds`: A pointer to the `ValkeyModuleSharedSDS` object.
13322+
* - `len`: Output parameter that stores the length of the SDS buffer.
13323+
*
13324+
* Returns:
13325+
* - A pointer to the SDS buffer string.
13326+
*/
13327+
char *VM_SharedSDSPtrLen(ValkeyModuleSharedSDS *shared_sds, size_t *len) {
13328+
*len = shared_sds->len;
13329+
return (char *)shared_sds + sizeof(ValkeyModuleSharedSDS);
13330+
}
13331+
13332+
/* Releases a shared SDS object by decrementing its intrusive reference count.
13333+
*
13334+
* Every shared SDS object created by `VM_CreateSharedSDS` must be released
13335+
* using `VM_ReleaseSharedSDS` to ensure proper memory management.
13336+
*
13337+
* Parameters:
13338+
* - `shared_sds`: A pointer to the `ValkeyModuleSharedSDS` object to be released.
13339+
*/
13340+
void VM_ReleaseSharedSDS(ValkeyModuleSharedSDS *shared_sds) {
13341+
sdsfree(shared_sds->buf);
13342+
}
13343+
1325913344
/* MODULE command.
1326013345
*
1326113346
* MODULE LIST
@@ -14130,4 +14215,7 @@ void moduleRegisterCoreAPI(void) {
1413014215
REGISTER_API(RegisterScriptingEngine);
1413114216
REGISTER_API(UnregisterScriptingEngine);
1413214217
REGISTER_API(GetFunctionExecutionState);
14218+
REGISTER_API(CreateSharedSDS);
14219+
REGISTER_API(SharedSDSPtrLen);
14220+
REGISTER_API(ReleaseSharedSDS);
1413314221
}

src/sds.c

+62-2
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ int sdsHdrSize(char type) {
4848
case SDS_TYPE_16: return sizeof(struct sdshdr16);
4949
case SDS_TYPE_32: return sizeof(struct sdshdr32);
5050
case SDS_TYPE_64: return sizeof(struct sdshdr64);
51+
case SDS_TYPE_32_SHARED: return sizeof(struct sdshdr32shared);
5152
}
5253
return 0;
5354
}
@@ -71,14 +72,15 @@ static inline size_t sdsTypeMaxSize(char type) {
7172
if (type == SDS_TYPE_8) return (1 << 8) - 1;
7273
if (type == SDS_TYPE_16) return (1 << 16) - 1;
7374
#if (LONG_MAX == LLONG_MAX)
74-
if (type == SDS_TYPE_32) return (1ll << 32) - 1;
75+
if (type == SDS_TYPE_32 || type == SDS_TYPE_32_SHARED) return (1ll << 32) - 1;
7576
#endif
7677
return -1; /* this is equivalent to the max SDS_TYPE_64 or SDS_TYPE_32 */
7778
}
7879

7980
static inline int adjustTypeIfNeeded(char *type, int *hdrlen, size_t bufsize) {
8081
size_t usable = bufsize - *hdrlen - 1;
81-
if (*type != SDS_TYPE_5 && usable > sdsTypeMaxSize(*type)) {
82+
if (*type != SDS_TYPE_5 && *type != SDS_TYPE_32_SHARED &&
83+
usable > sdsTypeMaxSize(*type)) {
8284
*type = sdsReqType(usable);
8385
*hdrlen = sdsHdrSize(*type);
8486
return 1;
@@ -124,6 +126,7 @@ sds _sdsnewlen(const void *init, size_t initlen, int trymalloc) {
124126
* can't be greater than `sdsTypeMaxSize(type)`. */
125127
sds sdswrite(char *buf, size_t bufsize, char type, const char *init, size_t initlen) {
126128
assert(bufsize >= sdsReqSize(initlen, type));
129+
assert(type != SDS_TYPE_32_SHARED);
127130
int hdrlen = sdsHdrSize(type);
128131
size_t usable = bufsize - hdrlen - 1;
129132
sds s = buf + hdrlen;
@@ -204,6 +207,15 @@ sds sdsdup(const_sds s) {
204207
/* Free an sds string. No operation is performed if 's' is NULL. */
205208
void sdsfree(sds s) {
206209
if (s == NULL) return;
210+
if (sdsType(s) == SDS_TYPE_32_SHARED) {
211+
SDS_HDR_VAR(32shared, s);
212+
if (atomic_fetch_sub_explicit(&sh->refcount, 1, memory_order_acq_rel) > 1) {
213+
return;
214+
}
215+
sh->freecbfn(sh, sdsAllocSize(s));
216+
s_free_with_size(s + sh->len + 1 - sdsAllocSize(s), sdsAllocSize(s));
217+
return;
218+
}
207219
s_free_with_size(sdsAllocPtr(s), sdsAllocSize(s));
208220
}
209221

@@ -266,6 +278,7 @@ sds _sdsMakeRoomFor(sds s, size_t addlen, int greedy) {
266278
/* Return ASAP if there is enough space left. */
267279
if (avail >= addlen) return s;
268280

281+
assert(oldtype != SDS_TYPE_32_SHARED);
269282
len = sdslen(s);
270283
sh = (char *)s - sdsHdrSize(oldtype);
271284
reqlen = newlen = (len + addlen);
@@ -351,6 +364,7 @@ sds sdsResize(sds s, size_t size, int would_regrow) {
351364

352365
/* Return ASAP if the size is already good. */
353366
if (sdsalloc(s) == size) return s;
367+
assert(oldtype != SDS_TYPE_32_SHARED);
354368

355369
/* Truncate len if needed. */
356370
if (size < len) len = size;
@@ -439,6 +453,44 @@ void *sdsAllocPtr(const_sds s) {
439453
return (void *)(s - sdsHdrSize(sdsType(s)));
440454
}
441455

456+
/* Initialize a shared sds into a buffer `buf`.
457+
*
458+
* Parameters:
459+
* - `buf`: A pointer to the allocated buffer that is initialized a the shared sds.
460+
* - `len`: The length of the sds buffer.
461+
* - `alloc`: The total allocated size of the buffer, including metadata.
462+
* - `freecbfn`: A callback function that is invoked when the sds object is released.
463+
*
464+
* Returns:
465+
* - A pointer to the initialized `sdshdr32shared` structure.
466+
*
467+
* Notes:
468+
* - The caller is responsible for ensuring that `buf` is large enough to accommodate
469+
* the SDS metadata, the string content, and the null terminator.
470+
* - The `freecbfn` does not directly free memory but is used primarily for
471+
* tracking deallocation events.
472+
*/
473+
sdshdr32shared *sdsInitShared(char *buf, size_t len, size_t alloc, sharedSdsFreeCB freecbfn) {
474+
buf[alloc - 1] = 0;
475+
sds s = buf + alloc - len - 1;
476+
SDS_HDR_VAR(32shared, s);
477+
sh->freecbfn = freecbfn;
478+
atomic_init(&sh->refcount, 1);
479+
sh->len = len;
480+
sh->alloc = alloc - sdsHdrSize(SDS_TYPE_32_SHARED) - 1;
481+
sh->flags = SDS_TYPE_32_SHARED;
482+
return sh;
483+
}
484+
485+
/* Increases the reference count of a shared SDS object.
486+
*
487+
* Parameters:
488+
* - `sh`: A pointer to the `sdshdr32shared` structure whose reference count
489+
* should be increased.
490+
*/
491+
void sdsRetain(sdshdr32shared *sh) {
492+
atomic_fetch_add_explicit(&sh->refcount, 1, memory_order_relaxed);
493+
}
442494
/* Increment the sds length and decrements the left free space at the
443495
* end of the string according to 'incr'. Also set the null term
444496
* in the new end of the string.
@@ -498,6 +550,13 @@ void sdsIncrLen(sds s, ssize_t incr) {
498550
len = (sh->len += incr);
499551
break;
500552
}
553+
case SDS_TYPE_32_SHARED: {
554+
SDS_HDR_VAR(32shared, s);
555+
assert((incr >= 0 && sh->alloc - sh->len >= (unsigned int)incr) ||
556+
(incr < 0 && sh->len >= (unsigned int)(-incr)));
557+
len = (sh->len += incr);
558+
break;
559+
}
501560
default: len = 0; /* Just to avoid compilation warnings. */
502561
}
503562
s[len] = '\0';
@@ -779,6 +838,7 @@ sds sdstrim(sds s, const char *cset) {
779838
char *end, *sp, *ep;
780839
size_t len;
781840

841+
assert(sdsType(s) != SDS_TYPE_32_SHARED);
782842
sp = s;
783843
ep = end = s + sdslen(s) - 1;
784844
while (sp <= end && strchr(cset, *sp)) sp++;

0 commit comments

Comments
 (0)