Skip to content

Commit 7300b4d

Browse files
committed
quick & dirty implementation of async/await and AsyncSequence
Signed-off-by: David Nadoba <[email protected]>
1 parent b343fd0 commit 7300b4d

File tree

4 files changed

+285
-4
lines changed

4 files changed

+285
-4
lines changed

Package.resolved

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Package.swift

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,12 @@ let package = Package(
2424
.executable(name: "timer-client-example", targets: ["TimerClientExample"]),
2525
.executable(name: "twitter-client-example", targets: ["TwitterClientExample"]),
2626
.executable(name: "vanilla-client-example", targets: ["VanillaClientExample"]),
27+
28+
.executable(name: "async-twitter-client-example", targets: ["AsyncTwitterClientExample"]),
2729
],
2830
dependencies: [
2931
.package(url: "https://github.com/ReactiveCocoa/ReactiveSwift.git", from: "6.6.0"),
30-
.package(url: "https://github.com/apple/swift-nio", from: "2.26.0"),
32+
.package(url: "https://github.com/apple/swift-nio", .branch("main")),
3133
.package(url: "https://github.com/apple/swift-nio-extras", from: "1.8.0"),
3234
.package(url: "https://github.com/apple/swift-nio-transport-services", from: "1.9.2"),
3335
.package(url: "https://github.com/apple/swift-nio-ssl", from: "2.10.4"),
@@ -46,6 +48,11 @@ let package = Package(
4648
"RSocketCore",
4749
.product(name: "ReactiveSwift", package: "ReactiveSwift")
4850
]),
51+
.target(name: "RSocketAsync", dependencies: [
52+
"RSocketCore",
53+
.product(name: "NIO", package: "swift-nio"),
54+
.product(name: "_NIOConcurrency", package: "swift-nio"),
55+
]),
4956

5057
// Channel
5158
.target(name: "RSocketTSChannel", dependencies: [
@@ -132,6 +139,26 @@ let package = Package(
132139
],
133140
path: "Sources/Examples/VanillaClient"
134141
),
142+
143+
.target(
144+
name: "AsyncTwitterClientExample",
145+
dependencies: [
146+
"RSocketCore",
147+
"RSocketNIOChannel",
148+
"RSocketWebSocketTransport",
149+
"RSocketAsync",
150+
.product(name: "ArgumentParser", package: "swift-argument-parser"),
151+
.product(name: "NIO", package: "swift-nio"),
152+
.product(name: "_NIOConcurrency", package: "swift-nio"),
153+
],
154+
path: "Sources/Examples/AsyncTwitterClient",
155+
swiftSettings: [
156+
.unsafeFlags([
157+
"-Xfrontend",
158+
"-enable-experimental-concurrency"
159+
])
160+
]
161+
),
135162
],
136163
swiftLanguageVersions: [.v5]
137164
)
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
#if compiler(>=5.4) && $AsyncAwait
2+
import ArgumentParser
3+
import Foundation
4+
import RSocketCore
5+
import RSocketAsync
6+
import NIO
7+
import RSocketNIOChannel
8+
import RSocketReactiveSwift
9+
import RSocketWebSocketTransport
10+
11+
func route(_ route: String) -> Data {
12+
let encodedRoute = Data(route.utf8)
13+
precondition(encodedRoute.count <= Int(UInt8.max), "route is to long to be encoded")
14+
let encodedRouteLength = Data([UInt8(encodedRoute.count)])
15+
16+
return encodedRouteLength + encodedRoute
17+
}
18+
19+
/// the server-side code can be found here -> https://github.com/rsocket/rsocket-demo/tree/master/src/main/kotlin/io/rsocket/demo/twitter
20+
struct TwitterClientExample: ParsableCommand {
21+
static var configuration = CommandConfiguration(
22+
abstract: "connects to an RSocket endpoint using WebSocket transport, requests a stream at the route `searchTweets` to search for tweets that match the `searchString` and logs all events."
23+
)
24+
25+
@Argument(help: "used to find tweets that match the given string")
26+
var searchString = "spring"
27+
28+
@Option
29+
var host = "demo.rsocket.io"
30+
31+
@Option
32+
var port = 80
33+
34+
@Option
35+
var uri = "/rsocket"
36+
37+
@Option(help: "maximum number of tweets that are taken before it cancels the stream")
38+
var limit = 1000
39+
40+
func run() throws {
41+
let eventLoop = MultiThreadedEventLoopGroup(numberOfThreads: 1)
42+
defer { try! eventLoop.syncShutdownGracefully() }
43+
let promise = eventLoop.next().makePromise(of: Void.self)
44+
promise.completeWithAsync {
45+
try await self.runAsync()
46+
}
47+
try promise.futureResult.wait()
48+
}
49+
func runAsync() async throws {
50+
let bootstrap = ClientBootstrap(
51+
config: ClientSetupConfig(
52+
timeBetweenKeepaliveFrames: 0,
53+
maxLifetime: 30_000,
54+
metadataEncodingMimeType: "message/x.rsocket.routing.v0",
55+
dataEncodingMimeType: "application/json"
56+
),
57+
transport: WSTransport(),
58+
timeout: .seconds(30)
59+
)
60+
let client = try await bootstrap.connect(host: host, port: port, uri: uri)
61+
62+
let stream = client.requester.requestStream(payload: Payload(
63+
metadata: route("searchTweets"),
64+
data: Data(searchString.utf8)
65+
))
66+
67+
for try await payload in stream.prefix(limit) {
68+
let json = try JSONSerialization.jsonObject(with: payload.data, options: [])
69+
let data = try JSONSerialization.data(withJSONObject: json, options: [.prettyPrinted])
70+
let string = String(decoding: data, as: UTF8.self)
71+
print(string)
72+
}
73+
}
74+
}
75+
76+
TwitterClientExample.main()
77+
#endif

