Skip to content

Commit 1176dea

Browse files
authored
feat(realtime): add system event (#589)
* feat(realtime): add callback for handling system events * trigger event * fix test
1 parent d2f1fa0 commit 1176dea

File tree

7 files changed

+183
-54
lines changed

7 files changed

+183
-54
lines changed

Sources/Realtime/RealtimeChannel+AsyncAwait.swift

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,21 @@ extension RealtimeChannelV2 {
104104

105105
return stream
106106
}
107+
108+
/// Listen for `system` event.
109+
public func system() -> AsyncStream<RealtimeMessageV2> {
110+
let (stream, continuation) = AsyncStream<RealtimeMessageV2>.makeStream()
111+
112+
let subscription = onSystem {
113+
continuation.yield($0)
114+
}
115+
116+
continuation.onTermination = { _ in
117+
subscription.cancel()
118+
}
119+
120+
return stream
121+
}
107122

108123
/// Listen for broadcast messages sent by other clients within the same channel under a specific `event`.
109124
@available(*, deprecated, renamed: "broadcastStream(event:)")

Sources/Realtime/V2/CallbackManager.swift

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,15 @@ final class CallbackManager: Sendable {
7575
}
7676
}
7777

78+
@discardableResult
79+
func addSystemCallback(callback: @escaping @Sendable (RealtimeMessageV2) -> Void) -> Int {
80+
mutableState.withValue {
81+
$0.id += 1
82+
$0.callbacks.append(.system(SystemCallback(id: $0.id, callback: callback)))
83+
return $0.id
84+
}
85+
}
86+
7887
func setServerChanges(changes: [PostgresJoinConfig]) {
7988
mutableState.withValue {
8089
$0.serverChanges = changes
@@ -145,6 +154,19 @@ final class CallbackManager: Sendable {
145154
}
146155
}
147156

157+
func triggerSystem(message: RealtimeMessageV2) {
158+
let systemCallbacks = mutableState.callbacks.compactMap {
159+
if case .system(let callback) = $0 {
160+
return callback
161+
}
162+
return nil
163+
}
164+
165+
for systemCallback in systemCallbacks {
166+
systemCallback.callback(message)
167+
}
168+
}
169+
148170
func reset() {
149171
mutableState.setValue(MutableState())
150172
}
@@ -167,16 +189,23 @@ struct PresenceCallback {
167189
var callback: @Sendable (any PresenceAction) -> Void
168190
}
169191

192+
struct SystemCallback {
193+
var id: Int
194+
var callback: @Sendable (RealtimeMessageV2) -> Void
195+
}
196+
170197
enum RealtimeCallback {
171198
case postgres(PostgresCallback)
172199
case broadcast(BroadcastCallback)
173200
case presence(PresenceCallback)
201+
case system(SystemCallback)
174202

175203
var id: Int {
176204
switch self {
177205
case let .postgres(callback): callback.id
178206
case let .broadcast(callback): callback.id
179207
case let .presence(callback): callback.id
208+
case let .system(callback): callback.id
180209
}
181210
}
182211
}

Sources/Realtime/V2/RealtimeChannelV2.swift

Lines changed: 39 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77

88
import ConcurrencyExtras
99
import Foundation
10-
import Helpers
1110
import HTTPTypes
11+
import Helpers
1212

1313
#if canImport(FoundationNetworking)
1414
import FoundationNetworking
@@ -59,7 +59,9 @@ extension Socket {
5959
addChannel: { [weak client] in client?.addChannel($0) },
6060
removeChannel: { [weak client] in await client?.removeChannel($0) },
6161
push: { [weak client] in await client?.push($0) },
62-
httpSend: { [weak client] in try await client?.http.send($0) ?? .init(data: Data(), response: HTTPURLResponse()) }
62+
httpSend: { [weak client] in
63+
try await client?.http.send($0) ?? .init(data: Data(), response: HTTPURLResponse())
64+
}
6365
)
6466
}
6567
}
@@ -185,7 +187,8 @@ public final class RealtimeChannelV2: Sendable {
185187
@available(
186188
*,
187189
deprecated,
188-
message: "manually updating auth token per channel is not recommended, please use `setAuth` in RealtimeClient instead."
190+
message:
191+
"manually updating auth token per channel is not recommended, please use `setAuth` in RealtimeClient instead."
189192
)
190193
public func updateAuth(jwt: String?) async {
191194
logger?.debug("Updating auth token for channel \(topic)")
@@ -238,8 +241,8 @@ public final class RealtimeChannelV2: Sendable {
238241
event: event,
239242
payload: message,
240243
private: config.isPrivate
241-
),
242-
],
244+
)
245+
]
243246
]
244247
)
245248
)
@@ -295,20 +298,27 @@ public final class RealtimeChannelV2: Sendable {
295298

296299
func onMessage(_ message: RealtimeMessageV2) async {
297300
do {
298-
guard let eventType = message.eventType else {
301+
guard let eventType = message._eventType else {
299302
logger?.debug("Received message without event type: \(message)")
300303
return
301304
}
302305

303306
switch eventType {
304307
case .tokenExpired:
305-
logger?.debug(
306-
"Received token expired event. This should not happen, please report this warning."
307-
)
308+
// deprecated type
309+
break
308310

309311
case .system:
310-
logger?.debug("Subscribed to channel \(message.topic)")
311-
status = .subscribed
312+
if message.status == .ok {
313+
logger?.debug("Subscribed to channel \(message.topic)")
314+
status = .subscribed
315+
} else {
316+
logger?.debug(
317+
"Failed to subscribe to channel \(message.topic): \(message.payload)"
318+
)
319+
}
320+
321+
callbackManager.triggerSystem(message: message)
312322

313323
case .reply:
314324
guard
@@ -545,6 +555,24 @@ public final class RealtimeChannelV2: Sendable {
545555
}
546556
}
547557

558+
/// Listen for `system` event.
559+
public func onSystem(
560+
callback: @escaping @Sendable (RealtimeMessageV2) -> Void
561+
) -> RealtimeSubscription {
562+
let id = callbackManager.addSystemCallback(callback: callback)
563+
return RealtimeSubscription { [weak callbackManager, logger] in
564+
logger?.debug("Removing system callback with id: \(id)")
565+
callbackManager?.removeCallback(id: id)
566+
}
567+
}
568+
569+
/// Listen for `system` event.
570+
public func onSystem(
571+
callback: @escaping @Sendable () -> Void
572+
) -> RealtimeSubscription {
573+
self.onSystem { _ in callback() }
574+
}
575+
548576
@discardableResult
549577
func push(_ event: String, ref: String? = nil, payload: JSONObject = [:]) async -> PushStatus {
550578
let push = mutableState.withValue {

Sources/Realtime/V2/RealtimeMessageV2.swift

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,22 @@ public struct RealtimeMessageV2: Hashable, Codable, Sendable {
2323
self.payload = payload
2424
}
2525

26-
var status: PushStatus? {
26+
/// Status for the received message if any.
27+
public var status: PushStatus? {
2728
payload["status"]
2829
.flatMap(\.stringValue)
2930
.flatMap(PushStatus.init(rawValue:))
3031
}
3132

32-
public var eventType: EventType? {
33+
@available(
34+
*, deprecated,
35+
message: "Access to event type will be removed, please inspect raw event value instead."
36+
)
37+
public var eventType: EventType? { _eventType }
38+
39+
var _eventType: EventType? {
3340
switch event {
34-
case ChannelEvent.system where status == .ok: .system
41+
case ChannelEvent.system: .system
3542
case ChannelEvent.postgresChanges:
3643
.postgresChanges
3744
case ChannelEvent.broadcast:
@@ -44,9 +51,6 @@ public struct RealtimeMessageV2: Hashable, Codable, Sendable {
4451
.presenceDiff
4552
case ChannelEvent.presenceState:
4653
.presenceState
47-
case ChannelEvent.system
48-
where payload["message"]?.stringValue?.contains("access token has expired") == true:
49-
.tokenExpired
5054
case ChannelEvent.reply:
5155
.reply
5256
default:
@@ -62,6 +66,11 @@ public struct RealtimeMessageV2: Hashable, Codable, Sendable {
6266
case error
6367
case presenceDiff
6468
case presenceState
69+
@available(
70+
*, deprecated,
71+
message:
72+
"tokenExpired gets returned as system, check payload for verifying if is a token expiration."
73+
)
6574
case tokenExpired
6675
case reply
6776
}

Tests/RealtimeTests/CallbackManagerTests.swift

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,10 @@
88
import ConcurrencyExtras
99
import CustomDump
1010
import Helpers
11-
@testable import Realtime
1211
import XCTest
1312

13+
@testable import Realtime
14+
1415
final class CallbackManagerTests: XCTestCase {
1516
func testIntegration() {
1617
let callbackManager = CallbackManager()
@@ -52,13 +53,15 @@ final class CallbackManagerTests: XCTestCase {
5253
let callbackManager = CallbackManager()
5354
XCTAssertNoLeak(callbackManager)
5455

55-
let changes = [PostgresJoinConfig(
56-
event: .update,
57-
schema: "public",
58-
table: "users",
59-
filter: nil,
60-
id: 1
61-
)]
56+
let changes = [
57+
PostgresJoinConfig(
58+
event: .update,
59+
schema: "public",
60+
table: "users",
61+
filter: nil,
62+
id: 1
63+
)
64+
]
6265

6366
callbackManager.setServerChanges(changes: changes)
6467

@@ -118,7 +121,8 @@ final class CallbackManagerTests: XCTestCase {
118121
receivedActions.withValue { $0.append(action) }
119122
}
120123

121-
let deleteSpecificUserId = callbackManager
124+
let deleteSpecificUserId =
125+
callbackManager
122126
.addPostgresCallback(filter: deleteSpecificUserFilter) { action in
123127
receivedActions.withValue { $0.append(action) }
124128
}
@@ -215,6 +219,22 @@ final class CallbackManagerTests: XCTestCase {
215219
expectNoDifference(receivedAction.value?.joins, joins)
216220
expectNoDifference(receivedAction.value?.leaves, leaves)
217221
}
222+
223+
func testTriggerSystem() {
224+
let callbackManager = CallbackManager()
225+
226+
let receivedMessage = LockIsolated(RealtimeMessageV2?.none)
227+
callbackManager.addSystemCallback { message in
228+
receivedMessage.setValue(message)
229+
}
230+
231+
callbackManager.triggerSystem(
232+
message: RealtimeMessageV2(
233+
joinRef: nil, ref: nil, topic: "test", event: "system", payload: ["status": "ok"]))
234+
235+
XCTAssertEqual(receivedMessage.value?._eventType, .system)
236+
XCTAssertEqual(receivedMessage.value?.status, .ok)
237+
}
218238
}
219239

220240
extension XCTestCase {

Tests/RealtimeTests/RealtimeChannelTests.swift

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,11 @@
66
//
77

88
import InlineSnapshotTesting
9-
@testable import Realtime
109
import XCTest
1110
import XCTestDynamicOverlay
1211

12+
@testable import Realtime
13+
1314
final class RealtimeChannelTests: XCTestCase {
1415
let sut = RealtimeChannelV2(
1516
topic: "topic",
@@ -48,9 +49,13 @@ final class RealtimeChannelTests: XCTestCase {
4849

4950
sut.onPresenceChange { _ in }.store(in: &subscriptions)
5051

52+
sut.onSystem {
53+
}
54+
.store(in: &subscriptions)
55+
5156
assertInlineSnapshot(of: sut.callbackManager.callbacks, as: .dump) {
5257
"""
53-
7 elements
58+
8 elements
5459
▿ RealtimeCallback
5560
▿ postgres: PostgresCallback
5661
- callback: (Function)
@@ -112,6 +117,10 @@ final class RealtimeChannelTests: XCTestCase {
112117
▿ presence: PresenceCallback
113118
- callback: (Function)
114119
- id: 7
120+
▿ RealtimeCallback
121+
▿ system: SystemCallback
122+
- callback: (Function)
123+
- id: 8
115124
116125
"""
117126
}

0 commit comments

Comments
 (0)