Skip to content

Commit 0405f5b

Browse files
authored
feat: RPC. (#682)
* feat: RPC. * wip. * add livekit_metrics to proto generate. * wip. * move rpc methods to room. * update. * wip. * wip. * fix. * error handle. * more test case. * test. * wip. * update. * fix flutter analyzer. * raises exception when handler already existed. * mark rpc acks/responses as private. * update.
1 parent b782436 commit 0405f5b

22 files changed

+5582
-1711
lines changed

Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ proto:
44
@{ \
55
if [ -d "$(PROTO_DIR)" ]; \
66
then \
7-
protoc --dart_out=lib/src/proto -I$(PROTO_DIR) $(PROTO_DIR)/livekit_rtc.proto $(PROTO_DIR)/livekit_models.proto; \
7+
protoc --dart_out=lib/src/proto -I$(PROTO_DIR) $(PROTO_DIR)/livekit_rtc.proto $(PROTO_DIR)/livekit_models.proto $(PROTO_DIR)/livekit_metrics.proto; \
88
else \
99
echo "../protocol/protobufs is not found. github.com/livekit/protocol must be checked out"; \
1010
fi \

lib/livekit_client.dart

+1
Original file line numberDiff line numberDiff line change
@@ -50,5 +50,6 @@ export 'src/types/participant_permissions.dart';
5050
export 'src/types/video_dimensions.dart';
5151
export 'src/types/video_encoding.dart';
5252
export 'src/types/video_parameters.dart';
53+
export 'src/types/rpc.dart';
5354
export 'src/widgets/screen_select_dialog.dart';
5455
export 'src/widgets/video_track_renderer.dart';

lib/src/core/engine.dart

+26-6
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import '../proto/livekit_models.pb.dart' as lk_models;
3636
import '../proto/livekit_rtc.pb.dart' as lk_rtc;
3737
import '../publication/local.dart';
3838
import '../support/disposable.dart';
39+
import '../support/platform.dart';
3940
import '../support/region_url_provider.dart';
4041
import '../support/websocket.dart';
4142
import '../track/local/video.dart';
@@ -343,12 +344,13 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
343344
rtc.RTCPeerConnectionState.RTCPeerConnectionStateConnecting) {
344345
await negotiate();
345346
}
346-
347-
logger.fine('Waiting for publisher to ice-connect...');
348-
await events.waitFor<EnginePublisherPeerStateUpdatedEvent>(
349-
filter: (event) => event.state.isConnected(),
350-
duration: connectOptions.timeouts.peerConnection,
351-
);
347+
if (!lkPlatformIsTest()) {
348+
logger.fine('Waiting for publisher to ice-connect...');
349+
await events.waitFor<EnginePublisherPeerStateUpdatedEvent>(
350+
filter: (event) => event.state.isConnected(),
351+
duration: connectOptions.timeouts.peerConnection,
352+
);
353+
}
352354
}
353355

354356
// wait for data channel to open (if not already)
@@ -638,6 +640,24 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
638640
events.emit(EngineSipDtmfReceivedEvent(
639641
dtmf: dp.sipDtmf,
640642
));
643+
} else if (dp.whichValue() == lk_models.DataPacket_Value.rpcRequest) {
644+
// RPC Request
645+
events.emit(EngineRPCRequestReceivedEvent(
646+
request: dp.rpcRequest,
647+
identity: dp.participantIdentity,
648+
));
649+
} else if (dp.whichValue() == lk_models.DataPacket_Value.rpcResponse) {
650+
// RPC Response
651+
events.emit(EngineRPCResponseReceivedEvent(
652+
response: dp.rpcResponse,
653+
identity: dp.participantIdentity,
654+
));
655+
} else if (dp.whichValue() == lk_models.DataPacket_Value.rpcAck) {
656+
// RPC Ack
657+
events.emit(EngineRPCAckReceivedEvent(
658+
ack: dp.rpcAck,
659+
identity: dp.participantIdentity,
660+
));
641661
} else {
642662
logger.warning('Unknown data packet type: ${dp.whichValue()}');
643663
}

lib/src/core/room.dart

+60
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ import '../track/local/audio.dart';
4545
import '../track/local/video.dart';
4646
import '../track/track.dart';
4747
import '../types/other.dart';
48+
import '../types/rpc.dart';
4849
import '../utils.dart';
4950
import 'engine.dart';
5051

@@ -119,6 +120,12 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
119120
// Agents
120121
final Map<String, DateTime> _transcriptionReceivedTimes = {};
121122

