Skip to content

Commit c81c7f5

Browse files
itamarhaberguybe7oranagra
authored
Add stream consumer group lag tracking and reporting (redis#9127)
Adds the ability to track the lag of a consumer group (CG), that is, the number of entries yet-to-be-delivered from the stream. The proposed constant-time solution is in the spirit of "best-effort." Partially addresses redis#8737. ## Description of approach We add a new "entries_added" property to the stream. This starts at 0 for a new stream and is incremented by 1 with every `XADD`. It is essentially an all-time counter of the entries added to the stream. Given the stream's length and this counter value, we can trivially find the logical "entries_added" counter of the first ID if and only if the stream is contiguous. A fragmented stream contains one or more tombstones generated by `XDEL`s. The new "xdel_max_id" stream property tracks the latest tombstone. The CG also tracks its last delivered ID's as an "entries_read" counter and increments it independently when delivering new messages, unless the this read counter is invalid (-1 means invalid offset). When the CG's counter is available, the reported lag is the difference between added and read counters. Lastly, this also adds a "first_id" field to the stream structure in order to make looking it up cheaper in most cases. ## Limitations There are two cases in which the mechanism isn't able to track the lag. In these cases, `XINFO` replies with `null` in the "lag" field. The first case is when a CG is created with an arbitrary last delivered ID, that isn't "0-0", nor the first or the last entries of the stream. In this case, it is impossible to obtain a valid read counter (short of an O(N) operation). The second case is when there are one or more tombstones fragmenting the stream's entries range. In both cases, given enough time and assuming that the consumers are active (reading and lacking) and advancing, the CG should be able to catch up with the tip of the stream and report zero lag. Once that's achieved, lag tracking would resume as normal (until the next tombstone is set). ## API changes * `XGROUP CREATE` added with the optional named argument `[ENTRIESREAD entries-read]` for explicitly specifying the new CG's counter. * `XGROUP SETID` added with an optional positional argument `[ENTRIESREAD entries-read]` for specifying the CG's counter. * `XINFO` reports the maximal tombstone ID, the recorded first entry ID, and total number of entries added to the stream. * `XINFO` reports the current lag and logical read counter of CGs. * `XSETID` is an internal command that's used in replication/aof. It has been added with the optional positional arguments `[ENTRIESADDED entries-added] [MAXDELETEDID max-deleted-entry-id]` for propagating the CG's offset and maximal tombstone ID of the stream. ## The generic unsolved problem The current stream implementation doesn't provide an efficient way to obtain the approximate/exact size of a range of entries. While it could've been nice to have that ability (redis#5813) in general, let alone specifically in the context of CGs, the risk and complexities involved in such implementation are in all likelihood prohibitive. ## A refactoring note The `streamGetEdgeID` has been refactored to accommodate both the existing seek of any entry as well as seeking non-deleted entries (the addition of the `skip_tombstones` argument). Furthermore, this refactoring also migrated the seek logic to use the `streamIterator` (rather than `raxIterator`) that was, in turn, extended with the `skip_tombstones` Boolean struct field to control the emission of these. Co-authored-by: Guy Benoish <[email protected]> Co-authored-by: Oran Agra <[email protected]>
1 parent b857928 commit c81c7f5

17 files changed

+786
-118
lines changed

src/aof.c

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2035,10 +2035,14 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) {
20352035

20362036
/* Append XSETID after XADD, make sure lastid is correct,
20372037
* in case of XDEL lastid. */
2038-
if (!rioWriteBulkCount(r,'*',3) ||
2038+
if (!rioWriteBulkCount(r,'*',7) ||
20392039
!rioWriteBulkString(r,"XSETID",6) ||
20402040
!rioWriteBulkObject(r,key) ||
2041-
!rioWriteBulkStreamID(r,&s->last_id))
2041+
!rioWriteBulkStreamID(r,&s->last_id) ||
2042+
!rioWriteBulkString(r,"ENTRIESADDED",12) ||
2043+
!rioWriteBulkLongLong(r,s->entries_added) ||
2044+
!rioWriteBulkString(r,"MAXDELETEDID",12) ||
2045+
!rioWriteBulkStreamID(r,&s->max_deleted_entry_id))
20422046
{
20432047
streamIteratorStop(&si);
20442048
return 0;
@@ -2053,12 +2057,14 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) {
20532057
while(raxNext(&ri)) {
20542058
streamCG *group = ri.data;
20552059
/* Emit the XGROUP CREATE in order to create the group. */
2056-
if (!rioWriteBulkCount(r,'*',5) ||
2060+
if (!rioWriteBulkCount(r,'*',7) ||
20572061
!rioWriteBulkString(r,"XGROUP",6) ||
20582062
!rioWriteBulkString(r,"CREATE",6) ||
20592063
!rioWriteBulkObject(r,key) ||
20602064
!rioWriteBulkString(r,(char*)ri.key,ri.key_len) ||
2061-
!rioWriteBulkStreamID(r,&group->last_id))
2065+
!rioWriteBulkStreamID(r,&group->last_id) ||
2066+
!rioWriteBulkString(r,"ENTRIESREAD",11) ||
2067+
!rioWriteBulkLongLong(r,group->entries_read))
20622068
{
20632069
raxStop(&ri);
20642070
streamIteratorStop(&si);

src/cluster.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5701,7 +5701,7 @@ int verifyDumpPayload(unsigned char *p, size_t len, uint16_t *rdbver_ptr) {
57015701
if (len < 10) return C_ERR;
57025702
footer = p+(len-10);
57035703

5704-
/* Verify RDB version */
5704+
/* Set and verify RDB version. */
57055705
rdbver = (footer[1] << 8) | footer[0];
57065706
if (rdbver_ptr) {
57075707
*rdbver_ptr = rdbver;

src/commands.c

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5998,7 +5998,10 @@ struct redisCommandArg XDEL_Args[] = {
59985998
/********** XGROUP CREATE ********************/
59995999

60006000
/* XGROUP CREATE history */
6001-
#define XGROUP_CREATE_History NULL
6001+
commandHistory XGROUP_CREATE_History[] = {
6002+
{"7.0.0","Added the `entries_read` named argument."},
6003+
{0}
6004+
};
60026005

60036006
/* XGROUP CREATE tips */
60046007
#define XGROUP_CREATE_tips NULL
@@ -6016,6 +6019,7 @@ struct redisCommandArg XGROUP_CREATE_Args[] = {
60166019
{"groupname",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE},
60176020
{"id",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,.subargs=XGROUP_CREATE_id_Subargs},
60186021
{"mkstream",ARG_TYPE_PURE_TOKEN,-1,"MKSTREAM",NULL,NULL,CMD_ARG_OPTIONAL},
6022+
{"entries_read",ARG_TYPE_INTEGER,-1,"ENTRIESREAD",NULL,NULL,CMD_ARG_OPTIONAL},
60196023
{0}
60206024
};
60216025

@@ -6077,7 +6081,10 @@ struct redisCommandArg XGROUP_DESTROY_Args[] = {
60776081
/********** XGROUP SETID ********************/
60786082

60796083
/* XGROUP SETID history */
6080-
#define XGROUP_SETID_History NULL
6084+
commandHistory XGROUP_SETID_History[] = {
6085+
{"7.0.0","Added the optional `entries_read` argument."},
6086+
{0}
6087+
};
60816088

60826089
/* XGROUP SETID tips */
60836090
#define XGROUP_SETID_tips NULL
@@ -6094,6 +6101,7 @@ struct redisCommandArg XGROUP_SETID_Args[] = {
60946101
{"key",ARG_TYPE_KEY,0,NULL,NULL,NULL,CMD_ARG_NONE},
60956102
{"groupname",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE},
60966103
{"id",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,.subargs=XGROUP_SETID_id_Subargs},
6104+
{"entries_read",ARG_TYPE_INTEGER,-1,"ENTRIESREAD",NULL,NULL,CMD_ARG_OPTIONAL},
60976105
{0}
60986106
};
60996107

@@ -6104,7 +6112,7 @@ struct redisCommand XGROUP_Subcommands[] = {
61046112
{"delconsumer","Delete a consumer from a consumer group.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_STREAM,XGROUP_DELCONSUMER_History,XGROUP_DELCONSUMER_tips,xgroupCommand,5,CMD_WRITE,ACL_CATEGORY_STREAM,{{NULL,CMD_KEY_RW|CMD_KEY_DELETE,KSPEC_BS_INDEX,.bs.index={2},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=XGROUP_DELCONSUMER_Args},
61056113
{"destroy","Destroy a consumer group.","O(N) where N is the number of entries in the group's pending entries list (PEL).","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_STREAM,XGROUP_DESTROY_History,XGROUP_DESTROY_tips,xgroupCommand,4,CMD_WRITE,ACL_CATEGORY_STREAM,{{NULL,CMD_KEY_RW|CMD_KEY_DELETE,KSPEC_BS_INDEX,.bs.index={2},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=XGROUP_DESTROY_Args},
61066114
{"help","Show helpful text about the different subcommands","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_STREAM,XGROUP_HELP_History,XGROUP_HELP_tips,xgroupCommand,2,CMD_LOADING|CMD_STALE,ACL_CATEGORY_STREAM},
6107-
{"setid","Set a consumer group to an arbitrary last delivered ID value.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_STREAM,XGROUP_SETID_History,XGROUP_SETID_tips,xgroupCommand,5,CMD_WRITE,ACL_CATEGORY_STREAM,{{NULL,CMD_KEY_RW|CMD_KEY_UPDATE,KSPEC_BS_INDEX,.bs.index={2},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=XGROUP_SETID_Args},
6115+
{"setid","Set a consumer group to an arbitrary last delivered ID value.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_STREAM,XGROUP_SETID_History,XGROUP_SETID_tips,xgroupCommand,-5,CMD_WRITE,ACL_CATEGORY_STREAM,{{NULL,CMD_KEY_RW|CMD_KEY_UPDATE,KSPEC_BS_INDEX,.bs.index={2},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=XGROUP_SETID_Args},
61086116
{0}
61096117
};
61106118

@@ -6137,7 +6145,10 @@ struct redisCommandArg XINFO_CONSUMERS_Args[] = {
61376145
/********** XINFO GROUPS ********************/
61386146

61396147
/* XINFO GROUPS history */
6140-
#define XINFO_GROUPS_History NULL
6148+
commandHistory XINFO_GROUPS_History[] = {
6149+
{"7.0.0","Added the `entries-read` and `lag` fields"},
6150+
{0}
6151+
};
61416152

61426153
/* XINFO GROUPS tips */
61436154
#define XINFO_GROUPS_tips NULL
@@ -6159,7 +6170,10 @@ struct redisCommandArg XINFO_GROUPS_Args[] = {
61596170
/********** XINFO STREAM ********************/
61606171

61616172
/* XINFO STREAM history */
6162-
#define XINFO_STREAM_History NULL
6173+
commandHistory XINFO_STREAM_History[] = {
6174+
{"7.0.0","Added the `max-deleted-entry-id`, `entries-added`, `recorded-first-entry-id`, `entries-read` and `lag` fields"},
6175+
{0}
6176+
};
61636177

61646178
/* XINFO STREAM tips */
61656179
#define XINFO_STREAM_tips NULL
@@ -6338,7 +6352,10 @@ struct redisCommandArg XREVRANGE_Args[] = {
63386352
/********** XSETID ********************/
63396353

63406354
/* XSETID history */
6341-
#define XSETID_History NULL
6355+
commandHistory XSETID_History[] = {
6356+
{"7.0.0","Added the `entries_added` and `max_deleted_entry_id` arguments."},
6357+
{0}
6358+
};
63426359

63436360
/* XSETID tips */
63446361
#define XSETID_tips NULL
@@ -6347,6 +6364,8 @@ struct redisCommandArg XREVRANGE_Args[] = {
63476364
struct redisCommandArg XSETID_Args[] = {
63486365
{"key",ARG_TYPE_KEY,0,NULL,NULL,NULL,CMD_ARG_NONE},
63496366
{"last-id",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE},
6367+
{"entries_added",ARG_TYPE_INTEGER,-1,"ENTRIESADDED",NULL,NULL,CMD_ARG_OPTIONAL},
6368+
{"max_deleted_entry_id",ARG_TYPE_STRING,-1,"MAXDELETEDID",NULL,NULL,CMD_ARG_OPTIONAL},
63506369
{0}
63516370
};
63526371

@@ -7057,7 +7076,7 @@ struct redisCommand redisCommandTable[] = {
70577076
{"xread","Return never seen elements in multiple streams, with IDs greater than the ones reported by the caller for each stream. Can block.","For each stream mentioned: O(N) with N being the number of elements being returned, it means that XREAD-ing with a fixed COUNT is O(1). Note that when the BLOCK option is used, XADD will pay O(M) time in order to serve the M clients blocked on the stream getting new data.","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_STREAM,XREAD_History,XREAD_tips,xreadCommand,-4,CMD_BLOCKING|CMD_READONLY|CMD_BLOCKING,ACL_CATEGORY_STREAM,{{NULL,CMD_KEY_RO|CMD_KEY_ACCESS,KSPEC_BS_KEYWORD,.bs.keyword={"STREAMS",1},KSPEC_FK_RANGE,.fk.range={-1,1,2}}},xreadGetKeys,.args=XREAD_Args},
70587077
{"xreadgroup","Return new entries from a stream using a consumer group, or access the history of the pending entries for a given consumer. Can block.","For each stream mentioned: O(M) with M being the number of elements returned. If M is constant (e.g. always asking for the first 10 elements with COUNT), you can consider it O(1). On the other side when XREADGROUP blocks, XADD will pay the O(N) time in order to serve the N clients blocked on the stream getting new data.","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_STREAM,XREADGROUP_History,XREADGROUP_tips,xreadCommand,-7,CMD_BLOCKING|CMD_WRITE,ACL_CATEGORY_STREAM,{{NULL,CMD_KEY_RO|CMD_KEY_ACCESS,KSPEC_BS_KEYWORD,.bs.keyword={"STREAMS",4},KSPEC_FK_RANGE,.fk.range={-1,1,2}}},xreadGetKeys,.args=XREADGROUP_Args},
70597078
{"xrevrange","Return a range of elements in a stream, with IDs matching the specified IDs interval, in reverse order (from greater to smaller IDs) compared to XRANGE","O(N) with N being the number of elements returned. If N is constant (e.g. always asking for the first 10 elements with COUNT), you can consider it O(1).","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_STREAM,XREVRANGE_History,XREVRANGE_tips,xrevrangeCommand,-4,CMD_READONLY,ACL_CATEGORY_STREAM,{{NULL,CMD_KEY_RO|CMD_KEY_ACCESS,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=XREVRANGE_Args},
7060-
{"xsetid","An internal command for replicating stream values","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_STREAM,XSETID_History,XSETID_tips,xsetidCommand,3,CMD_WRITE|CMD_DENYOOM|CMD_FAST,ACL_CATEGORY_STREAM,{{NULL,CMD_KEY_RW|CMD_KEY_UPDATE,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=XSETID_Args},
7079+
{"xsetid","An internal command for replicating stream values","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_STREAM,XSETID_History,XSETID_tips,xsetidCommand,-3,CMD_WRITE|CMD_DENYOOM|CMD_FAST,ACL_CATEGORY_STREAM,{{NULL,CMD_KEY_RW|CMD_KEY_UPDATE,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=XSETID_Args},
70617080
{"xtrim","Trims the stream to (approximately if '~' is passed) a certain size","O(N), with N being the number of evicted entries. Constant times are very small however, since entries are organized in macro nodes containing multiple entries that can be released with a single deallocation.","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_STREAM,XTRIM_History,XTRIM_tips,xtrimCommand,-4,CMD_WRITE,ACL_CATEGORY_STREAM,{{NULL,CMD_KEY_RW|CMD_KEY_DELETE,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=XTRIM_Args},
70627081
/* string */
70637082
{"append","Append a value to a key","O(1). The amortized time complexity is O(1) assuming the appended value is small and the already present value is of any size, since the dynamic string library used by Redis will double the free space available on every reallocation.","2.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_STRING,APPEND_History,APPEND_tips,appendCommand,3,CMD_WRITE|CMD_DENYOOM|CMD_FAST,ACL_CATEGORY_STRING,{{NULL,CMD_KEY_RW|CMD_KEY_INSERT,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=APPEND_Args},

src/commands/xgroup-create.json

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,12 @@
77
"arity": -5,
88
"container": "XGROUP",
99
"function": "xgroupCommand",
10+
"history": [
11+
[
12+
"7.0.0",
13+
"Added the `entries_read` named argument."
14+
]
15+
],
1016
"command_flags": [
1117
"WRITE",
1218
"DENYOOM"
@@ -64,6 +70,12 @@
6470
"name": "mkstream",
6571
"type": "pure-token",
6672
"optional": true
73+
},
74+
{
75+
"token": "ENTRIESREAD",
76+
"name": "entries_read",
77+
"type": "integer",
78+
"optional": true
6779
}
6880
]
6981
}

src/commands/xgroup-setid.json

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,15 @@
44
"complexity": "O(1)",
55
"group": "stream",
66
"since": "5.0.0",
7-
"arity": 5,
7+
"arity": -5,
88
"container": "XGROUP",
99
"function": "xgroupCommand",
10+
"history": [
11+
[
12+
"7.0.0",
13+
"Added the optional `entries_read` argument."
14+
]
15+
],
1016
"command_flags": [
1117
"WRITE"
1218
],
@@ -57,6 +63,12 @@
5763
"token": "$"
5864
}
5965
]
66+
},
67+
{
68+
"name": "entries_read",
69+
"token": "ENTRIESREAD",
70+
"type": "integer",
71+
"optional": true
6072
}
6173
]
6274
}

src/commands/xinfo-groups.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,12 @@
66
"since": "5.0.0",
77
"arity": 3,
88
"container": "XINFO",
9+
"history": [
10+
[
11+
"7.0.0",
12+
"Added the `entries-read` and `lag` fields"
13+
]
14+
],
915
"function": "xinfoCommand",
1016
"command_flags": [
1117
"READONLY"

src/commands/xinfo-stream.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,12 @@
66
"since": "5.0.0",
77
"arity": -3,
88
"container": "XINFO",
9+
"history": [
10+
[
11+
"7.0.0",
12+
"Added the `max-deleted-entry-id`, `entries-added`, `recorded-first-entry-id`, `entries-read` and `lag` fields"
13+
]
14+
],
915
"function": "xinfoCommand",
1016
"command_flags": [
1117
"READONLY"

src/commands/xsetid.json

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,14 @@
44
"complexity": "O(1)",
55
"group": "stream",
66
"since": "5.0.0",
7-
"arity": 3,
7+
"arity": -3,
88
"function": "xsetidCommand",
9+
"history": [
10+
[
11+
"7.0.0",
12+
"Added the `entries_added` and `max_deleted_entry_id` arguments."
13+
]
14+
],
915
"command_flags": [
1016
"WRITE",
1117
"DENYOOM",
@@ -43,6 +49,18 @@
4349
{
4450
"name": "last-id",
4551
"type": "string"
52+
},
53+
{
54+
"name": "entries_added",
55+
"token": "ENTRIESADDED",
56+
"type": "integer",
57+
"optional": true
58+
},
59+
{
60+
"name": "max_deleted_entry_id",
61+
"token": "MAXDELETEDID",
62+
"type": "string",
63+
"optional": true
4664
}
4765
]
4866
}

src/rdb.c

Lines changed: 61 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -692,7 +692,7 @@ int rdbSaveObjectType(rio *rdb, robj *o) {
692692
else
693693
serverPanic("Unknown hash encoding");
694694
case OBJ_STREAM:
695-
return rdbSaveType(rdb,RDB_TYPE_STREAM_LISTPACKS);
695+
return rdbSaveType(rdb,RDB_TYPE_STREAM_LISTPACKS_2);
696696
case OBJ_MODULE:
697697
return rdbSaveType(rdb,RDB_TYPE_MODULE_2);
698698
default:
@@ -986,6 +986,19 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) {
986986
nwritten += n;
987987
if ((n = rdbSaveLen(rdb,s->last_id.seq)) == -1) return -1;
988988
nwritten += n;
989+
/* Save the first entry ID. */
990+
if ((n = rdbSaveLen(rdb,s->first_id.ms)) == -1) return -1;
991+
nwritten += n;
992+
if ((n = rdbSaveLen(rdb,s->first_id.seq)) == -1) return -1;
993+
nwritten += n;
994+
/* Save the maximal tombstone ID. */
995+
if ((n = rdbSaveLen(rdb,s->max_deleted_entry_id.ms)) == -1) return -1;
996+
nwritten += n;
997+
if ((n = rdbSaveLen(rdb,s->max_deleted_entry_id.seq)) == -1) return -1;
998+
nwritten += n;
999+
/* Save the offset. */
1000+
if ((n = rdbSaveLen(rdb,s->entries_added)) == -1) return -1;
1001+
nwritten += n;
9891002

9901003
/* The consumer groups and their clients are part of the stream
9911004
* type, so serialize every consumer group. */
@@ -1020,6 +1033,13 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) {
10201033
return -1;
10211034
}
10221035
nwritten += n;
1036+
1037+
/* Save the group's logical reads counter. */
1038+
if ((n = rdbSaveLen(rdb,cg->entries_read)) == -1) {
1039+
raxStop(&ri);
1040+
return -1;
1041+
}
1042+
nwritten += n;
10231043

10241044
/* Save the global PEL. */
10251045
if ((n = rdbSaveStreamPEL(rdb,cg->pel,1)) == -1) {
@@ -2321,7 +2341,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) {
23212341
rdbReportCorruptRDB("Unknown RDB encoding type %d",rdbtype);
23222342
break;
23232343
}
2324-
} else if (rdbtype == RDB_TYPE_STREAM_LISTPACKS) {
2344+
} else if (rdbtype == RDB_TYPE_STREAM_LISTPACKS || rdbtype == RDB_TYPE_STREAM_LISTPACKS_2) {
23252345
o = createStreamObject();
23262346
stream *s = o->ptr;
23272347
uint64_t listpacks = rdbLoadLen(rdb,NULL);
@@ -2397,6 +2417,30 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) {
23972417
/* Load the last entry ID. */
23982418
s->last_id.ms = rdbLoadLen(rdb,NULL);
23992419
s->last_id.seq = rdbLoadLen(rdb,NULL);
2420+
2421+
if (rdbtype == RDB_TYPE_STREAM_LISTPACKS_2) {
2422+
/* Load the first entry ID. */
2423+
s->first_id.ms = rdbLoadLen(rdb,NULL);
2424+
s->first_id.seq = rdbLoadLen(rdb,NULL);
2425+
2426+
/* Load the maximal deleted entry ID. */
2427+
s->max_deleted_entry_id.ms = rdbLoadLen(rdb,NULL);
2428+
s->max_deleted_entry_id.seq = rdbLoadLen(rdb,NULL);
2429+
2430+
/* Load the offset. */
2431+
s->entries_added = rdbLoadLen(rdb,NULL);
2432+
} else {
2433+
/* During migration the offset can be initialized to the stream's
2434+
* length. At this point, we also don't care about tombstones
2435+
* because CG offsets will be later initialized as well. */
2436+
s->max_deleted_entry_id.ms = 0;
2437+
s->max_deleted_entry_id.seq = 0;
2438+
s->entries_added = s->length;
2439+
2440+
/* Since the rax is already loaded, we can find the first entry's
2441+
* ID. */
2442+
streamGetEdgeID(s,1,1,&s->first_id);
2443+
}
24002444

24012445
if (rioGetReadError(rdb)) {
24022446
rdbReportReadError("Stream object metadata loading failed.");
@@ -2432,8 +2476,22 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) {
24322476
decrRefCount(o);
24332477
return NULL;
24342478
}
2479+
2480+
/* Load group offset. */
2481+
uint64_t cg_offset;
2482+
if (rdbtype == RDB_TYPE_STREAM_LISTPACKS_2) {
2483+
cg_offset = rdbLoadLen(rdb,NULL);
2484+
if (rioGetReadError(rdb)) {
2485+
rdbReportReadError("Stream cgroup offset loading failed.");
2486+
sdsfree(cgname);
2487+
decrRefCount(o);
2488+
return NULL;
2489+
}
2490+
} else {
2491+
cg_offset = streamEstimateDistanceFromFirstEverEntry(s,&cg_id);
2492+
}
24352493

2436-
streamCG *cgroup = streamCreateCG(s,cgname,sdslen(cgname),&cg_id);
2494+
streamCG *cgroup = streamCreateCG(s,cgname,sdslen(cgname),&cg_id,cg_offset);
24372495
if (cgroup == NULL) {
24382496
rdbReportCorruptRDB("Duplicated consumer group name %s",
24392497
cgname);

src/rdb.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,10 +94,11 @@
9494
#define RDB_TYPE_HASH_LISTPACK 16
9595
#define RDB_TYPE_ZSET_LISTPACK 17
9696
#define RDB_TYPE_LIST_QUICKLIST_2 18
97+
#define RDB_TYPE_STREAM_LISTPACKS_2 19
9798
/* NOTE: WHEN ADDING NEW RDB TYPE, UPDATE rdbIsObjectType() BELOW */
9899

99100
/* Test if a type is an object type. */
100-
#define rdbIsObjectType(t) ((t >= 0 && t <= 7) || (t >= 9 && t <= 18))
101+
#define rdbIsObjectType(t) ((t >= 0 && t <= 7) || (t >= 9 && t <= 19))
101102

102103
/* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType). */
103104
#define RDB_OPCODE_FUNCTION 246 /* engine data */

src/server.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1767,6 +1767,7 @@ void createSharedObjects(void) {
17671767
shared.retrycount = createStringObject("RETRYCOUNT",10);
17681768
shared.force = createStringObject("FORCE",5);
17691769
shared.justid = createStringObject("JUSTID",6);
1770+
shared.entriesread = createStringObject("ENTRIESREAD",11);
17701771
shared.lastid = createStringObject("LASTID",6);
17711772
shared.default_username = createStringObject("default",7);
17721773
shared.ping = createStringObject("ping",4);

src/server.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1223,7 +1223,7 @@ struct sharedObjectsStruct {
12231223
*rpop, *lpop, *lpush, *rpoplpush, *lmove, *blmove, *zpopmin, *zpopmax,
12241224
*emptyscan, *multi, *exec, *left, *right, *hset, *srem, *xgroup, *xclaim,
12251225
*script, *replconf, *eval, *persist, *set, *pexpireat, *pexpire,
1226-
*time, *pxat, *absttl, *retrycount, *force, *justid,
1226+
*time, *pxat, *absttl, *retrycount, *force, *justid, *entriesread,
12271227
*lastid, *ping, *setid, *keepttl, *load, *createconsumer,
12281228
*getack, *special_asterick, *special_equals, *default_username, *redacted,
12291229
*ssubscribebulk,*sunsubscribebulk,

0 commit comments

Comments
 (0)