Skip to content

Commit 60bdee9

Browse files
authored
[Fix]Convert AsyncStream to Publishers > AsyncStream to break memory leaks (#759)
1 parent f4617eb commit 60bdee9

File tree

12 files changed

+571
-114
lines changed

12 files changed

+571
-114
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
66

77
### ✅ Added
88
- Countdown timer and waiting participants info to the livestream player [#754](https://github.com/GetStream/stream-video-swift/pull/754)
9+
- EventPublisher for `Call` objects. [#759](https://github.com/GetStream/stream-video-swift/pull/759)
10+
11+
### 🐞 Fixed
12+
- `CallViewModel.callingState` transition to `.idle` just before moving to `.inCall` after the user has accepted the call. [#759](https://github.com/GetStream/stream-video-swift/pull/759)
913

1014
### 🔄 Changed
1115
- `CallSettings` won't be propagated between calls when using the `CallViewModel`. [#751](https://github.com/GetStream/stream-video-swift/pull/751)

Sources/StreamVideo/Call.swift

Lines changed: 24 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,20 @@ public class Call: @unchecked Sendable, WSEventsSubscriber {
2828
callCid(from: callId, callType: callType)
2929
}
3030

31+
private let eventSubject: PassthroughSubject<WrappedEvent, Never> = .init()
32+
public var eventPublisher: AnyPublisher<VideoEvent, Never> {
33+
eventSubject
34+
.compactMap {
35+
switch $0 {
36+
case let .coordinatorEvent(event):
37+
return event
38+
default:
39+
return nil
40+
}
41+
}
42+
.eraseToAnyPublisher()
43+
}
44+
3145
/// Provides access to the microphone.
3246
public let microphone: MicrophoneManager
3347
/// Provides access to the camera.
@@ -37,7 +51,6 @@ public class Call: @unchecked Sendable, WSEventsSubscriber {
3751

3852
internal let callController: CallController
3953
internal let coordinatorClient: DefaultAPI
40-
private var eventHandlers = [EventHandler]()
4154
private var cancellables = DisposableBag()
4255

4356
/// This adapter is used to manage closed captions for the
@@ -90,6 +103,7 @@ public class Call: @unchecked Sendable, WSEventsSubscriber {
90103
}
91104

92105
deinit {
106+
log.debug("Call cID:\(cId) is deallocating...")
93107
cancellables.removeAll()
94108
}
95109

@@ -481,45 +495,30 @@ public class Call: @unchecked Sendable, WSEventsSubscriber {
481495
try await callController.stopScreensharing()
482496
}
483497

498+
public func eventPublisher<WSEvent: Event>(for event: WSEvent.Type) -> AnyPublisher<WSEvent, Never> {
499+
eventPublisher
500+
.compactMap { $0.rawValue as? WSEvent }
501+
.eraseToAnyPublisher()
502+
}
503+
484504
/// Subscribes to video events.
485505
/// - Returns: `AsyncStream` of `VideoEvent`s.
486506
public func subscribe() -> AsyncStream<VideoEvent> {
487-
AsyncStream(VideoEvent.self) { [weak self] continuation in
488-
let eventHandler = EventHandler(handler: { event in
489-
guard case let .coordinatorEvent(event) = event else {
490-
return
491-
}
492-
continuation.yield(event)
493-
}, cancel: { continuation.finish() })
494-
self?.eventHandlers.append(eventHandler)
495-
}
507+
eventPublisher.eraseAsAsyncStream()
496508
}
497509

498510
/// Subscribes to a particular web socket event.
499511
/// - Parameter event: the type of the event you are subscribing to.
500512
/// - Returns: `AsyncStream` of web socket events from the provided type.
501513
public func subscribe<WSEvent: Event>(for event: WSEvent.Type) -> AsyncStream<WSEvent> {
502-
AsyncStream(event) { [weak self] continuation in
503-
let eventHandler = EventHandler(handler: { event in
504-
guard case let .coordinatorEvent(event) = event else {
505-
return
506-
}
507-
if let event = event.rawValue as? WSEvent {
508-
continuation.yield(event)
509-
}
510-
}, cancel: { continuation.finish() })
511-
512-
self?.eventHandlers.append(eventHandler)
513-
}
514+
eventPublisher(for: event).eraseAsAsyncStream()
514515
}
515516

516517
/// Leave the current call.
517518
public func leave() {
518519
postNotification(with: CallNotification.callEnded, object: self)
519-
eventHandlers.forEach { $0.cancel() }
520520

521521
cancellables.removeAll()
522-
eventHandlers.removeAll()
523522
callController.leave()
524523
closedCaptionsAdapter.stop()
525524
stateMachine.transition(.idle(self))
@@ -1384,12 +1383,7 @@ public class Call: @unchecked Sendable, WSEventsSubscriber {
13841383
self.state.updateState(from: videoEvent)
13851384
}
13861385

1387-
// Get a copy of eventHandlers to avoid crashes when `leave` call is being
1388-
// triggered, during event processing.
1389-
let eventHandlers = self.eventHandlers
1390-
for eventHandler in eventHandlers {
1391-
eventHandler.handler(event)
1392-
}
1386+
eventSubject.send(event)
13931387
}
13941388

13951389
@MainActor

Sources/StreamVideo/Controllers/CallController.swift

Lines changed: 28 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ import StreamWebRTC
88

99
/// Class that handles a particular call.
1010
class CallController: @unchecked Sendable {
11+
private enum DisposableKey: String {
12+
case participantsCountUpdatesEvent
13+
case currentUserBlocked
14+
}
1115

1216
private lazy var webRTCCoordinator = webRTCCoordinatorFactory.buildCoordinator(
1317
user: user,
@@ -42,13 +46,8 @@ class CallController: @unchecked Sendable {
4246
private let apiKey: String
4347
private let defaultAPI: DefaultAPI
4448
private let videoConfig: VideoConfig
45-
private let sfuReconnectionTime: CGFloat
4649
private let webRTCCoordinatorFactory: WebRTCCoordinatorProviding
47-
private var reconnectionDate: Date?
4850
private var cachedLocation: String?
49-
private var currentSFU: String?
50-
private var participantsCountUpdatesTask: Task<Void, Never>?
51-
private var currentUserBlockedTask: Task<Void, Never>?
5251

5352
private let joinCallResponseSubject = CurrentValueSubject<JoinCallResponse?, Never>(nil)
5453
private var joinCallResponseFetchObserver: AnyCancellable?
@@ -74,7 +73,6 @@ class CallController: @unchecked Sendable {
7473
self.callType = callType
7574
self.apiKey = apiKey
7675
self.videoConfig = videoConfig
77-
sfuReconnectionTime = 30
7876
self.defaultAPI = defaultAPI
7977
self.cachedLocation = cachedLocation
8078
self.webRTCCoordinatorFactory = webRTCCoordinatorFactory
@@ -562,7 +560,6 @@ class CallController: @unchecked Sendable {
562560

563561
private func didFetch(_ response: JoinCallResponse) async {
564562
let sessionId = await webRTCCoordinator.stateAdapter.sessionID
565-
currentSFU = response.credentials.server.edgeName
566563
Task { @MainActor [weak self] in
567564
self?.call?.state.sessionId = sessionId
568565
self?.call?.update(recordingState: response.call.recording ? .recording : .noRecording)
@@ -584,8 +581,7 @@ class CallController: @unchecked Sendable {
584581
case .joined:
585582
/// Once connected we should stop listening for CallSessionParticipantCountsUpdatedEvent
586583
/// updates and only rely on the healthCheck event.
587-
participantsCountUpdatesTask?.cancel()
588-
participantsCountUpdatesTask = nil
584+
disposableBag.remove(DisposableKey.participantsCountUpdatesEvent.rawValue)
589585

590586
call?.update(reconnectionStatus: .connected)
591587
case .error:
@@ -601,37 +597,36 @@ class CallController: @unchecked Sendable {
601597
}
602598

603599
private func subscribeToParticipantsCountUpdatesEvent(_ call: Call?) {
604-
participantsCountUpdatesTask?.cancel()
605-
participantsCountUpdatesTask = nil
600+
disposableBag.remove(DisposableKey.participantsCountUpdatesEvent.rawValue)
606601

607602
guard let call else { return }
608603

609-
participantsCountUpdatesTask = Task {
610-
let anonymousUserRoleKey = "anonymous"
611-
for await event in call.subscribe(for: CallSessionParticipantCountsUpdatedEvent.self) {
612-
Task { @MainActor in
613-
call.state.participantCount = event
614-
.participantsCountByRole
615-
.filter { $0.key != anonymousUserRoleKey } // TODO: Workaround. To be removed
616-
.values
617-
.map(UInt32.init)
618-
.reduce(0) { $0 + $1 }
619-
620-
// TODO: Workaround. To be removed
621-
if event.anonymousParticipantCount > 0 {
622-
call.state.anonymousParticipantCount = UInt32(event.anonymousParticipantCount)
623-
} else if let anonymousCount = event.participantsCountByRole[anonymousUserRoleKey] {
624-
call.state.anonymousParticipantCount = UInt32(anonymousCount)
625-
} else {
626-
call.state.anonymousParticipantCount = 0
627-
}
604+
let anonymousUserRoleKey = "anonymous"
605+
606+
call
607+
.eventPublisher(for: CallSessionParticipantCountsUpdatedEvent.self)
608+
.sinkTask { @MainActor [weak call] event in
609+
call?.state.participantCount = event
610+
.participantsCountByRole
611+
.filter { $0.key != anonymousUserRoleKey } // TODO: Workaround. To be removed
612+
.values
613+
.map(UInt32.init)
614+
.reduce(0) { $0 + $1 }
615+
616+
// TODO: Workaround. To be removed
617+
if event.anonymousParticipantCount > 0 {
618+
call?.state.anonymousParticipantCount = UInt32(event.anonymousParticipantCount)
619+
} else if let anonymousCount = event.participantsCountByRole[anonymousUserRoleKey] {
620+
call?.state.anonymousParticipantCount = UInt32(anonymousCount)
621+
} else {
622+
call?.state.anonymousParticipantCount = 0
628623
}
629624
}
630-
}
625+
.store(in: disposableBag, key: DisposableKey.participantsCountUpdatesEvent.rawValue)
631626
}
632627

633628
private func subscribeToCurrentUserBlockedState(_ call: Call?) {
634-
disposableBag.remove("current-user-blocked")
629+
disposableBag.remove(DisposableKey.currentUserBlocked.rawValue)
635630
guard let call else { return }
636631
let currentUser = user
637632
Task { @MainActor [weak self] in
@@ -651,7 +646,7 @@ class CallController: @unchecked Sendable {
651646
.stateMachine
652647
.transition(.blocked(self.webRTCCoordinator.stateMachine.currentStage.context))
653648
}
654-
.store(in: disposableBag, key: "current-user-blocked")
649+
.store(in: disposableBag, key: DisposableKey.currentUserBlocked.rawValue)
655650
}
656651
}
657652

Sources/StreamVideo/Utils/CallCache/CallCache.swift

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ final class CallCache {
2929
log.debug("Will create and cache call:\(cId)")
3030
let call = factory()
3131
storage[cId] = call
32+
log.debug("CallCache count:\(storage.count).")
3233
return call
3334
}
3435
}
@@ -42,13 +43,15 @@ final class CallCache {
4243
log.debug("Will remove call:\(cId)")
4344
queue.sync {
4445
storage[cId] = nil
46+
log.debug("CallCache count:\(storage.count).")
4547
}
4648
}
4749

4850
func removeAll() {
4951
queue.sync {
5052
log.debug("Will remove \(storage.count) calls.")
5153
storage.removeAll()
54+
log.debug("CallCache count:\(storage.count).")
5255
}
5356
}
5457
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
//
2+
// Copyright © 2025 Stream.io Inc. All rights reserved.
3+
//
4+
5+
import Combine
6+
7+
public extension Publisher where Output: Sendable {
8+
9+
/// Converts the current publisher into an `AsyncStream` of its output.
10+
///
11+
/// This allows you to consume any Combine publisher using Swift's `for await`
12+
/// syntax, providing a bridge between Combine and Swift Concurrency.
13+
///
14+
/// - Returns: An `AsyncStream` emitting values of the publisher as they arrive.
15+
func eraseAsAsyncStream() -> AsyncStream<Output> {
16+
AsyncStream { continuation in
17+
let cancellable = self.sink(
18+
receiveCompletion: { _ in
19+
continuation.finish()
20+
},
21+
receiveValue: {
22+
continuation.yield($0)
23+
}
24+
)
25+
26+
continuation.onTermination = { @Sendable _ in
27+
cancellable.cancel()
28+
}
29+
}
30+
}
31+
}

0 commit comments

Comments
 (0)