Skip to content

Commit e7b597f

Browse files
committed
Add support for PubSub
Motivation: One of the great features of Redis is being able to subscribe and receive messages published to specific channels as a way of acting as a message queue for processing jobs. PubSub requires a specific understanding of the connection model that can only be implemented directly in this library. Modifications: - Add: `RedisPubSubHandler` to sit in front of `RedisCommandHandler` to manage subscription callbacks and Redis registration - Add: `publish` and the `pubsub` commands - Add: `addPubSubHandler` extension to `NIO.Channel` - Add: Type-safe String wrapper of `RedisChannelName` for PubSub methods - Add: `pubsubSubscriptionNotFound` error case - Add: `isSubscribed` property to `RedisConnection` - Add: `availableConnectionCount` and `leasedConnectionCount` properties to `RedisConnectionPool` - Add: Metrics for PubSub - Add: `makeNewPool` factory method to `RedisConnectionPoolIntegrationTestCase` - Change: `RedisClient` to require methods for PubSub management, as they are intrinsicly tied to the client's connection model - Change: Parsing of `PING` response for handling special case in PubSub mode - Rename: `ActiveConnectionGauge` to `RedisMetrics.IncrementalGauge` Result: Developers will now be able to use Redis in PubSub mode with both connections and pools. This resolves #6
1 parent 45f665b commit e7b597f

13 files changed

+1607
-62
lines changed

Sources/RediStack/ChannelHandlers/RedisPubSubHandler.swift

Lines changed: 467 additions & 0 deletions
Large diffs are not rendered by default.

Sources/RediStack/Commands/BasicCommands.swift

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//
33
// This source file is part of the RediStack open source project
44
//
5-
// Copyright (c) 2019 RediStack project authors
5+
// Copyright (c) 2019-2020 RediStack project authors
66
// Licensed under Apache License v2.0
77
//
88
// See LICENSE.txt for license information
@@ -36,7 +36,16 @@ extension RedisClient {
3636
? [.init(bulk: message!)] // safe because we did a nil pre-check
3737
: []
3838
return send(command: "PING", with: args)
39-
.tryConverting()
39+
.flatMapThrowing {
40+
// because PING is a special command allowed during pub/sub, we do manual conversion
41+
// this is because the response format is different in pub/sub ([pong,<message>])
42+
guard let response = $0.string ?? $0.array?[1].string else {
43+
throw RedisClientError.assertionFailure(message: "ping message not found")
44+
}
45+
// if no message was sent in the ping in pubsub, then the response will be an empty string
46+
// so we mimic a normal PONG response as if we weren't in pubsub
47+
return response.isEmpty ? "PONG" : response
48+
}
4049
}
4150

