Skip to content

Commit 7fbc9e8

Browse files
committed
fix(rpc): cleanup, docs, default perform rpc timeout
1 parent cef14ed commit 7fbc9e8

File tree

5 files changed

+83
-40
lines changed

5 files changed

+83
-40
lines changed

engine.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -585,14 +585,14 @@ func (e *RTCEngine) handleDataPacket(msg webrtc.DataChannelMessage) {
585585
}
586586
case *livekit.DataPacket_RpcResponse:
587587
if e.OnRpcResponse != nil {
588-
if _, ok := msg.RpcResponse.Value.(*livekit.RpcResponse_Payload); ok {
588+
switch msg.RpcResponse.Value.(type) {
589+
case *livekit.RpcResponse_Payload:
589590
e.OnRpcResponse(msg.RpcResponse.RequestId, &msg.RpcResponse.Value.(*livekit.RpcResponse_Payload).Payload, nil)
590-
} else if _, ok := msg.RpcResponse.Value.(*livekit.RpcResponse_Error); ok {
591+
case *livekit.RpcResponse_Error:
591592
e.OnRpcResponse(msg.RpcResponse.RequestId, nil, fromProto(msg.RpcResponse.Value.(*livekit.RpcResponse_Error).Error))
592593
}
593594
}
594595
}
595-
596596
}
597597

598598
func (e *RTCEngine) readDataPacket(msg webrtc.DataChannelMessage) (*livekit.DataPacket, error) {

examples/rpc/main.go

-5
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ func performGreeting(room *lksdk.Room) {
3434
DestinationIdentity: "greeter",
3535
Method: "arrival",
3636
Payload: "Hello",
37-
ResponseTimeout: 10000 * time.Millisecond,
3837
})
3938

4039
if err != nil {
@@ -51,7 +50,6 @@ func performDisconnection(room *lksdk.Room) {
5150
DestinationIdentity: "greeter",
5251
Method: "arrival",
5352
Payload: "You still there?",
54-
ResponseTimeout: 10000 * time.Millisecond,
5553
})
5654

5755
if err != nil {
@@ -80,7 +78,6 @@ func performSquareRoot(room *lksdk.Room) {
8078
DestinationIdentity: "math-genius",
8179
Method: "square-root",
8280
Payload: string(payload),
83-
ResponseTimeout: 10000 * time.Millisecond,
8481
})
8582

8683
if err != nil {
@@ -111,7 +108,6 @@ func performQuantumHypergeometricSeries(room *lksdk.Room) {
111108
DestinationIdentity: "math-genius",
112109
Method: "quantum-hypergeometric-series",
113110
Payload: string(payload),
114-
ResponseTimeout: 10000 * time.Millisecond,
115111
})
116112

117113
if err != nil {
@@ -155,7 +151,6 @@ func performDivide(room *lksdk.Room) {
155151
DestinationIdentity: "math-genius",
156152
Method: "divide",
157153
Payload: string(payload),
158-
ResponseTimeout: 10000 * time.Millisecond,
159154
})
160155

