2
2
//
3
3
// This source file is part of the RediStack open source project
4
4
//
5
- // Copyright (c) 2019 RediStack project authors
5
+ // Copyright (c) 2019-2020 RediStack project authors
6
6
// Licensed under Apache License v2.0
7
7
//
8
8
// See LICENSE.txt for license information
@@ -60,13 +60,13 @@ extension RedisConnection {
60
60
to socket: SocketAddress ,
61
61
on eventLoop: EventLoop ,
62
62
password: String ? = nil ,
63
- logger: Logger = . init ( label : " RediStack.RedisConnection " ) ,
63
+ logger: Logger = . redisBaseConnectionLogger ,
64
64
tcpClient: ClientBootstrap ? = nil
65
65
) -> EventLoopFuture < RedisConnection > {
66
66
let client = tcpClient ?? ClientBootstrap . makeRedisTCPClient ( group: eventLoop)
67
67
68
68
return client. connect ( to: socket)
69
- . map { return RedisConnection ( configuredRESPChannel: $0, logger : logger) }
69
+ . map { return RedisConnection ( configuredRESPChannel: $0, context : logger) }
70
70
. flatMap { connection in
71
71
guard let pw = password else {
72
72
return connection. eventLoop. makeSucceededFuture ( connection)
@@ -95,15 +95,10 @@ extension RedisConnection {
95
95
///
96
96
/// Note: `wait()` is used in the example for simplicity. Never call `wait()` on an event loop.
97
97
///
98
- /// See `NIO.SocketAddress`, `NIO.EventLoop`, and `RedisClient`
99
- public final class RedisConnection : RedisClient {
98
+ /// See `NIO.SocketAddress`, `NIO.EventLoop`, and `RedisClient`.
99
+ public final class RedisConnection : RedisClient , RedisClientWithUserContext {
100
100
/// A unique identifer to represent this connection.
101
101
public let id = UUID ( )
102
- public private( set) var logger : Logger {
103
- didSet {
104
- self . logger [ metadataKey: String ( describing: RedisConnection . self) ] = . string( self . id. description)
105
- }
106
- }
107
102
public var eventLoop : EventLoop { return self . channel. eventLoop }
108
103
/// Is the connection to Redis still open?
109
104
public var isConnected : Bool {
@@ -126,6 +121,8 @@ public final class RedisConnection: RedisClient {
126
121
}
127
122
128
123
internal let channel : Channel
124
+ private let systemContext : Context
125
+ private var logger : Logger { self . systemContext }
129
126
130
127
private let autoflush : NIOAtomic < Bool > = . makeAtomic( value: true )
131
128
private let _stateLock = Lock ( )
@@ -138,13 +135,18 @@ public final class RedisConnection: RedisClient {
138
135
deinit {
139
136
if isConnected {
140
137
assertionFailure ( " close() was not called before deinit! " )
141
- logger. warning ( " RedisConnection did not properly shutdown before deinit! " )
138
+ self . logger. warning ( " connection was not properly shutdown before deinit" )
142
139
}
143
140
}
144
141
145
- internal init ( configuredRESPChannel: Channel , logger : Logger ) {
142
+ internal init ( configuredRESPChannel: Channel , context : Context ) {
146
143
self . channel = configuredRESPChannel
147
- self . logger = logger
144
+ // there is a mix of verbiage here as the API is forward thinking towards "baggage context"
145
+ // while right now it's just an alias of a 'Logging.logger'
146
+ // in the future this will probably be a property _on_ the context
147
+ var logger = context
148
+ logger [ metadataKey: RedisLogging . MetadataKeys. connectionID] = " \( self . id. description) "
149
+ self . systemContext = logger
148
150
149
151
RedisMetrics . activeConnectionCount. increment ( )
150
152
RedisMetrics . totalConnectionCount. increment ( )
@@ -157,11 +159,11 @@ public final class RedisConnection: RedisClient {
157
159
guard self . state == . open else { return }
158
160
159
161
self . state = . closed
160
- self . logger. error ( " Channel was closed unexpectedly. " )
162
+ self . logger. error ( " connection was closed unexpectedly" )
161
163
RedisMetrics . activeConnectionCount. decrement ( )
162
164
}
163
165
164
- self . logger. trace ( " Connection created. " )
166
+ self . logger. trace ( " connection created" )
165
167
}
166
168
167
169
internal enum ConnectionState {
@@ -177,18 +179,35 @@ extension RedisConnection {
177
179
/// Sends the command with the provided arguments to Redis.
178
180
///
179
181
/// See `RedisClient.send(command:with:)`.
180
- ///
181
182
/// - Note: The timing of when commands are actually sent to Redis can be controlled with the `RedisConnection.sendCommandsImmediately` property.
182
183
/// - Returns: A `NIO.EventLoopFuture` that resolves with the command's result stored in a `RESPValue`.
183
184
/// If a `RedisError` is returned, the future will be failed instead.
184
185
public func send( command: String , with arguments: [ RESPValue ] ) -> EventLoopFuture < RESPValue > {
185
- self . logger. trace ( " Received command " )
186
+ self . eventLoop. flatSubmit {
187
+ return self . send ( command: command, with: arguments, context: nil )
188
+ }
189
+ }
190
+
191
+ internal func send(
192
+ command: String ,
193
+ with arguments: [ RESPValue ] ,
194
+ context: Context ?
195
+ ) -> EventLoopFuture < RESPValue > {
196
+ self . eventLoop. preconditionInEventLoop ( )
197
+
198
+ let logger = self . prepareLoggerForUse ( context)
186
199
187
200
guard self . isConnected else {
188
201
let error = RedisClientError . connectionClosed
189
202
logger. warning ( " \( error. localizedDescription) " )
190
203
return self . channel. eventLoop. makeFailedFuture ( error)
191
204
}
205
+ logger. trace ( " received command request " )
206
+
207
+ logger. debug ( " sending command " , metadata: [
208
+ RedisLogging . MetadataKeys. commandKeyword: " \( command) " ,
209
+ RedisLogging . MetadataKeys. commandArguments: " \( arguments) "
210
+ ] )
192
211
193
212
var message : [ RESPValue ] = [ . init( bulk: command) ]
194
213
message. append ( contentsOf: arguments)
@@ -204,17 +223,21 @@ extension RedisConnection {
204
223
let duration = DispatchTime . now ( ) . uptimeNanoseconds - startTime
205
224
RedisMetrics . commandRoundTripTime. recordNanoseconds ( duration)
206
225
207
- // log the error here instead
208
- guard case let . failure( error) = result else {
209
- self . logger. trace ( " Command completed. " )
210
- return
226
+ // log data based on the result
227
+ switch result {
228
+ case let . failure( error) :
229
+ logger. error ( " command failed " , metadata: [
230
+ RedisLogging . MetadataKeys. error: " \( error. localizedDescription) "
231
+ ] )
232
+
233
+ case let . success( value) :
234
+ logger. debug ( " command succeeded " , metadata: [
235
+ RedisLogging . MetadataKeys. commandResult: " \( value) "
236
+ ] )
211
237
}
212
- self . logger. error ( " \( error. localizedDescription) " )
213
238
}
214
239
215
- self . logger. debug ( " Sending command \" \( command) \" \( arguments. count > 0 ? " with \( arguments) " : " " ) " )
216
-
217
- defer { self . logger. trace ( " Command sent through channel. " ) }
240
+ defer { logger. trace ( " command sent " ) }
218
241
219
242
if self . sendCommandsImmediately {
220
243
return channel. writeAndFlush ( command) . flatMap { promise. futureResult }
@@ -232,26 +255,34 @@ extension RedisConnection {
232
255
/// See [https://redis.io/commands/quit](https://redis.io/commands/quit)
233
256
/// - Important: Regardless if the returned `NIO.EventLoopFuture` fails or succeeds - after calling this method the connection should no longer be
234
257
/// used for sending commands to Redis.
258
+ /// - Parameter logger: An optional logger instance to use while trying to close the connection.
259
+ /// If one is not provided, the pool will use its default logger.
235
260
/// - Returns: A `NIO.EventLoopFuture` that resolves when the connection has been closed.
236
261
@discardableResult
237
- public func close( ) -> EventLoopFuture < Void > {
238
- self . logger. trace ( " Received request to close the connection. " )
262
+ public func close( logger : Logger ? = nil ) -> EventLoopFuture < Void > {
263
+ let logger = self . prepareLoggerForUse ( logger )
239
264
240
265
guard self . isConnected else {
241
266
// return the channel's close future, which is resolved as the last step in channel shutdown
267
+ logger. info ( " received duplicate request to close connection " )
242
268
return self . channel. closeFuture
243
269
}
270
+ logger. trace ( " received request to close the connection " )
244
271
245
272
// we're now in a shutdown state, starting with the command queue.
246
273
self . state = . shuttingDown
247
274
248
- let notification = self . sendQuitCommand ( ) // send "QUIT" so that all the responses are written out
275
+ let notification = self . sendQuitCommand ( logger : logger ) // send "QUIT" so that all the responses are written out
249
276
. flatMap { self . closeChannel ( ) } // close the channel from our end
250
277
251
- notification. whenFailure { self . logger. error ( " Encountered error during close(): \( $0) " ) }
278
+ notification. whenFailure {
279
+ logger. error ( " error while closing connection " , metadata: [
280
+ RedisLogging . MetadataKeys. error: " \( $0) "
281
+ ] )
282
+ }
252
283
notification. whenSuccess {
253
284
self . state = . closed
254
- self . logger. debug ( " Connection closed. " )
285
+ logger. trace ( " connection is now closed" )
255
286
RedisMetrics . activeConnectionCount. decrement ( )
256
287
}
257
288
@@ -260,19 +291,19 @@ extension RedisConnection {
260
291
261
292
/// Bypasses everything for a normal command and explicitly just sends a "QUIT" command to Redis.
262
293
/// - Note: If the command fails, the `NIO.EventLoopFuture` will still succeed - as it's not critical for the command to succeed.
263
- private func sendQuitCommand( ) -> EventLoopFuture < Void > {
294
+ private func sendQuitCommand( logger : Logger ) -> EventLoopFuture < Void > {
264
295
let promise = channel. eventLoop. makePromise ( of: RESPValue . self)
265
296
let command = RedisCommand (
266
297
message: . array( [ RESPValue ( bulk: " QUIT " ) ] ) ,
267
298
responsePromise: promise
268
299
)
269
300
270
- logger. trace ( " Sending QUIT command. " )
301
+ logger. trace ( " sending QUIT command" )
271
302
272
303
return channel. writeAndFlush ( command) // write the command
273
304
. flatMap { promise. futureResult } // chain the callback to the response's
274
- . map { _ in ( ) } // ignore the result's value
275
- . recover { _ in ( ) } // if there's an error, just return to void
305
+ . map { _ in logger . trace ( " sent QUIT command " ) } // ignore the result's value
306
+ . recover { _ in logger . debug ( " recovered from error sending QUIT " ) } // if there's an error, just return to void
276
307
}
277
308
278
309
/// Attempts to close the `NIO.Channel`.
@@ -303,11 +334,13 @@ extension RedisConnection {
303
334
// MARK: Logging
304
335
305
336
extension RedisConnection {
306
- /// Updates the client's logger, attaching this connection's ID to the metadata.
307
- ///
308
- /// See `RedisClient.setLogging(to:)` and `RedisConnection.id`.
309
- /// - Parameter logger: The `Logging.Logger` that is desired to receive all client logs.
310
- public func setLogging( to logger: Logger ) {
311
- self . logger = logger
337
+ public func logging( to logger: Logger ) -> RedisClient {
338
+ return UserContextRedisClient ( client: self , context: self . prepareLoggerForUse ( logger) )
339
+ }
340
+
341
+ private func prepareLoggerForUse( _ logger: Logger ? ) -> Logger {
342
+ guard var logger = logger else { return self . logger }
343
+ logger [ metadataKey: RedisLogging . MetadataKeys. connectionID] = " \( self . id) "
344
+ return logger
312
345
}
313
346
}
0 commit comments