123+
// RPC Handlers
124+
final Map<String, RpcRequestHandler> _rpcHandlers = {};
125+
126+
// for testing
127+
Map<String, RpcRequestHandler> get rpcHandlers => _rpcHandlers;
128+
122129
Room({
123130
@Deprecated('deprecated, please use connectOptions in room.connect()')
124131
ConnectOptions connectOptions = const ConnectOptions(),
@@ -141,6 +148,8 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
141148
notifyListeners();
142149
});
143150

151+
_setupRpcListeners();
152+
144153
onDispose(() async {
145154
// clean up routine
146155
await _cleanUp();
@@ -1129,3 +1138,54 @@ extension RoomHardwareManagementMethods on Room {
11291138
events.emit(const AudioPlaybackStatusChanged(isPlaying: false));
11301139
}
11311140
}
1141+
1142+
extension RoomRPCMethods on Room {
1143+
void _setupRpcListeners() {
1144+
// listen for incoming requests
1145+
_engineListener
1146+
..on<EngineRPCRequestReceivedEvent>((event) async {
1147+
final request = event.request;
1148+
await _localParticipant?.handleIncomingRpcRequest(
1149+
event.identity,
1150+
request.id,
1151+
request.method,
1152+
request.payload,
1153+
request.responseTimeoutMs,
1154+
request.version,
1155+
);
1156+
})
1157+
..on<EngineRPCAckReceivedEvent>((event) {
1158+
_localParticipant?.handleIncomingRpcAck(event.requestId);
1159+
})
1160+
..on<EngineRPCResponseReceivedEvent>((event) {
1161+
String? payload;
1162+
RpcError? error;
1163+
1164+
if (event.payload.isNotEmpty) {
1165+
payload = event.response.payload;
1166+
} else if (event.error != null) {
1167+
error = RpcError.fromProto(event.error!);
1168+
}
1169+
_localParticipant?.handleIncomingRpcResponse(
1170+
event.requestId, payload, error);
1171+
});
1172+
}
1173+
1174+
/// Register a handler for incoming RPC requests.
1175+
/// @param method, the method name to listen for.
1176+
/// When a request with this method name is received, the handler will be called.
1177+
/// The handler should return a string payload to send back to the caller.
1178+
/// If the handler returns null, an error will be sent back to the caller.
1179+
void registerRpcMethod(String method, RpcRequestHandler handler) {
1180+
if (rpcHandlers.containsKey(method)) {
1181+
throw Exception('Method $method already registered');
1182+
}
1183+
rpcHandlers[method] = handler;
1184+
}
1185+
1186+
/// Unregister a handler for incoming RPC requests.
1187+
/// @param method, the method name to unregister.
1188+
void unregisterRpcMethod(String method) {
1189+
rpcHandlers.remove(method);
1190+
}
1191+
}

lib/src/internal/events.dart

+35
Original file line numberDiff line numberDiff line change
@@ -446,6 +446,41 @@ class EngineSipDtmfReceivedEvent with EngineEvent, InternalEvent {
446446
});
447447
}
448448

449+
@internal
450+
class EngineRPCRequestReceivedEvent with EngineEvent, InternalEvent {
451+
final lk_models.RpcRequest request;
452+
String get requestId => request.id;
453+
final String identity;
454+
const EngineRPCRequestReceivedEvent({
455+
required this.request,
456+
required this.identity,
457+
});
458+
}
459+
460+
@internal
461+
class EngineRPCResponseReceivedEvent with EngineEvent, InternalEvent {
462+
final lk_models.RpcResponse response;
463+
String get requestId => response.requestId;
464+
final String identity;
465+
lk_models.RpcError? get error => response.error;
466+
String get payload => response.payload;
467+
const EngineRPCResponseReceivedEvent({
468+
required this.response,
469+
required this.identity,
470+
});
471+
}
472+
473+
@internal
474+
class EngineRPCAckReceivedEvent with EngineEvent, InternalEvent {
475+
final lk_models.RpcAck ack;
476+
String get requestId => ack.requestId;
477+
final String identity;
478+
const EngineRPCAckReceivedEvent({
479+
required this.ack,
480+
required this.identity,
481+
});
482+
}
483+
449484
@internal
450485
abstract class DataChannelStateUpdatedEvent with EngineEvent, InternalEvent {
451486
final bool isPrimary;

0 commit comments

Comments
 (0)