Skip to content

Commit 5458d64

Browse files
committed
Backdeploy #100 fix and deprecate Channel Redis pipeline APIs
This is a backdeploy of commit cfb99ba. This fixes issue #100
1 parent ad43590 commit 5458d64

File tree

4 files changed

+83
-17
lines changed

4 files changed

+83
-17
lines changed

Sources/RediStack/Extensions/SwiftNIO.swift

Lines changed: 36 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//
33
// This source file is part of the RediStack open source project
44
//
5-
// Copyright (c) 2019-2020 RediStack project authors
5+
// Copyright (c) 2019-2022 RediStack project authors
66
// Licensed under Apache License v2.0
77
//
88
// See LICENSE.txt for license information
@@ -26,7 +26,7 @@ extension TimeAmount {
2626

2727
// MARK: Pipeline manipulation
2828

29-
extension Channel {
29+
extension ChannelPipeline {
3030
/// Adds the baseline channel handlers needed to support sending and receiving messages in Redis Serialization Protocol (RESP) format to the pipeline.
3131
///
3232
/// For implementation details, see `RedisMessageEncoder`, `RedisByteDecoder`, and `RedisCommandHandler`.
@@ -62,7 +62,7 @@ extension Channel {
6262
(RedisCommandHandler(), "RediStack.CommandHandler")
6363
]
6464
return .andAllSucceed(
65-
handlers.map { self.pipeline.addHandler($0, name: $1) },
65+
handlers.map { self.addHandler($0, name: $1) },
6666
on: self.eventLoop
6767
)
6868
}
@@ -106,14 +106,38 @@ extension Channel {
106106
/// | [ Socket.read ] | | [ Socket.write ] |
107107
/// +-----------------+ +------------------+
108108
/// - Returns: A `NIO.EventLoopFuture` that resolves the instance of the PubSubHandler that was added to the pipeline.
109-
public func addPubSubHandler() -> EventLoopFuture<RedisPubSubHandler> {
110-
return self.pipeline
111-
.handler(type: RedisCommandHandler.self)
112-
.flatMap {
113-
let pubsubHandler = RedisPubSubHandler(eventLoop: self.eventLoop)
114-
return self.pipeline
115-
.addHandler(pubsubHandler, name: "RediStack.PubSubHandler", position: .before($0))
116-
.map { pubsubHandler }
109+
public func addRedisPubSubHandler() -> EventLoopFuture<RedisPubSubHandler> {
110+
// first try to return the handler that already exists in the pipeline
111+
112+
return self.handler(type: RedisPubSubHandler.self)
113+
.flatMapError {
114+
// if it doesn't exist, add it to the pipeline
115+
guard
116+
let error = $0 as? ChannelPipelineError,
117+
error == .notFound
118+
else { return self.eventLoop.makeFailedFuture($0) }
119+
120+
return self.handler(type: RedisCommandHandler.self)
121+
.flatMap {
122+
let pubsubHandler = RedisPubSubHandler(eventLoop: self.eventLoop)
123+
return self.addHandler(pubsubHandler, name: "RediStack.PubSubHandler", position: .before($0))
124+
.map { pubsubHandler }
125+
}
126+
}
127+
}
128+
129+
/// Removes the provided Redis PubSub handler.
130+
/// - Returns: A `NIO.EventLoopFuture` that resolves when the handler was removed from the pipeline.
131+
public func removeRedisPubSubHandler(_ handler: RedisPubSubHandler) -> EventLoopFuture<Void> {
132+
self.removeHandler(handler)
133+
.flatMapError {
134+
// if it was already removed, then we can just succeed
135+
guard
136+
let error = $0 as? ChannelPipelineError,
137+
error == .alreadyRemoved
138+
else { return self.eventLoop.makeFailedFuture($0) }
139+
140+
return self.eventLoop.makeSucceededVoidFuture()
117141
}
118142
}
119143
}
@@ -135,6 +159,6 @@ extension ClientBootstrap {
135159
ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR),
136160
value: 1
137161
)
138-
.channelInitializer { $0.addBaseRedisHandlers() }
162+
.channelInitializer { $0.pipeline.addBaseRedisHandlers() }
139163
}
140164
}

Sources/RediStack/RedisConnection.swift

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -451,8 +451,8 @@ extension RedisConnection {
451451
guard case let .pubsub(handler) = self.state else {
452452
logger.debug("not in pubsub mode, moving to pubsub mode")
453453
// otherwise, add it to the pipeline, add the subscriptions, and update our state after it was successful
454-
return self.channel
455-
.addPubSubHandler()
454+
return self.channel.pipeline
455+
.addRedisPubSubHandler()
456456
.flatMap { handler in
457457
logger.trace("handler added, adding subscription")
458458
return handler
@@ -466,7 +466,8 @@ extension RedisConnection {
466466
)
467467
// if there was an error, no subscriptions were made
468468
// so remove the handler and propogate the error to the caller by rethrowing it
469-
return self.channel.pipeline.removeHandler(handler)
469+
return self.channel.pipeline
470+
.removeRedisPubSubHandler(handler)
470471
.flatMapThrowing { throw error }
471472
}
472473
// success, return the handler
@@ -542,7 +543,8 @@ extension RedisConnection {
542543
}
543544
logger.debug("subscription removed, with no current active subscriptions. leaving pubsub mode")
544545
// otherwise, remove the handler and update our state
545-
return self.channel.pipeline.removeHandler(handler)
546+
return self.channel.pipeline
547+
.removeRedisPubSubHandler(handler)
546548
.map {
547549
self.state = .open
548550
logger.debug("connection is now open to all commands")

Sources/RediStack/_Deprecations.swift

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,3 +139,15 @@ extension RedisKey.Lifetime {
139139
@available(*, deprecated, renamed: "Duration")
140140
public typealias Lifetime = Duration
141141
}
142+
143+
extension Channel {
144+
@available(*, deprecated, renamed: "pipeline.addBaseRedisHandlers()")
145+
public func addBaseRedisHandlers() -> EventLoopFuture<Void> {
146+
return self.pipeline.addBaseRedisHandlers()
147+
}
148+
149+
@available(*, deprecated, renamed: "pipeline.addRedisPubSubHandler()")
150+
public func addPubSubHandler() -> EventLoopFuture<RedisPubSubHandler> {
151+
return self.pipeline.addRedisPubSubHandler()
152+
}
153+
}

Tests/RediStackIntegrationTests/Commands/PubSubCommandsTests.swift

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//
33
// This source file is part of the RediStack open source project
44
//
5-
// Copyright (c) 2020 RediStack project authors
5+
// Copyright (c) 2020-2022 RediStack project authors
66
// Licensed under Apache License v2.0
77
//
88
// See LICENSE.txt for license information
@@ -12,6 +12,7 @@
1212
//
1313
//===----------------------------------------------------------------------===//
1414

15+
import NIO
1516
import RediStack
1617
import RediStackTestUtils
1718
import XCTest
@@ -340,3 +341,30 @@ final class RedisPubSubCommandsPoolTests: RediStackConnectionPoolIntegrationTest
340341
XCTAssertEqual(self.pool.leasedConnectionCount, 0)
341342
}
342343
}
344+
345+
// MARK: - #100 subscribe race condition
346+
347+
extension RedisPubSubCommandsTests {
348+
func test_pubsub_pipelineChanges_hasNoRaceCondition() throws {
349+
func runOperation(_ factory: (RedisChannelName) -> EventLoopFuture<Void>) -> EventLoopFuture<Void> {
350+
return .andAllSucceed(
351+
(0...100_000).reduce(into: []) {
352+
result, index in
353+
354+
result.append(factory("\(#function)-\(index)"))
355+
},
356+
on: self.connection.eventLoop
357+
)
358+
}
359+
360+
// subscribing (adding handler)
361+
try runOperation { self.connection.subscribe(to: $0) { _, _ in } }
362+
.wait()
363+
364+
// unsubscribing (removing handler)
365+
try runOperation { self.connection.unsubscribe(from: $0) }
366+
.wait()
367+
368+
try self.connection.close().wait()
369+
}
370+
}

0 commit comments

Comments
 (0)