Skip to content

Commit 6b01c58

Browse files
Keep alive connectionstream (#30)
Signed-off-by: ciychodianda <[email protected]> Signed-off-by: David Nadoba <[email protected]> Co-authored-by: David Nadoba <[email protected]>
1 parent 412a78a commit 6b01c58

File tree

10 files changed

+254
-45
lines changed

10 files changed

+254
-45
lines changed

Package.resolved

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Sources/RSocketCore/Channel Handler/ClientSetupConfig.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ extension ClientSetupConfig {
100100
public static var defaultMobileToServer: ClientSetupConfig {
101101
ClientSetupConfig(
102102
timeBetweenKeepaliveFrames: 30_000,
103-
maxLifetime: 30_000,
103+
maxLifetime: 60_000,
104104
metadataEncodingMimeType: "application/octet-stream",
105105
dataEncodingMimeType: "application/octet-stream"
106106
)
@@ -109,7 +109,7 @@ extension ClientSetupConfig {
109109
public static var defaultServerToServer: ClientSetupConfig {
110110
ClientSetupConfig(
111111
timeBetweenKeepaliveFrames: 500,
112-
maxLifetime: 500,
112+
maxLifetime: 2_000,
113113
metadataEncodingMimeType: "application/octet-stream",
114114
dataEncodingMimeType: "application/octet-stream"
115115
)

Sources/RSocketCore/Channel Handler/ConnectionEstablishment.swift

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,14 @@
1414
* limitations under the License.
1515
*/
1616

17-
import NIO
1817
import Foundation
18+
import NIO
1919

2020
/// Information about a client which is about to connect or is connected.
2121
public struct SetupInfo {
2222
/// If the connection should honor `LEASE`
2323
public let honorsLease: Bool
24-
24+
2525
/// version of the client protocol implementation
2626
public let version: Version
2727
/**
@@ -72,11 +72,11 @@ public struct SetupInfo {
7272

7373
internal struct SetupValidator {
7474
internal var maximumClientVersion = Version.v1_0
75-
75+
7676
internal func validate(frame: Frame) throws -> SetupInfo {
7777
try validateSetup(try getSetupBody(frame))
7878
}
79-
79+
8080
private func getSetupBody(_ frame: Frame) throws -> SetupFrameBody {
8181
guard frame.header.streamId == .connection else {
8282
throw Error.invalidSetup(message: "connection needs to be setup on stream 0")
@@ -89,7 +89,7 @@ internal struct SetupValidator {
8989
}
9090
return setup
9191
}
92-
92+
9393
private func validateSetup(_ setup: SetupFrameBody) throws -> SetupInfo {
9494
guard setup.version <= maximumClientVersion else {
9595
throw Error.unsupportedSetup(message: "only version \(maximumClientVersion) and lower are supported")
@@ -184,7 +184,6 @@ internal final class ConnectionEstablishmentHandler: ChannelInboundHandler, Remo
184184

185185
private let setupValidator = SetupValidator()
186186

187-
188187
/// Configure `ConnectionEstablishmentHandler`. If `shouldAcceptClient` is nil, valid clients are always accepted.
189188
/// - Parameters:
190189
/// - initializeConnection: called after successful handshake and after `shouldAcceptClient`did accept the client.

Sources/RSocketCore/Channel Handler/ConnectionStreamHandler.swift

Lines changed: 0 additions & 23 deletions
This file was deleted.
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Copyright 2015-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import Foundation
18+
import NIO
19+
20+
final class KeepaliveHandler {
21+
private var keepAliveHandle: RepeatedTask?
22+
23+
/// receive time in **seconds** of the last keepalive frame
24+
private var lastReceivedTime: TimeInterval
25+
/// time is in **milliseconds**
26+
private let timeBetweenKeepaliveFrames: Int32
27+
/// time is in **milliseconds**
28+
private let maxLifetime: Int32
29+
private let connectionSide: ConnectionRole
30+
/// returns the current time in **seconds**
31+
private let now: () -> TimeInterval
32+
33+
init(
34+
timeBetweenKeepaliveFrames: Int32,
35+
maxLifetime: Int32,
36+
connectionSide: ConnectionRole,
37+
now: @escaping () -> TimeInterval = { ProcessInfo.processInfo.systemUptime }
38+
) {
39+
self.timeBetweenKeepaliveFrames = timeBetweenKeepaliveFrames
40+
self.maxLifetime = maxLifetime
41+
self.connectionSide = connectionSide
42+
self.now = now
43+
self.lastReceivedTime = now()
44+
}
45+
}
46+
47+
extension KeepaliveHandler: ChannelInboundHandler {
48+
typealias InboundIn = Frame
49+
typealias OutboundOut = Frame
50+
51+
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
52+
let frame = unwrapInboundIn(data)
53+
switch frame.body {
54+
case let .keepalive(body):
55+
lastReceivedTime = now()
56+
if body.respondWithKeepalive {
57+
let keepAliveFrame = KeepAliveFrameBody(respondWithKeepalive: false, lastReceivedPosition: 0, data: Data()).asFrame()
58+
context.writeAndFlush(self.wrapOutboundOut(keepAliveFrame), promise: nil)
59+
}
60+
default:
61+
break
62+
}
63+
}
64+
65+
func handlerAdded(context: ChannelHandlerContext) {
66+
if context.channel.isActive {
67+
/// this handler may get added to the pipeline after the channel is already active and `channelActive` is then not called
68+
onActive(context: context)
69+
}
70+
}
71+
72+
func channelActive(context: ChannelHandlerContext) {
73+
onActive(context: context)
74+
}
75+
76+
func onActive(context: ChannelHandlerContext) {
77+
guard timeBetweenKeepaliveFrames > 0 else { return }
78+
lastReceivedTime = now()
79+
guard connectionSide == .client else { return }
80+
81+
keepAliveHandle = context.eventLoop.scheduleRepeatedAsyncTask(
82+
initialDelay: .milliseconds(Int64(timeBetweenKeepaliveFrames)),
83+
delay: .milliseconds(Int64(timeBetweenKeepaliveFrames))
84+
) { [self] task in
85+
let elapsedTimeSinceLastKeepaliveInSeconds = now() - lastReceivedTime
86+
let elapsedTimeSinceLastKeepaliveInMilliseconds = Int32((elapsedTimeSinceLastKeepaliveInSeconds * 1000).rounded(.up))
87+
if elapsedTimeSinceLastKeepaliveInMilliseconds >= maxLifetime {
88+
let errorFrame = Error.connectionClose(message: "KeepAlive timeout exceeded").asFrame(withStreamId: .connection)
89+
context.writeAndFlush(self.wrapOutboundOut(errorFrame), promise: nil)
90+
task.cancel()
91+
return context.eventLoop.makeFailedFuture(Error.applicationError(message: "KeepAliveHandler Shutdown"))
92+
} else {
93+
let keepAliveFrame = KeepAliveFrameBody(
94+
respondWithKeepalive: true,
95+
/// we do not support resumability yet, thus do not keep track of `lastReceivedPosition` and just always send 0
96+
lastReceivedPosition: 0,
97+
data: Data()
98+
).asFrame()
99+
return context.writeAndFlush(self.wrapOutboundOut(keepAliveFrame))
100+
}
101+
}
102+
}
103+
104+
func channelInactive(context: ChannelHandlerContext) {
105+
keepAliveHandle?.cancel()
106+
}
107+
}

Sources/RSocketCore/Channel Handler/SetupWriter.swift

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,12 @@ internal final class SetupWriter: ChannelInboundHandler, RemovableChannelHandler
2222
typealias OutboundOut = Frame
2323
private let setup: ClientSetupConfig
2424
private let connectedPromise: EventLoopPromise<Void>?
25-
25+
2626
internal init(config: ClientSetupConfig, connectedPromise: EventLoopPromise<Void>? = nil) {
2727
self.setup = config
2828
self.connectedPromise = connectedPromise
2929
}
30-
30+
3131
func channelActive(context: ChannelHandlerContext) {
3232
context.writeAndFlush(self.wrapOutboundOut(SetupFrameBody(
3333
honorsLease: false,
@@ -39,15 +39,15 @@ internal final class SetupWriter: ChannelInboundHandler, RemovableChannelHandler
3939
dataEncodingMimeType: setup.dataEncodingMimeType,
4040
payload: setup.payload
4141
).asFrame()), promise: nil)
42+
context.fireChannelActive()
4243
context.channel.pipeline.removeHandler(context: context).eventLoop.assertInEventLoop()
4344
connectedPromise?.succeed(())
4445
}
45-
46+
4647
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
4748
assertionFailure("should never receive data because we remove this handler right after the channel becomes active")
4849
/// We need to conform to `ChannelInboundHandler` to get called when the channel becomes active and we remove ourself immediately after the channel becomes active
4950
/// If, for whatever reason, this method gets called, we just forward the data in release mode
5051
context.fireChannelRead(data)
5152
}
5253
}
53-

Sources/RSocketCore/ChannelPipeline.swift

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17+
import Foundation
1718
import NIO
1819

1920
extension ChannelPipeline {
@@ -32,6 +33,7 @@ extension ChannelPipeline {
3233
responderLateFrameHandler: nil
3334
)
3435
}
36+
3537
internal func addRSocketClientHandlers(
3638
config: ClientSetupConfig,
3739
responder: RSocket? = nil,
@@ -57,7 +59,7 @@ extension ChannelPipeline {
5759
requester: requester,
5860
responder: Responder(responderSocket: responder, eventLoop: eventLoop, sendFrame: sendFrame)
5961
),
60-
ConnectionStreamHandler(),
62+
KeepaliveHandler(timeBetweenKeepaliveFrames: config.timeBetweenKeepaliveFrames, maxLifetime: config.maxLifetime, connectionSide: ConnectionRole.client),
6163
])
6264
}
6365
}
@@ -99,7 +101,7 @@ extension ChannelPipeline {
99101
requester: Requester(streamIdGenerator: .server, eventLoop: eventLoop, sendFrame: sendFrame),
100102
responder: Responder(responderSocket: responder, eventLoop: eventLoop, sendFrame: sendFrame)
101103
),
102-
ConnectionStreamHandler(),
104+
KeepaliveHandler(timeBetweenKeepaliveFrames: info.timeBetweenKeepaliveFrames, maxLifetime: info.maxLifetime, connectionSide: ConnectionRole.server),
103105
])
104106
}, shouldAcceptClient: shouldAcceptClient)
105107
])

Tests/RSocketCorePerformanceTests/EndToEndTests.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ extension ServerBootstrap: NIOServerTCPBootstrapProtocol{}
3333

3434
class EndToEndTests: XCTestCase {
3535
static let defaultClientSetup = ClientSetupConfig(
36-
timeBetweenKeepaliveFrames: 500,
37-
maxLifetime: 5000,
36+
timeBetweenKeepaliveFrames: 100,
37+
maxLifetime: 1000,
3838
metadataEncodingMimeType: "utf8",
3939
dataEncodingMimeType: "utf8"
4040
)

Tests/RSocketCoreTests/ConnectionEstablishmentTests.swift

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,7 @@ final class ConnectionEstablishmentTests: XCTestCase {
6060

6161
self.wait(for: [initializeConnection, shouldAcceptSetup], timeout: 0.1)
6262
}
63-
64-
63+
6564
func testDeliveryOfExtraMessagesDuringSetup() throws {
6665
let loop = EmbeddedEventLoop()
6766
let connectionInitialization = loop.makePromise(of: Void.self)

0 commit comments

Comments
 (0)