@@ -82,43 +82,49 @@ class RedisModel {
82
82
res . entries = await stream . xrevrange ( streamKey , '+' , '-' , this . streamIDs [ streamKey ] )
83
83
if ( res . groups === 0 ) res . groups = [ ]
84
84
else {
85
- res . groups = await stream . getGroupsInfo ( streamKey )
86
- res . groups = await Promise . all ( res . groups . map ( async ( i ) => {
87
- if ( i . consumers === 0 ) i . consumers = [ ]
88
- else {
89
- let res = await stream . getConsumersInfo ( i . name )
90
- res = await Promise . all ( res . map ( async j => {
91
- j [ 'pel-count' ] = j . pending
92
- if ( j . pending !== 0 ) {
93
- j . pending = await stream . readPending ( {
94
- group : i . name , consumer : j . name ,
95
- start : '-' , end : '+' , count : 10 ,
96
- } )
97
- } else j . pending = [ ]
98
- return j
99
- } ) )
100
- i . consumers = res
101
- }
102
- i [ 'pel-count' ] = i . pending
103
- if ( i . pending === 0 ) i . pending = [ ]
104
- else {
105
- let res = await stream . readPending ( {
106
- group : i . name ,
107
- start : '-' , end : '+' , count : 10 ,
108
- } )
109
- i . pending = res
110
- }
111
-
112
- return i
113
- } ) )
114
- // if ()
85
+ res . groups = await this . getGroupInfo ( streamKey , stream )
115
86
}
116
87
} else {
117
88
res . entries = await stream . xrevrange ( streamKey , '+' , '-' , this . streamIDs [ streamKey ] )
118
89
}
119
90
this . streamIDs [ streamKey ] += SHOW_MORE_COUNT
120
91
return res
121
92
}
93
+ async getGroupInfo ( streamKey , redisStream ) {
94
+ if ( ! redisStream ) redisStream = new RedisStream ( { client : this . client , stream : streamKey } )
95
+ const res = await redisStream . getGroupsInfo ( streamKey )
96
+ return Promise . all ( res . map ( async ( i ) => {
97
+ if ( i . consumers === 0 ) i . consumers = [ ]
98
+ else {
99
+ i . consumers = await this . getConsumersInfo ( i . name , streamKey , redisStream )
100
+ }
101
+ i [ 'pel-count' ] = i . pending
102
+ if ( i . pending === 0 ) i . pending = [ ]
103
+ else {
104
+ let res = await redisStream . readPending ( {
105
+ group : i . name ,
106
+ start : '-' , end : '+' , count : 10 ,
107
+ } )
108
+ i . pending = res
109
+ }
110
+
111
+ return i
112
+ } ) )
113
+ }
114
+ async getConsumersInfo ( group , streamKey , redisStream ) {
115
+ if ( ! redisStream ) redisStream = new RedisStream ( { client : this . client , stream : streamKey } )
116
+ const res = await redisStream . getConsumersInfo ( group , streamKey )
117
+ return Promise . all ( res . map ( async j => {
118
+ j [ 'pel-count' ] = j . pending
119
+ if ( j . pending !== 0 ) {
120
+ j . pending = await redisStream . readPending ( {
121
+ group, consumer : j . name ,
122
+ start : '-' , end : '+' , count : 10 ,
123
+ } )
124
+ } else j . pending = [ ]
125
+ return j
126
+ } ) )
127
+ }
122
128
async getInfoById ( id , stream ) {
123
129
const redis = new RedisStream ( { client : this . client , stream } )
124
130
const [ res ] = await redis . xrange ( stream , id , '+' , 1 )
0 commit comments