@@ -4,7 +4,7 @@ import ReactiveSwift
4
4
import RSocketCore
5
5
import RSocketNIOChannel
6
6
import RSocketReactiveSwift
7
- import RSocketWebSocketTransport
7
+ import RSocketWSTransport
8
8
9
9
func route( _ route: String ) -> Data {
10
10
let encodedRoute = Data ( route. utf8)
@@ -14,58 +14,56 @@ func route(_ route: String) -> Data {
14
14
return encodedRouteLength + encodedRoute
15
15
}
16
16
17
+ extension URL : ExpressibleByArgument {
18
+ public init ? ( argument: String ) {
19
+ guard let url = URL ( string: argument) else { return nil }
20
+ self = url
21
+ }
22
+ public var defaultValueDescription : String { description }
23
+ }
24
+
17
25
/// the server-side code can be found here -> https://github.com/rsocket/rsocket-demo/tree/master/src/main/kotlin/io/rsocket/demo/twitter
18
26
struct TwitterClientExample : ParsableCommand {
19
27
static var configuration = CommandConfiguration (
20
28
abstract: " connects to an RSocket endpoint using WebSocket transport, requests a stream at the route `searchTweets` to search for tweets that match the `searchString` and logs all events. "
21
29
)
22
30
23
31
@Argument ( help: " used to find tweets that match the given string " )
24
- var searchString = " RSocket "
25
-
26
- @Option
27
- var host = " demo.rsocket.io/rsocket "
32
+ var searchString = " spring "
28
33
29
34
@Option
30
- var port = 80
35
+ var url = URL ( string : " wss://demo.rsocket.io/rsocket " ) !
31
36
32
37
@Option ( help: " maximum number of tweets that are taken before it cancels the stream " )
33
38
var limit = 1000
34
39
35
- mutating func run( ) throws {
40
+ func run( ) throws {
36
41
let bootstrap = ClientBootstrap (
37
- config: ClientSetupConfig (
38
- timeBetweenKeepaliveFrames: 0 ,
39
- maxLifetime: 30_000 ,
40
- metadataEncodingMimeType: " message/x.rsocket.routing.v0 " ,
41
- dataEncodingMimeType: " application/json "
42
- ) ,
43
- transport: WSTransport ( ) ,
44
- timeout: . seconds( 30 )
42
+ config: ClientSetupConfig (
43
+ timeBetweenKeepaliveFrames: 30_000 ,
44
+ maxLifetime: 60_000 ,
45
+ metadataEncodingMimeType: " message/x.rsocket.routing.v0 " ,
46
+ dataEncodingMimeType: " application/json "
47
+ ) ,
48
+ transport: WSTransport ( )
45
49
)
50
+
51
+ let client = try bootstrap. connect ( to: . init( url: url) ) . first ( ) !. get ( )
46
52
47
- let clientProducer = bootstrap. connect ( host: host, port: port)
48
-
49
- let clientProperty = Property < ReactiveSwiftClient ? > ( initial: nil , then: clientProducer. flatMapError { _ in
50
- . empty
51
- } )
52
-
53
- let streamSemaphore = DispatchSemaphore ( value: 0 )
54
- let searchString = self . searchString
55
- clientProperty. producer
56
- . skipNil ( )
57
- . flatMap ( . latest) {
58
- $0. requester. requestStream ( payload: Payload (
59
- metadata: route ( " searchTweets " ) ,
60
- data: Data ( searchString. utf8)
61
- ) )
62
- }
63
- . logEvents ( identifier: " route.searchTweets " )
64
- . take ( first: limit)
65
- . on ( disposed: { streamSemaphore. signal ( ) } )
66
- . start ( )
67
-
68
- streamSemaphore. wait ( ) ;
53
+ try client. requester. requestStream ( payload: Payload (
54
+ metadata: route ( " searchTweets " ) ,
55
+ data: Data ( searchString. utf8)
56
+ ) )
57
+ . attemptMap { payload -> String in
58
+ // pretty print json
59
+ let json = try JSONSerialization . jsonObject ( with: payload. data, options: [ ] )
60
+ let data = try JSONSerialization . data ( withJSONObject: json, options: [ . prettyPrinted] )
61
+ return String ( decoding: data, as: UTF8 . self)
62
+ }
63
+ . logEvents ( identifier: " route.searchTweets " )
64
+ . take ( first: limit)
65
+ . wait ( )
66
+ . get ( )
69
67
}
70
68
}
71
69
0 commit comments