4251
/// Select the Redis logical database having the specified zero-based numeric index.
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the RediStack open source project
4+
//
5+
// Copyright (c) 2020 RediStack project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of RediStack project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import NIO
16+
17+
// MARK: Publish
18+
19+
extension RedisClient {
20+
/// Publishes the provided message to a specific Redis channel.
21+
///
22+
/// See [PUBLISH](https://redis.io/commands/publish)
23+
/// - Parameters:
24+
/// - message: The "message" value to publish on the channel.
25+
/// - channel: The name of the channel to publish the message to.
26+
/// - Returns: The number of subscribed clients that received the message.
27+
@inlinable
28+
@discardableResult
29+
public func publish<Message: RESPValueConvertible>(
30+
_ message: Message,
31+
to channel: RedisChannelName
32+
) -> EventLoopFuture<Int> {
33+
let args: [RESPValue] = [
34+
.init(from: channel),
35+
message.convertedToRESPValue()
36+
]
37+
return self.send(command: "PUBLISH", with: args)
38+
.tryConverting()
39+
}
40+
}
41+
42+
// MARK: PubSub Sub-commands
43+
44+
extension RedisClient {
45+
/// Resolves a list of all the channels that have at least 1 (non-pattern) subscriber.
46+
///
47+
/// See [PUBSUB CHANNELS](https://redis.io/commands/pubsub#pubsub-channels-pattern)
48+
/// - Note: If no `match` pattern is provided, all active channels will be returned.
49+
/// - Parameter match: An optional pattern of channel names to filter for.
50+
/// - Returns: A list of all active channel names.
51+
public func activeChannels(matching match: String? = nil) -> EventLoopFuture<[RedisChannelName]> {
52+
var args: [RESPValue] = [.init(bulk: "CHANNELS")]
53+
54+
if let m = match { args.append(.init(bulk: m)) }
55+
56+
return self.send(command: "PUBSUB", with: args)
57+
.tryConverting()
58+
}
59+
60+
/// Resolves the total count of active subscriptions to channels that were made using patterns.
61+
///
62+
/// See [PUBSUB NUMPAT](https://redis.io/commands/pubsub#codepubsub-numpatcode)
63+
/// - Returns: The total count of subscriptions made through patterns.
64+
public func patternSubscriberCount() -> EventLoopFuture<Int> {
65+
let args: [RESPValue] = [.init(bulk: "NUMPAT")]
66+
return self.send(command: "PUBSUB", with: args)
67+
.tryConverting()
68+
}
69+
70+
/// Resolves a count of (non-pattern) subscribers for each given channel.
71+
///
72+
/// See [PUBSUB NUMSUB](https://redis.io/commands/pubsub#codepubsub-numsub-channel-1--channel-ncode)
73+
/// - Parameter channels: A list of channel names to collect the subscriber counts for.
74+
/// - Returns: A mapping of channel names and their (non-pattern) subscriber count.
75+
public func subscriberCount(forChannels channels: [RedisChannelName]) -> EventLoopFuture<[RedisChannelName: Int]> {
76+
guard channels.count > 0 else { return self.eventLoop.makeSucceededFuture([:]) }
77+
78+
var args: [RESPValue] = [.init(bulk: "NUMSUB")]
79+
args.append(convertingContentsOf: channels)
80+
81+
return self.send(command: "PUBSUB", with: args)
82+
.tryConverting(to: [RESPValue].self)
83+
.flatMapThrowing { response in
84+
assert(response.count == channels.count * 2, "Unexpected response size!")
85+
86+
// Redis guarantees that the response format is [channel1Name, channel1Count, channel2Name, ...]
87+
// with the order of channels matching the order sent in the request
88+
return try channels
89+
.enumerated()
90+
.reduce(into: [:]) { (result, next) in
91+
assert(next.element.rawValue == response[next.offset].string, "Unexpected value in current index!")
92+
93+
guard let count = response[next.offset + 1].int else {
94+
throw RedisClientError.assertionFailure(
95+
message: "Unexpected value at position \(next.offset + 1) in \(response)"
96+
)
97+
}
98+
result[next.element] = count
99+
}
100+
}
101+
}
102+
}