161156
if err != nil {

localparticipant.go

+23-8
Original file line numberDiff line numberDiff line change
@@ -556,7 +556,7 @@ func (p *LocalParticipant) handleParticipantDisconnected(identity string) {
556556

557557
p.rpcPendingResponses.Range(func(key, value interface{}) bool {
558558
if value.(rpcPendingResponseHandler).participantIdentity == identity {
559-
value.(rpcPendingResponseHandler).resolve(nil, RpcErrorFromBuiltInCodes(RpcRecipientDisconnected, nil))
559+
value.(rpcPendingResponseHandler).resolve(nil, rpcErrorFromBuiltInCodes(RpcRecipientDisconnected, nil))
560560
p.rpcPendingResponses.Delete(key)
561561
}
562562
return true
@@ -583,37 +583,52 @@ func (p *LocalParticipant) handleIncomingRpcResponse(requestId string, payload *
583583
}
584584
}
585585

586-
// TODO: fix default timeout
586+
// Initiate an RPC call to a remote participant
587+
// - @param params - For parameters for initiating the RPC call, see PerformRpcParams
588+
// - @returns A string payload or an error
587589
func (p *LocalParticipant) PerformRpc(params PerformRpcParams) (*string, error) {
590+
responseTimeout := 10000 * time.Millisecond
591+
if params.ResponseTimeout != nil {
592+
responseTimeout = *params.ResponseTimeout
593+
}
594+
588595
resultChan := make(chan *string, 1)
589596
errorChan := make(chan error, 1)
590597

591598
maxRoundTripLatency := 2000 * time.Millisecond
592599

593600
go func() {
594601
if byteLength(params.Payload) > MaxPayloadBytes {
595-
errorChan <- RpcErrorFromBuiltInCodes(RpcRequestPayloadTooLarge, nil)
602+
errorChan <- rpcErrorFromBuiltInCodes(RpcRequestPayloadTooLarge, nil)
596603
return
597604
}
598605

599606
if p.serverInfo != nil && compareVersions(p.serverInfo.Version, "1.8.0") < 0 {
600-
errorChan <- RpcErrorFromBuiltInCodes(RpcUnsupportedServer, nil)
607+
errorChan <- rpcErrorFromBuiltInCodes(RpcUnsupportedServer, nil)
601608
return
602609
}
603610

604611
id := uuid.New().String()
605-
p.engine.publishRpcRequest(params.DestinationIdentity, id, params.Method, params.Payload, params.ResponseTimeout-maxRoundTripLatency)
612+
p.engine.publishRpcRequest(params.DestinationIdentity, id, params.Method, params.Payload, responseTimeout-maxRoundTripLatency)
606613

607-
responseTimer := time.AfterFunc(params.ResponseTimeout, func() {
614+
responseTimer := time.AfterFunc(responseTimeout, func() {
608615
p.rpcPendingResponses.Delete(id)
609-
errorChan <- RpcErrorFromBuiltInCodes(RpcResponseTimeout, nil)
616+
617+
select {
618+
case errorChan <- rpcErrorFromBuiltInCodes(RpcResponseTimeout, nil):
619+
default:
620+
}
610621
})
611622

612623
ackTimer := time.AfterFunc(maxRoundTripLatency, func() {
613624
p.rpcPendingAcks.Delete(id)
614-
errorChan <- RpcErrorFromBuiltInCodes(RpcConnectionTimeout, nil)
615625
p.rpcPendingResponses.Delete(id)
616626
responseTimer.Stop()
627+
628+
select {
629+
case errorChan <- rpcErrorFromBuiltInCodes(RpcConnectionTimeout, nil):
630+
default:
631+
}
617632
})
618633

619634
p.rpcPendingAcks.Store(id, rpcPendingAckHandler{

room.go

+29-4
Original file line numberDiff line numberDiff line change
@@ -934,6 +934,29 @@ func (r *Room) getLocalParticipantSID() string {
934934
return r.LocalParticipant.SID()
935935
}
936936

937+
// Establishes the participant as a receiver for calls of the specified RPC method.
938+
// Will overwrite any existing callback for the same method.
939+
//
940+
// - @param method - The name of the indicated RPC method
941+
// - @param handler - Will be invoked when an RPC request for this method is received
942+
// - @returns A promise that resolves when the method is successfully registered
943+
//
944+
// Example:
945+
//
946+
// room.LocalParticipant?.registerRpcMethod(
947+
// "greet",
948+
// func (data: RpcInvocationData) => {
949+
// fmt.Println("Received greeting from ", data.callerIdentity, "with payload ", data.payload)
950+
// return "Hello, " + data.callerIdentity + "!";
951+
// }
952+
// );
953+
//
954+
// The handler should return either a string or an error.
955+
// If unable to respond within `responseTimeout`, the request will result in an error on the caller's side.
956+
//
957+
// You may throw errors of type `RpcError` with a string `message` in the handler,
958+
// and they will be received on the caller's side with the message intact.
959+
// Other errors thrown in your handler will not be transmitted as-is, and will instead arrive to the caller as `1500` ("Application Error").
937960
func (r *Room) RegisterRpcMethod(method string, handler RpcHandlerFunc) error {
938961
_, ok := r.rpcHandlers.Load(method)
939962
if ok {
@@ -943,6 +966,8 @@ func (r *Room) RegisterRpcMethod(method string, handler RpcHandlerFunc) error {
943966
return nil
944967
}
945968

969+
// Unregisters a previously registered RPC method.
970+
// - @param method - The name of the RPC method to unregister
946971
func (r *Room) UnregisterRpcMethod(method string) {
947972
r.rpcHandlers.Delete(method)
948973
}
@@ -951,13 +976,13 @@ func (r *Room) handleIncomingRpcRequest(callerIdentity, requestId, method, paylo
951976
r.engine.publishRpcAck(callerIdentity, requestId)
952977

953978
if version != 1 {
954-
r.engine.publishRpcResponse(callerIdentity, requestId, nil, RpcErrorFromBuiltInCodes(RpcUnsupportedVersion, nil))
979+
r.engine.publishRpcResponse(callerIdentity, requestId, nil, rpcErrorFromBuiltInCodes(RpcUnsupportedVersion, nil))
955980
return
956981
}
957982

958983
handler, ok := r.rpcHandlers.Load(method)
959984
if !ok {
960-
r.engine.publishRpcResponse(callerIdentity, requestId, nil, RpcErrorFromBuiltInCodes(RpcUnsupportedMethod, nil))
985+
r.engine.publishRpcResponse(callerIdentity, requestId, nil, rpcErrorFromBuiltInCodes(RpcUnsupportedMethod, nil))
961986
return
962987
}
963988

@@ -973,13 +998,13 @@ func (r *Room) handleIncomingRpcRequest(callerIdentity, requestId, method, paylo
973998
r.engine.publishRpcResponse(callerIdentity, requestId, nil, err.(*RpcError))
974999
} else {
9751000
r.engine.log.Warnw("Uncaught error returned by RPC handler for method, using application error instead", err, "method", method)
976-
r.engine.publishRpcResponse(callerIdentity, requestId, nil, RpcErrorFromBuiltInCodes(RpcApplicationError, nil))
1001+
r.engine.publishRpcResponse(callerIdentity, requestId, nil, rpcErrorFromBuiltInCodes(RpcApplicationError, nil))
9771002
}
9781003
return
9791004
}
9801005

9811006
if byteLength(response) > MaxDataBytes {
982-
r.engine.publishRpcResponse(callerIdentity, requestId, nil, RpcErrorFromBuiltInCodes(RpcResponsePayloadTooLarge, nil))
1007+
r.engine.publishRpcResponse(callerIdentity, requestId, nil, rpcErrorFromBuiltInCodes(RpcResponsePayloadTooLarge, nil))
9831008
return
9841009
}
9851010

rpc.go

+28-20
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ const (
2929
const (
3030
MaxMessageBytes = 256
3131
MaxDataBytes = 15360 // 15KiB
32+
33+
// Maximum payload size for RPC requests and responses. If a payload exceeds this size,
34+
// the RPC call will fail with a RpcRequestPayloadTooLarge(1402) or RpcResponsePayloadTooLarge(1504) error.
3235
MaxPayloadBytes = 15360 // 15KiB
3336
)
3437

@@ -47,11 +50,16 @@ var rpcErrorMessages = map[RpcErrorCode]string{
4750
RpcUnsupportedVersion: "Unsupported RPC version",
4851
}
4952

53+
// Parameters for initiating an RPC call
5054
type PerformRpcParams struct {
55+
// The identity of the destination participant
5156
DestinationIdentity string
52-
Method string
53-
Payload string
54-
ResponseTimeout time.Duration
57+
// The name of the method to call
58+
Method string
59+
// The method payload
60+
Payload string
61+
// Timeout for receiving a response after initial connection. Default: 10000ms
62+
ResponseTimeout *time.Duration
5563
}
5664

5765
// Data passed to method handler for incoming RPC invocations
@@ -60,33 +68,32 @@ type RpcInvocationData struct {
6068
RequestID string
6169
// The unique participant identity of the caller.
6270
CallerIdentity string
63-
// The payload of the request. User-definable format, typically JSON.
71+
// The payload of the request. User-definable format, could be JSON for example.
6472
Payload string
6573
// The maximum time the caller will wait for a response.
6674
ResponseTimeout time.Duration
6775
}
6876

69-
/**
70-
* Specialized error handling for RPC methods.
71-
*
72-
* Instances of this type, when thrown in a method handler, will have their `message`
73-
* serialized and sent across the wire. The sender will receive an equivalent error on the other side.
74-
*
75-
* Built-in types are included but developers may use any string, with a max length of 256 bytes.
76-
*/
77+
// Specialized error handling for RPC methods.
78+
//
79+
// Instances of this type, when thrown in a method handler, will have their `message`
80+
// serialized and sent across the wire. The sender will receive an equivalent error on the other side.
81+
//
82+
// Built-in types are included but developers may use any string, with a max length of 256 bytes.
7783
type RpcError struct {
7884
Code RpcErrorCode
7985
Message string
8086
Data *string
8187
}
8288

83-
/**
84-
* Creates an error object with the given code and message, plus an optional data payload.
85-
*
86-
* If thrown in an RPC method handler, the error will be sent back to the caller.
87-
*
88-
* Error codes 1001-1999 are reserved for built-in errors.
89-
*/
89+
// Creates an error object with the given code and message, plus an optional data payload.
90+
//
91+
// If thrown in an RPC method handler, the error will be sent back to the caller.
92+
//
93+
// Error codes 1001-1999 are reserved for built-in errors.
94+
//
95+
// Maximum message length is 256 bytes, and maximum data payload length is 15KiB.
96+
// If a payload exceeds these limits, it will be truncated.
9097
func NewRpcError(code RpcErrorCode, message string, data *string) *RpcError {
9198
err := &RpcError{Code: code, Message: truncateBytes(message, MaxMessageBytes)}
9299

@@ -123,7 +130,8 @@ func (e *RpcError) Error() string {
123130
return fmt.Sprintf("RpcError %d: %s", e.Code, e.Message)
124131
}
125132

126-
func RpcErrorFromBuiltInCodes(code RpcErrorCode, data *string) *RpcError {
133+
// Creates an error object with a built-in (or reserved) code and optional data payload.
134+
func rpcErrorFromBuiltInCodes(code RpcErrorCode, data *string) *RpcError {
127135
return NewRpcError(code, rpcErrorMessages[code], data)
128136
}
129137

0 commit comments

Comments
 (0)