Skip to content

Commit fa3802b

Browse files
authored
feat: Fast track publication. (#720)
* feat: Fast track publication. * wip. * done. * fix. * cleanup. * fix analyze. * Remove redundant code. * update. * Update engine.dart
1 parent c05dc14 commit fa3802b

File tree

7 files changed

+376
-304
lines changed

7 files changed

+376
-304
lines changed

lib/src/core/engine.dart

+47-62
Original file line numberDiff line numberDiff line change
@@ -23,28 +23,16 @@ import 'package:connectivity_plus/connectivity_plus.dart';
2323
import 'package:flutter_webrtc/flutter_webrtc.dart' as rtc;
2424
import 'package:meta/meta.dart';
2525

26-
import '../e2ee/options.dart';
27-
import '../events.dart';
28-
import '../exceptions.dart';
26+
import 'package:livekit_client/livekit_client.dart';
2927
import '../extensions.dart';
3028
import '../internal/events.dart';
3129
import '../internal/types.dart';
32-
import '../logger.dart';
33-
import '../managers/event.dart';
34-
import '../options.dart';
3530
import '../proto/livekit_models.pb.dart' as lk_models;
3631
import '../proto/livekit_rtc.pb.dart' as lk_rtc;
37-
import '../publication/local.dart';
3832
import '../support/disposable.dart';
39-
import '../support/platform.dart';
4033
import '../support/region_url_provider.dart';
4134
import '../support/websocket.dart';
42-
import '../track/local/video.dart';
4335
import '../types/internal.dart';
44-
import '../types/other.dart';
45-
import '../types/video_dimensions.dart';
46-
import '../utils.dart';
47-
import 'room.dart';
4836
import 'signal_client.dart';
4937
import 'transport.dart';
5038

@@ -138,6 +126,10 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
138126

139127
lk_models.ServerInfo? get serverInfo => _serverInfo;
140128

129+
List<lk_models.Codec>? _enabledPublishCodecs;
130+
131+
List<lk_models.Codec>? get enabledPublishCodecs => _enabledPublishCodecs;
132+
141133
void clearReconnectTimeout() {
142134
if (reconnectTimeout != null) {
143135
reconnectTimeout?.cancel();
@@ -202,7 +194,7 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
202194
);
203195

204196
// wait for join response
205-
await _signalListener.waitFor<SignalJoinResponseEvent>(
197+
await events.waitFor<EngineJoinResponseEvent>(
206198
duration: this.connectOptions.timeouts.connection,
207199
onTimeout: () => throw ConnectException(
208200
'Timed out waiting for SignalJoinResponseEvent',
@@ -249,56 +241,13 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
249241
}
250242

251243
@internal
252-
Future<lk_models.TrackInfo> addTrack({
253-
required String cid,
254-
required String name,
255-
required lk_models.TrackType kind,
256-
required lk_models.TrackSource source,
257-
VideoDimensions? dimensions,
258-
bool? dtx,
259-
Iterable<lk_models.VideoLayer>? videoLayers,
260-
Iterable<lk_rtc.SimulcastCodec>? simulcastCodecs,
261-
String? sid,
262-
String? videoCodec,
263-
String? stream,
264-
bool? disableRed,
265-
}) async {
266-
// TODO: Check if cid already published
267-
268-
lk_models.Encryption_Type encryptionType = lk_models.Encryption_Type.NONE;
269-
if (roomOptions.e2eeOptions != null && !isSVCCodec(videoCodec ?? '')) {
270-
switch (roomOptions.e2eeOptions!.encryptionType) {
271-
case EncryptionType.kNone:
272-
encryptionType = lk_models.Encryption_Type.NONE;
273-
break;
274-
case EncryptionType.kGcm:
275-
encryptionType = lk_models.Encryption_Type.GCM;
276-
break;
277-
case EncryptionType.kCustom:
278-
encryptionType = lk_models.Encryption_Type.CUSTOM;
279-
break;
280-
}
281-
}
282-
244+
Future<lk_models.TrackInfo> addTrack(lk_rtc.AddTrackRequest req) async {
283245
// send request to add track
284-
signalClient.sendAddTrack(
285-
cid: cid,
286-
name: name,
287-
type: kind,
288-
source: source,
289-
dimensions: dimensions,
290-
dtx: dtx,
291-
videoLayers: videoLayers,
292-
encryptionType: encryptionType,
293-
simulcastCodecs: simulcastCodecs,
294-
sid: sid,
295-
stream: stream,
296-
disableRed: disableRed,
297-
);
246+
signalClient.sendAddTrack(req);
298247

299248
// wait for response, or timeout
300249
final event = await _signalListener.waitFor<SignalLocalTrackPublishedEvent>(
301-
filter: (event) => event.cid == cid,
250+
filter: (event) => event.cid == req.cid,
302251
duration: connectOptions.timeouts.publish,
303252
onTimeout: () => throw TrackPublishException(),
304253
);
@@ -941,10 +890,16 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
941890
await _createPeerConnections(rtcConfiguration);
942891
}
943892

944-
if (!_subscriberPrimary) {
945-
// for subscriberPrimary, we negotiate when necessary (lazy)
893+
if (!_subscriberPrimary || event.response.fastPublish) {
894+
_enabledPublishCodecs = event.response.enabledPublishCodecs;
895+
896+
/// for subscriberPrimary, we negotiate when necessary (lazy)
897+
/// and if `response.fastPublish == true`, we need to negotiate
898+
/// immediately
946899
await negotiate();
947900
}
901+
902+
events.emit(EngineJoinResponseEvent(response: event.response));
948903
})
949904
..on<SignalReconnectResponseEvent>((event) async {
950905
var iceServersFromServer =
@@ -1106,11 +1061,41 @@ extension EnginePrivateMethods on Engine {
11061061
}
11071062

11081063
extension EngineInternalMethods on Engine {
1064+
@internal
1065+
Future<rtc.RTCRtpTransceiver> createTransceiverRTCRtpSender(
1066+
LocalTrack track,
1067+
PublishOptions opts,
1068+
List<rtc.RTCRtpEncoding>? encodings,
1069+
) async {
1070+
if (publisher == null) {
1071+
throw UnexpectedConnectionState('publisher is closed');
1072+
}
1073+
1074+
if (track.mediaStreamTrack.kind == 'video' && opts is VideoPublishOptions) {
1075+
track.codec = opts.videoCodec;
1076+
}
1077+
var transceiverInit = rtc.RTCRtpTransceiverInit(
1078+
direction: rtc.TransceiverDirection.SendOnly,
1079+
);
1080+
if (encodings != null) {
1081+
transceiverInit.sendEncodings = encodings;
1082+
}
1083+
final transceiver = await publisher!.pc.addTransceiver(
1084+
track: track.mediaStreamTrack,
1085+
kind: track is LocalVideoTrack
1086+
? rtc.RTCRtpMediaType.RTCRtpMediaTypeVideo
1087+
: rtc.RTCRtpMediaType.RTCRtpMediaTypeAudio,
1088+
init: transceiverInit,
1089+
);
1090+
return transceiver;
1091+
}
1092+
11091093
@internal
11101094
List<lk_rtc.DataChannelInfo> dataChannelInfo() => [
11111095
_reliableDCPub,
11121096
_lossyDCPub
11131097
].nonNulls.where((e) => e.id != -1).map((e) => e.toLKInfoType()).toList();
1098+
11141099
@internal
11151100
Future<rtc.RTCRtpSender> createSimulcastTransceiverSender(
11161101
LocalVideoTrack track,

lib/src/core/room.dart

+85-85
Original file line numberDiff line numberDiff line change
@@ -287,91 +287,6 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
287287
}
288288

289289
void _setUpSignalListeners() => _signalListener
290-
..on<SignalJoinResponseEvent>((event) {
291-
_roomInfo = event.response.room;
292-
_name = event.response.room.name;
293-
_metadata = event.response.room.metadata;
294-
_serverVersion = event.response.serverVersion;
295-
_serverRegion = event.response.serverRegion;
296-
297-
if (_isRecording != event.response.room.activeRecording) {
298-
_isRecording = event.response.room.activeRecording;
299-
emitWhenConnected(
300-
RoomRecordingStatusChanged(activeRecording: _isRecording));
301-
}
302-
303-
logger.fine('[Engine] Received JoinResponse, '
304-
'serverVersion: ${event.response.serverVersion}');
305-
306-
_localParticipant ??= LocalParticipant(
307-
room: this,
308-
info: event.response.participant,
309-
);
310-
311-
if (engine.fullReconnectOnNext) {
312-
_localParticipant!.updateFromInfo(event.response.participant);
313-
}
314-
315-
if (connectOptions.protocolVersion.index >= ProtocolVersion.v8.index &&
316-
engine.fastConnectOptions != null &&
317-
!engine.fullReconnectOnNext) {
318-
var options = engine.fastConnectOptions!;
319-
320-
var audio = options.microphone;
321-
bool audioEnabled = audio.enabled == true || audio.track != null;
322-
if (audioEnabled) {
323-
if (audio.track != null) {
324-
_localParticipant!.publishAudioTrack(audio.track as LocalAudioTrack,
325-
publishOptions: roomOptions.defaultAudioPublishOptions);
326-
} else {
327-
_localParticipant!.setMicrophoneEnabled(true,
328-
audioCaptureOptions: roomOptions.defaultAudioCaptureOptions);
329-
}
330-
}
331-
332-
var video = options.camera;
333-
bool videoEnabled = video.enabled == true || video.track != null;
334-
if (videoEnabled) {
335-
if (video.track != null) {
336-
_localParticipant!.publishVideoTrack(video.track as LocalVideoTrack,
337-
publishOptions: roomOptions.defaultVideoPublishOptions);
338-
} else {
339-
_localParticipant!.setCameraEnabled(true,
340-
cameraCaptureOptions: roomOptions.defaultCameraCaptureOptions);
341-
}
342-
}
343-
344-
var screen = options.screen;
345-
bool screenEnabled = screen.enabled == true || screen.track != null;
346-
if (screenEnabled) {
347-
if (screen.track != null) {
348-
_localParticipant!.publishVideoTrack(
349-
screen.track as LocalVideoTrack,
350-
publishOptions: roomOptions.defaultVideoPublishOptions);
351-
} else {
352-
_localParticipant!.setScreenShareEnabled(true,
353-
screenShareCaptureOptions:
354-
roomOptions.defaultScreenShareCaptureOptions);
355-
}
356-
}
357-
}
358-
359-
for (final info in event.response.otherParticipants) {
360-
logger.fine(
361-
'Creating RemoteParticipant: sid = ${info.sid}(identity:${info.identity}) '
362-
'tracks:${info.tracks.map((e) => e.sid)}');
363-
_getOrCreateRemoteParticipant(info.identity, info);
364-
}
365-
366-
if (e2eeManager != null && event.response.sifTrailer.isNotEmpty) {
367-
e2eeManager!.keyProvider
368-
.setSifTrailer(Uint8List.fromList(event.response.sifTrailer));
369-
}
370-
371-
logger.fine('Room Connect completed');
372-
373-
events.emit(RoomConnectedEvent(room: this, metadata: _metadata));
374-
})
375290
..on<SignalParticipantUpdateEvent>(
376291
(event) => _onParticipantUpdateEvent(event.participants))
377292
..on<SignalSpeakersChangedEvent>(
@@ -473,6 +388,91 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
473388
});
474389

475390
void _setUpEngineListeners() => _engineListener
391+
..on<EngineJoinResponseEvent>((event) {
392+
_roomInfo = event.response.room;
393+
_name = event.response.room.name;
394+
_metadata = event.response.room.metadata;
395+
_serverVersion = event.response.serverVersion;
396+
_serverRegion = event.response.serverRegion;
397+
398+
if (_isRecording != event.response.room.activeRecording) {
399+
_isRecording = event.response.room.activeRecording;
400+
emitWhenConnected(
401+
RoomRecordingStatusChanged(activeRecording: _isRecording));
402+
}
403+
404+
logger.fine('[Engine] Received JoinResponse, '
405+
'serverVersion: ${event.response.serverVersion}');
406+
407+
_localParticipant ??= LocalParticipant(
408+
room: this,
409+
info: event.response.participant,
410+
);
411+
412+
if (engine.fullReconnectOnNext) {
413+
_localParticipant!.updateFromInfo(event.response.participant);
414+
}
415+
416+
if (connectOptions.protocolVersion.index >= ProtocolVersion.v8.index &&
417+
engine.fastConnectOptions != null &&
418+
!engine.fullReconnectOnNext) {
419+
var options = engine.fastConnectOptions!;
420+
421+
var audio = options.microphone;
422+
bool audioEnabled = audio.enabled == true || audio.track != null;
423+
if (audioEnabled) {
424+
if (audio.track != null) {
425+
_localParticipant!.publishAudioTrack(audio.track as LocalAudioTrack,
426+
publishOptions: roomOptions.defaultAudioPublishOptions);
427+
} else {
428+
_localParticipant!.setMicrophoneEnabled(true,
429+
audioCaptureOptions: roomOptions.defaultAudioCaptureOptions);
430+
}
431+
}
432+
433+
var video = options.camera;
434+
bool videoEnabled = video.enabled == true || video.track != null;
435+
if (videoEnabled) {
436+
if (video.track != null) {
437+
_localParticipant!.publishVideoTrack(video.track as LocalVideoTrack,
438+
publishOptions: roomOptions.defaultVideoPublishOptions);
439+
} else {
440+
_localParticipant!.setCameraEnabled(true,
441+
cameraCaptureOptions: roomOptions.defaultCameraCaptureOptions);
442+
}
443+
}
444+
445+
var screen = options.screen;
446+
bool screenEnabled = screen.enabled == true || screen.track != null;
447+
if (screenEnabled) {
448+
if (screen.track != null) {
449+
_localParticipant!.publishVideoTrack(
450+
screen.track as LocalVideoTrack,
451+
publishOptions: roomOptions.defaultVideoPublishOptions);
452+
} else {
453+
_localParticipant!.setScreenShareEnabled(true,
454+
screenShareCaptureOptions:
455+
roomOptions.defaultScreenShareCaptureOptions);
456+
}
457+
}
458+
}
459+
460+
for (final info in event.response.otherParticipants) {
461+
logger.fine(
462+
'Creating RemoteParticipant: sid = ${info.sid}(identity:${info.identity}) '
463+
'tracks:${info.tracks.map((e) => e.sid)}');
464+
_getOrCreateRemoteParticipant(info.identity, info);
465+
}
466+
467+
if (e2eeManager != null && event.response.sifTrailer.isNotEmpty) {
468+
e2eeManager!.keyProvider
469+
.setSifTrailer(Uint8List.fromList(event.response.sifTrailer));
470+
}
471+
472+
logger.fine('Room Connect completed');
473+
474+
events.emit(RoomConnectedEvent(room: this, metadata: _metadata));
475+
})
476476
..on<EngineResumedEvent>((event) async {
477477
// re-send tracks permissions
478478
localParticipant?.sendTrackSubscriptionPermissions();

0 commit comments

Comments
 (0)