Sources/RediStack/Connection Pool/ConnectionPool.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ internal final class ConnectionPool {
4040
private let connectionFactory: (EventLoop) -> EventLoopFuture<RedisConnection>
4141

4242
/// A stack of connections that are active and suitable for use by clients.
43-
private var availableConnections: ArraySlice<RedisConnection>
43+
private(set) var availableConnections: ArraySlice<RedisConnection>
4444

4545
/// A buffer of users waiting for connections to be handed over.
4646
private var connectionWaiters: CircularBuffer<Waiter>
@@ -66,7 +66,7 @@ internal final class ConnectionPool {
6666
private var pendingConnectionCount: Int
6767

6868
/// The number of connections that have been handed out to users and are in active use.
69-
private var leasedConnectionCount: Int
69+
private(set) var leasedConnectionCount: Int
7070

7171
/// Whether this connection pool is "leaky".
7272
///

Sources/RediStack/Extensions/SwiftNIO.swift

Lines changed: 86 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//
33
// This source file is part of the RediStack open source project
44
//
5-
// Copyright (c) 2019 RediStack project authors
5+
// Copyright (c) 2019-2020 RediStack project authors
66
// Licensed under Apache License v2.0
77
//
88
// See LICENSE.txt for license information
@@ -14,6 +14,8 @@
1414

1515
import NIO
1616

17+
// MARK: Convenience extensions
18+
1719
extension TimeAmount {
1820
/// The seconds representation of the TimeAmount.
1921
@usableFromInline
@@ -22,13 +24,37 @@ extension TimeAmount {
2224
}
2325
}
2426

25-
// MARK: Setting up a Redis connection
27+
// MARK: Pipeline manipulation
2628

2729
extension Channel {
28-
/// Adds the baseline `ChannelHandlers` needed to support sending and receiving messages in Redis Serialization Protocol (RESP) format to the pipeline.
30+
/// Adds the baseline channel handlers needed to support sending and receiving messages in Redis Serialization Protocol (RESP) format to the pipeline.
2931
///
3032
/// For implementation details, see `RedisMessageEncoder`, `RedisByteDecoder`, and `RedisCommandHandler`.
31-
/// - Returns: An `EventLoopFuture` that resolves after all handlers have been added to the pipeline.
33+
///
34+
/// # Pipeline chart
35+
/// RedisClient.send
36+
/// |
37+
/// v
38+
/// +-------------------------------------------------------------------+
39+
/// | ChannelPipeline | |
40+
/// | TAIL | |
41+
/// | +---------------------------------------------------------+ |
42+
/// | | RedisCommandHandler | |
43+
/// | +---------------------------------------------------------+ |
44+
/// | ^ | |
45+
/// | | v |
46+
/// | +---------------------+ +----------------------+ |
47+
/// | | RedisByteDecoder | | RedisMessageEncoder | |
48+
/// | +---------------------+ +----------------------+ |
49+
/// | | | |
50+
/// | | HEAD | |
51+
/// +-------------------------------------------------------------------+
52+
/// ^ |
53+
/// | v
54+
/// +-----------------+ +------------------+
55+
/// | [ Socket.read ] | | [ Socket.write ] |
56+
/// +-----------------+ +------------------+
57+
/// - Returns: A `NIO.EventLoopFuture` that resolves after all handlers have been added to the pipeline.
3258
public func addBaseRedisHandlers() -> EventLoopFuture<Void> {
3359
let handlers: [(ChannelHandler, name: String)] = [
3460
(MessageToByteHandler(RedisMessageEncoder()), "RediStack.OutgoingHandler"),
@@ -40,14 +66,69 @@ extension Channel {
4066
on: self.eventLoop
4167
)
4268
}
69+
70+
/// Adds the channel handler that is responsible for handling everything related to Redis PubSub.
71+
/// - Important: The connection that manages this channel is responsible for removing the `RedisPubSubHandler`.
72+
///
73+
/// # Discussion
74+
/// PubSub responsibilities include managing subscription callbacks as well as parsing and dispatching messages received from Redis.
75+
///
76+
/// For implementation details, see `RedisPubSubHandler`.
77+
///
78+
/// The handler will be inserted in the `NIO.ChannelPipeline` just before the `RedisCommandHandler` instance.
79+
///
80+
/// # Pipeline chart
81+
/// RedisClient.send
82+
/// |
83+
/// v
84+
/// +-------------------------------------------------------------------+
85+
/// | ChannelPipeline | |
86+
/// | TAIL | |
87+
/// | +---------------------------------------------------------+ |
88+
/// | | RedisCommandHandler | |
89+
/// | +---------------------------------------------------------+ |
90+
/// | ^ | |
91+
/// | | v |
92+
/// | +---------------------------------------------------------+ |
93+
/// | | (might forward) RedisPubSubHandler (forwards) |----|<-----------+
94+
/// | +---------------------------------------------------------+ | |
95+
/// | ^ | | +
96+
/// | | v | RedisClient.subscribe/unsubscribe
97+
/// | +---------------------+ +----------------------+ |
98+
/// | | RedisByteDecoder | | RedisMessageEncoder | |
99+
/// | +---------------------+ +----------------------+ |
100+
/// | | | |
101+
/// | | HEAD | |
102+
/// +-------------------------------------------------------------------+
103+
/// ^ |
104+
/// | v
105+
/// +-----------------+ +------------------+
106+
/// | [ Socket.read ] | | [ Socket.write ] |
107+
/// +-----------------+ +------------------+
108+
/// - Returns: A `NIO.EventLoopFuture` that resolves the instance of the PubSubHandler that was added to the pipeline.
109+
public func addPubSubHandler() -> EventLoopFuture<RedisPubSubHandler> {
110+
return self.pipeline
111+
.handler(type: RedisCommandHandler.self)
112+
.flatMap {
113+
let pubsubHandler = RedisPubSubHandler()
114+
return self.pipeline
115+
.addHandler(pubsubHandler, name: "RediStack.PubSubHandler", position: .before($0))
116+
.map { pubsubHandler }
117+
}
118+
}
43119
}
120+
121+
// MARK: Setting up a Redis connection
122+
44123
extension ClientBootstrap {
45124
/// Makes a new `ClientBootstrap` instance with a baseline Redis `Channel` pipeline
46125
/// for sending and receiving messages in Redis Serialization Protocol (RESP) format.
47126
///
48127
/// For implementation details, see `RedisMessageEncoder`, `RedisByteDecoder`, and `RedisCommandHandler`.
128+
///
129+
/// See also `Channel.addBaseRedisHandlers()`.
49130
/// - Parameter group: The `EventLoopGroup` to create the `ClientBootstrap` with.
50-
/// - Returns: A `ClientBootstrap` with the base configuration of a `Channel` pipeline for RESP messages.
131+
/// - Returns: A TCP connection with the base configuration of a `Channel` pipeline for RESP messages.
51132
public static func makeRedisTCPClient(group: EventLoopGroup) -> ClientBootstrap {
52133
return ClientBootstrap(group: group)
53134
.channelOption(
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the RediStack open source project
4+
//
5+
// Copyright (c) 2020 RediStack project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of RediStack project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
/// A representation of a Redis Pub/Sub channel.
16+
///
17+
/// `RedisChannelName` is a thin wrapper around `String`, to provide stronger type-safety at compile time.
18+
///
19+
/// It conforms to `ExpressibleByStringLiteral` and `ExpressibleByStringInterpolation`, so creating an instance is simple:
20+
/// ```swift
21+
/// let channel: RedisChannelName = "channel1" // or "\(channelNameVariable)"
22+
/// ```
23+
public struct RedisChannelName:
24+
RESPValueConvertible,
25+
RawRepresentable,
26+
ExpressibleByStringLiteral,
27+
ExpressibleByStringInterpolation,
28+
CustomStringConvertible, CustomDebugStringConvertible,
29+
Comparable, Hashable, Codable
30+
{
31+
public let rawValue: String
32+
33+
/// Initializes a type-safe representation of a Redis Pub/Sub channel name.
34+
/// - Parameter name: The name of the Redis Pub/Sub channel.
35+
public init(_ name: String) {
36+
self.rawValue = name
37+
}
38+
39+
public var description: String { self.rawValue }
40+
public var debugDescription: String { "\(Self.self): \(self.rawValue)" }
41+
42+
public init?(fromRESP value: RESPValue) {
43+
guard let string = value.string else { return nil }
44+
self.rawValue = string
45+
}
46+
public init?(rawValue: String) { self.rawValue = rawValue }
47+
public init(stringLiteral value: String) { self.rawValue = value }
48+
public init(from decoder: Decoder) throws {
49+
let container = try decoder.singleValueContainer()
50+
self.rawValue = try container.decode(String.self)
51+
}
52+
53+
public static func <(lhs: RedisChannelName, rhs: RedisChannelName) -> Bool {
54+
return lhs.rawValue < rhs.rawValue
55+
}
56+
57+
public func convertedToRESPValue() -> RESPValue {
58+
return .init(bulk: self.rawValue)
59+
}
60+
public func encode(to encoder: Encoder) throws {
61+
var container = encoder.singleValueContainer()
62+
try container.encode(self.rawValue)
63+
}
64+
}

0 commit comments

Comments
 (0)