Skip to content

Commit 7082aa2

Browse files
ayush-yadav001Niclas Kristek
and
Niclas Kristek
authored
Adds observer api for closing connection event (#68)
Co-authored-by: Niclas Kristek <[email protected]>
1 parent b74732d commit 7082aa2

File tree

2 files changed

+56
-0
lines changed

2 files changed

+56
-0
lines changed

Sources/RSocketReactiveSwift/Client/ReactiveSwiftClient.swift

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,20 @@ public struct ReactiveSwiftClient: Client {
4646
internal var isDisposed: Bool {
4747
return !coreClient.channel.isActive
4848
}
49+
/// This methods helps to get call back whenever connection is closed
50+
/// - Returns: SignalProducer<Void, Swift.Error> to represent task result
51+
public var shutdownProducer: SignalProducer<Void, Swift.Error> {
52+
SignalProducer { observer, _ in
53+
coreClient.channel.closeFuture.whenComplete { result in
54+
switch result {
55+
case .success:
56+
observer.sendCompleted()
57+
case .failure(let error):
58+
observer.send(error: error)
59+
}
60+
}
61+
}
62+
}
4963
}
5064

5165
extension ClientBootstrap where Client == CoreClient, Responder == RSocketCore.RSocket {

Tests/RSocketReactiveSwiftTests/RSocketReactiveSwiftTests.swift

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,4 +404,46 @@ final class RSocketReactiveSwiftTests: XCTestCase {
404404
// checking if connection is inactive
405405
XCTAssertTrue(client.isDisposed)
406406
}
407+
func testConnectionDisposeListener() {
408+
// Creating expectation
409+
let didReceiveConnectionclosedEvent = expectation(description: "did receive Connection closed event ")
410+
let serverResponder: RSocketReactiveSwift.ResponderRSocket? = TestRSocket()
411+
let clientResponder: RSocketReactiveSwift.ResponderRSocket? = TestRSocket()
412+
let (serverMultiplexer, clientMultiplexer) = TestDemultiplexer.pipe(
413+
serverResponder: serverResponder.map { ResponderAdapter(responder: $0, encoding: .default) },
414+
clientResponder: clientResponder.map { ResponderAdapter(responder: $0, encoding: .default) }
415+
)
416+
// Channel creation
417+
let serverChannel = EmbeddedChannel()
418+
// Making channel Active
419+
XCTAssertNoThrow(try serverChannel.connect(to: SocketAddress.init(ipAddress: "127.0.0.1", port: 0)).wait())
420+
let clientChannel = EmbeddedChannel()
421+
XCTAssertNoThrow(try clientChannel.connect(to: SocketAddress.init(ipAddress: "127.0.0.1", port: 0)).wait())
422+
// Creating Reactive swift client
423+
let server = ReactiveSwiftClient(CoreClient(requester: serverMultiplexer.requester, channel: serverChannel))
424+
let client = ReactiveSwiftClient(CoreClient(requester: clientMultiplexer.requester, channel: clientChannel))
425+
defer {
426+
// closing channel connection
427+
XCTAssertNoThrow(try serverChannel.finish())
428+
}
429+
XCTAssertNotNil(server)
430+
XCTAssertNotNil(client)
431+
// client connection closed event signal producer
432+
client.shutdownProducer.startWithSignal({ signal, interruptHandle in
433+
signal.flatMapError({ error -> Signal<Void, Error> in
434+
XCTFail("\(error)")
435+
return .empty
436+
}).materialize().collect().observeValues { values in
437+
didReceiveConnectionclosedEvent.fulfill()
438+
}
439+
})
440+
// checking if connection is active
441+
XCTAssertFalse(client.isDisposed)
442+
// closing connection using dispose method
443+
XCTAssertNoThrow(try clientChannel.finish())
444+
// checking if connection is inactive
445+
XCTAssertTrue(client.isDisposed)
446+
self.wait(for: [didReceiveConnectionclosedEvent], timeout: 0.1)
447+
}
407448
}
449+

0 commit comments

Comments
 (0)