Sources/RSocketAsync/AsyncAwait.swift

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
#if compiler(>=5.4) && $AsyncAwait
2+
import RSocketCore
3+
import _Concurrency
4+
import NIO
5+
import _NIOConcurrency
6+
7+
public protocol RSocket {
8+
func requestResponse(payload: Payload) async throws -> Payload
9+
func requestStream(payload: Payload) -> AsyncStreamSequence
10+
}
11+
12+
public struct RequesterAdapter: RSocket {
13+
private let requester: RSocketCore.RSocket
14+
private let eventLoop = MultiThreadedEventLoopGroup(numberOfThreads: 1)
15+
public init(requester: RSocketCore.RSocket) {
16+
self.requester = requester
17+
}
18+
public func requestResponse(payload: Payload) async throws -> Payload {
19+
struct RequestResponseOperator: UnidirectionalStream {
20+
var promise: EventLoopPromise<Payload>
21+
func onNext(_ payload: Payload, isCompletion: Bool) {
22+
assert(isCompletion)
23+
promise.succeed(payload)
24+
}
25+
26+
func onComplete() {
27+
assertionFailure("request response does not support \(#function)")
28+
}
29+
30+
func onRequestN(_ requestN: Int32) {
31+
assertionFailure("request response does not support \(#function)")
32+
}
33+
34+
func onCancel() {
35+
promise.fail(Error.canceled(message: "onCancel"))
36+
}
37+
38+
func onError(_ error: Error) {
39+
promise.fail(error)
40+
}
41+
42+
func onExtension(extendedType: Int32, payload: Payload, canBeIgnored: Bool) {
43+
assertionFailure("request response does not support \(#function)")
44+
}
45+
}
46+
let promise = eventLoop.next().makePromise(of: Payload.self)
47+
let stream = RequestResponseOperator(promise: promise)
48+
_ = requester.requestResponse(payload: payload, responderStream: stream)
49+
return try await promise.futureResult.get()
50+
}
51+
52+
public func requestStream(payload: Payload) -> AsyncStreamSequence {
53+
AsyncStreamSequence(payload: payload, requester: requester, eventLoop: eventLoop.next())
54+
}
55+
}
56+
57+
public struct AsyncStreamSequence: AsyncSequence {
58+
public typealias AsyncIterator = AsyncStreamIterator
59+
60+
public typealias Element = Payload
61+
62+
fileprivate init(payload: Payload, requester: RSocketCore.RSocket, eventLoop: EventLoop) {
63+
self.payload = payload
64+
self.requester = requester
65+
self.eventLoop = eventLoop
66+
}
67+
private var payload: Payload
68+
private var requester: RSocketCore.RSocket
69+
private var eventLoop: EventLoop
70+
public func makeAsyncIterator() -> AsyncStreamIterator {
71+
let stream = AsyncStreamIterator(eventLoop: eventLoop)
72+
stream.subscription = requester.stream(payload: payload, initialRequestN: 0, responderStream: stream)
73+
return stream
74+
}
75+
}
76+
77+
public final class AsyncStreamIterator: AsyncIteratorProtocol, UnidirectionalStream {
78+
fileprivate init(
79+
eventLoop: EventLoop
80+
) {
81+
self.eventLoop = eventLoop
82+
}
83+
84+
private enum Event {
85+
case next(Payload, isCompletion: Bool)
86+
case error(Error)
87+
case complete
88+
case cancel
89+
}
90+
private var eventLoop: EventLoop
91+
private var event: EventLoopPromise<Event>? = nil
92+
private var isCompleted: Bool = false
93+
fileprivate var subscription: Subscription! = nil
94+
public func onNext(_ payload: Payload, isCompletion: Bool) {
95+
eventLoop.execute { [self] in
96+
assert(event != nil)
97+
event?.succeed(.next(payload, isCompletion: isCompletion))
98+
}
99+
100+
}
101+
102+
public func onComplete() {
103+
eventLoop.execute { [self] in
104+
assert(event != nil)
105+
event?.succeed(.complete)
106+
}
107+
}
108+
109+
public func onRequestN(_ requestN: Int32) {
110+
assertionFailure("request response does not support \(#function)")
111+
}
112+
113+
public func onCancel() {
114+
eventLoop.execute { [self] in
115+
assert(event != nil)
116+
event?.succeed(.cancel)
117+
}
118+
}
119+
120+
public func onError(_ error: Error) {
121+
eventLoop.execute { [self] in
122+
assert(event != nil)
123+
event?.succeed(.error(error))
124+
}
125+
}
126+
127+
public func onExtension(extendedType: Int32, payload: Payload, canBeIgnored: Bool) {
128+
assertionFailure("request response does not support \(#function)")
129+
}
130+
public func next() async throws -> Payload? {
131+
let p = eventLoop.makePromise(of: Optional<Payload>.self)
132+
p.completeWithAsync { [self] in
133+
guard !isCompleted else { return nil }
134+
assert(event == nil)
135+
let promise = eventLoop.makePromise(of: Event.self)
136+
event = promise
137+
subscription.onRequestN(1)
138+
let result = try await promise.futureResult.get()
139+
event = nil
140+
switch result {
141+
case let .next(payload, isCompletion):
142+
self.isCompleted = isCompletion
143+
return payload
144+
case .complete:
145+
self.isCompleted = true
146+
return nil
147+
case .cancel:
148+
self.isCompleted = true
149+
return nil
150+
case let .error(error):
151+
self.isCompleted = true
152+
throw error
153+
}
154+
}
155+
return try await p.futureResult.get()
156+
}
157+
deinit {
158+
subscription.onCancel()
159+
}
160+
}
161+
162+
public struct AsyncClient {
163+
private let coreClient: RSocketCore.CoreClient
164+
165+
public var requester: RSocket { RequesterAdapter(requester: coreClient.requester) }
166+
167+
public init(_ coreClient: RSocketCore.CoreClient) {
168+
self.coreClient = coreClient
169+
}
170+
}
171+
172+
extension RSocketCore.ClientBootstrap where Client == CoreClient, Responder == RSocketCore.RSocket {
173+
public func connect(host: String, port: Int, uri: String) async throws -> AsyncClient {
174+
AsyncClient(try await connect(host: host, port: port, uri: uri, responder: nil).get())
175+
}
176+
}
177+
#endif

0 commit comments

Comments
 (0)