@@ -108,14 +108,19 @@ public final class RedisPubSubHandler {
108
108
/// A queue of unsubscribe changes awaiting notification of completion.
109
109
private var pendingUnsubscribes : PendingSubscriptionChangeQueue
110
110
111
+ private let eventLoop : EventLoop
112
+
111
113
// we need to be extra careful not to use this context before we know we've initialized
112
114
private var context : ChannelHandlerContext !
113
115
114
- /// - Parameter queueCapacity: The initial capacity of queues used for processing subscription changes. The initial value is `3`.
116
+ /// - Parameters:
117
+ /// - eventLoop: The event loop the `NIO.Channel` that this handler was added to is bound to.
118
+ /// - queueCapacity: The initial capacity of queues used for processing subscription changes. The initial value is `3`.
115
119
///
116
- /// Unless you are subscribing and unsubscribing from a large volume of channels or patterns at a single time,
117
- /// such as a single SUBSCRIBE call, you do not need to modify this value.
118
- public init ( initialSubscriptionQueueCapacity queueCapacity: Int = 3 ) {
120
+ /// Unless you are subscribing and unsubscribing from a large volume of channels or patterns at a single time,
121
+ /// such as a single SUBSCRIBE call, you do not need to modify this value.
122
+ public init ( eventLoop: EventLoop , initialSubscriptionQueueCapacity queueCapacity: Int = 3 ) {
123
+ self . eventLoop = eventLoop
119
124
self . subscriptions = [ : ]
120
125
self . pendingSubscribes = [ : ]
121
126
self . pendingUnsubscribes = [ : ]
@@ -183,8 +188,21 @@ extension RedisPubSubHandler {
183
188
onSubscribe subscribeHandler: RedisSubscriptionChangeHandler ? ,
184
189
onUnsubscribe unsubscribeHandler: RedisSubscriptionChangeHandler ?
185
190
) -> EventLoopFuture < Int > {
191
+ guard self . eventLoop. inEventLoop else {
192
+ return self . eventLoop. flatSubmit {
193
+ return self . addSubscription (
194
+ for: target,
195
+ messageReceiver: receiver,
196
+ onSubscribe: subscribeHandler,
197
+ onUnsubscribe: unsubscribeHandler
198
+ )
199
+ }
200
+ }
201
+
186
202
switch self . state {
187
- case let . error( e) : return self . context. eventLoop. makeFailedFuture ( e)
203
+ case . removed: return self . eventLoop. makeFailedFuture ( RedisClientError . subscriptionModeRaceCondition)
204
+
205
+ case let . error( e) : return self . eventLoop. makeFailedFuture ( e)
188
206
189
207
case . default:
190
208
// go through all the target patterns/names and update the map with the new receiver if it's already registered
@@ -206,7 +224,7 @@ extension RedisPubSubHandler {
206
224
// if there aren't any new actual subscriptions,
207
225
// then we just short circuit and return our local count of subscriptions
208
226
guard !newSubscriptionTargets. isEmpty else {
209
- return self . context . eventLoop. makeSucceededFuture ( self . subscriptions. count)
227
+ return self . eventLoop. makeSucceededFuture ( self . subscriptions. count)
210
228
}
211
229
212
230
return self . sendSubscriptionChange (
@@ -221,9 +239,13 @@ extension RedisPubSubHandler {
221
239
/// - Parameter target: The channel or pattern that a receiver should be removed for.
222
240
/// - Returns: A `NIO.EventLoopFuture` that resolves the number of subscriptions the client has after the subscription has been removed.
223
241
public func removeSubscription( for target: RedisSubscriptionTarget ) -> EventLoopFuture < Int > {
242
+ guard self . eventLoop. inEventLoop else {
243
+ return self . eventLoop. flatSubmit { self . removeSubscription ( for: target) }
244
+ }
245
+
224
246
// if we're not in our default state,
225
247
// this essentially is a no-op because an error triggers all receivers to be removed
226
- guard case . default = self . state else { return self . context . eventLoop. makeSucceededFuture ( 0 ) }
248
+ guard case . default = self . state else { return self . eventLoop. makeSucceededFuture ( 0 ) }
227
249
228
250
// we send the UNSUBSCRIBE message to Redis,
229
251
// and in the response we handle the actual removal of the receiver closure
@@ -240,15 +262,7 @@ extension RedisPubSubHandler {
240
262
subscriptionTargets targets: [ String ] ,
241
263
queue pendingQueue: ReferenceWritableKeyPath < RedisPubSubHandler , PendingSubscriptionChangeQueue >
242
264
) -> EventLoopFuture < Int > {
243
- guard self . context. eventLoop. inEventLoop else {
244
- return self . context. eventLoop. flatSubmit {
245
- return self . sendSubscriptionChange (
246
- subscriptionChangeKeyword: keyword,
247
- subscriptionTargets: targets,
248
- queue: pendingQueue
249
- )
250
- }
251
- }
265
+ self . eventLoop. assertInEventLoop ( )
252
266
253
267
var command = [ RESPValue ( bulk: keyword) ]
254
268
command. append ( convertingContentsOf: targets)
@@ -263,7 +277,7 @@ extension RedisPubSubHandler {
263
277
264
278
// create them
265
279
let pendingSubscriptions : [ ( String , EventLoopPromise < Int > ) ] = targets. map {
266
- return ( $0, self . context . eventLoop. makePromise ( ) )
280
+ return ( $0, self . eventLoop. makePromise ( ) )
267
281
}
268
282
// add the subscription change handler to the appropriate queue for each individual subscription target
269
283
pendingSubscriptions. forEach { self [ keyPath: pendingQueue] . updateValue ( $1, forKey: $0) }
@@ -272,13 +286,13 @@ extension RedisPubSubHandler {
272
286
let subscriptionCountFuture = EventLoopFuture < Int >
273
287
. whenAllComplete (
274
288
pendingSubscriptions. map { $0. 1 . futureResult } ,
275
- on: self . context . eventLoop
289
+ on: self . eventLoop
276
290
)
277
291
. flatMapThrowing { ( results) -> Int in
278
292
// trust the last success response as the most current count
279
293
guard let latestSubscriptionCount = results
280
294
. lazy
281
- . reversed ( ) // reverse to save complexity, as we just need the last (first) successful value
295
+ . reversed ( ) // reverse to save time- complexity, as we just need the last (first) successful value
282
296
. compactMap ( { try ? $0. get ( ) } )
283
297
. first
284
298
// if we have no success cases, we will still have at least one response that we can
@@ -309,7 +323,8 @@ extension RedisPubSubHandler {
309
323
310
324
extension RedisPubSubHandler : RemovableChannelHandler {
311
325
public func removeHandler( context: ChannelHandlerContext , removalToken: ChannelHandlerContext . RemovalToken ) {
312
- // leave immediately so we don't get any more subscription requests
326
+ // update our state and leave immediately so we don't get any more subscription requests
327
+ self . state = . removed
313
328
context. leavePipeline ( removalToken: removalToken)
314
329
// "close" all subscription handlers
315
330
self . removeAllReceivers ( )
@@ -335,16 +350,16 @@ extension RedisPubSubHandler: ChannelInboundHandler {
335
350
// these guards extract some of the basic details of a pubsub message
336
351
guard
337
352
let array = value. array,
338
- ! array. isEmpty ,
353
+ array. count >= 3 ,
339
354
let channelOrPattern = array [ 1 ] . string,
340
355
let messageKeyword = array [ 0 ] . string
341
356
else {
342
357
context. fireChannelRead ( data)
343
358
return
344
359
}
345
360
346
- // safe because the array is guaranteed from the guard above to have at least 1 element
347
- // and it is not to be used until we match the PubSub message keyword
361
+ // safe because the array is guaranteed from the guard above to have at least 3 elements
362
+ // and it is NOT to be used until we match the PubSub message keyword
348
363
let message = array. last!
349
364
350
365
// the last check is to match one of the known pubsub message keywords
@@ -438,7 +453,7 @@ extension RedisPubSubHandler {
438
453
}
439
454
440
455
private enum State {
441
- case `default`, error( Error )
456
+ case `default`, removed , error( Error )
442
457
}
443
458
}
444
459
0 commit comments