-
Notifications
You must be signed in to change notification settings - Fork 192
/
Copy pathDispatching.swift
136 lines (115 loc) · 4.62 KB
/
Dispatching.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import Foundation
import Combine
protocol Dispatching {
var onMessage: ((String) -> Void)? { get set }
var isSocketConnected: Bool { get }
var networkConnectionStatusPublisher: AnyPublisher<NetworkConnectionStatus, Never> { get }
var socketConnectionStatusPublisher: AnyPublisher<SocketConnectionStatus, Never> { get }
func send(_ string: String, completion: @escaping (Error?) -> Void)
func protectedSend(_ string: String, completion: @escaping (Error?) -> Void)
func protectedSend(_ string: String) async throws
func connect() throws
func disconnect(closeCode: URLSessionWebSocketTask.CloseCode) throws
}
final class Dispatcher: NSObject, Dispatching {
var onMessage: ((String) -> Void)?
var socket: WebSocketConnecting
var socketConnectionHandler: SocketConnectionHandler
private let defaultTimeout: Int = 5
private let relayUrlFactory: RelayUrlFactory
private let networkMonitor: NetworkMonitoring
private let logger: ConsoleLogging
private let socketStatusProvider: SocketStatusProviding
var socketConnectionStatusPublisher: AnyPublisher<SocketConnectionStatus, Never> {
socketStatusProvider.socketConnectionStatusPublisher
}
var networkConnectionStatusPublisher: AnyPublisher<NetworkConnectionStatus, Never> {
networkMonitor.networkConnectionStatusPublisher
}
var isSocketConnected: Bool {
return networkMonitor.isConnected
}
private let concurrentQueue = DispatchQueue(label: "com.walletconnect.sdk.dispatcher", qos: .utility, attributes: .concurrent)
init(
socketFactory: WebSocketFactory,
relayUrlFactory: RelayUrlFactory,
networkMonitor: NetworkMonitoring,
socket: WebSocketConnecting,
logger: ConsoleLogging,
socketConnectionHandler: SocketConnectionHandler,
socketStatusProvider: SocketStatusProviding
) {
self.socketConnectionHandler = socketConnectionHandler
self.relayUrlFactory = relayUrlFactory
self.networkMonitor = networkMonitor
self.logger = logger
self.socket = socket
self.socketStatusProvider = socketStatusProvider
super.init()
setUpWebSocketSession()
}
func send(_ string: String, completion: @escaping (Error?) -> Void) {
guard socket.isConnected else {
completion(NetworkError.connectionFailed)
return
}
socket.write(string: string) {
completion(nil)
}
}
func protectedSend(_ string: String, completion: @escaping (Error?) -> Void) {
guard !socket.isConnected || !networkMonitor.isConnected else {
return send(string, completion: completion)
}
// Always connect when there is a message to be sent
if !socket.isConnected {
socketConnectionHandler.handleInternalConnect()
}
var cancellable: AnyCancellable?
cancellable = Publishers.CombineLatest(socketConnectionStatusPublisher, networkConnectionStatusPublisher)
.filter { $0.0 == .connected && $0.1 == .connected }
.setFailureType(to: NetworkError.self)
.timeout(.seconds(defaultTimeout), scheduler: concurrentQueue, customError: { .connectionFailed })
.sink(receiveCompletion: { result in
switch result {
case .failure(let error):
cancellable?.cancel()
completion(error)
case .finished: break
}
}, receiveValue: { [unowned self] _ in
cancellable?.cancel()
send(string, completion: completion)
})
}
func protectedSend(_ string: String) async throws {
var isResumed = false
return try await withCheckedThrowingContinuation { continuation in
protectedSend(string) { error in
if !isResumed {
if let error = error {
continuation.resume(throwing: error)
} else {
continuation.resume(returning: ())
}
isResumed = true
}
}
}
}
func connect() throws {
// Attempt to handle connection
try socketConnectionHandler.handleConnect()
}
func disconnect(closeCode: URLSessionWebSocketTask.CloseCode) throws {
try socketConnectionHandler.handleDisconnect(closeCode: closeCode)
}
}
// MARK: - Private functions
extension Dispatcher {
private func setUpWebSocketSession() {
socket.onText = { [unowned self] in
self.onMessage?($0)
}
}
}