Skip to content

Commit 2f7fee3

Browse files
authored
Establish publisher PC along with connect in v2. (#709)
* WIP * queue up message sends * WIP * handle more messages * clean up * message queue * signalling version in state * clean up * refactor PC configure * WIP
1 parent 4f29f99 commit 2f7fee3

File tree

7 files changed

+200
-85
lines changed

7 files changed

+200
-85
lines changed

engine.go

Lines changed: 148 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,23 @@ func (e *RTCEngine) JoinContext(
248248
e.token.Store(token)
249249
e.connParams = connectParams
250250

251-
err := e.signalTransport.Join(ctx, url, token, *connectParams)
251+
var (
252+
publisherOffer webrtc.SessionDescription
253+
err error
254+
)
255+
if e.signallingVersion == signalling.SignallingVersionV2 {
256+
e.pclock.Lock()
257+
e.createPublisherPCLocked(webrtc.Configuration{}, false)
258+
259+
publisherOffer, err = e.publisher.GetOffer()
260+
if err != nil {
261+
e.pclock.Unlock()
262+
return false, err
263+
}
264+
e.pclock.Unlock()
265+
}
266+
267+
err = e.signalTransport.Join(ctx, url, token, *connectParams, publisherOffer)
252268
if err != nil {
253269
if verr := e.validate(ctx, url, token, connectParams, ""); verr != nil {
254270
return false, verr
@@ -335,6 +351,7 @@ func (e *RTCEngine) configure(
335351
clientConfig *livekit.ClientConfiguration,
336352
subscriberPrimary *bool,
337353
) error {
354+
e.log.Debugw("Using ICE servers", "servers", iceServers)
338355
configuration := e.makeRTCConfiguration(iceServers, clientConfig)
339356

340357
// reset reliable message sequence
@@ -345,16 +362,30 @@ func (e *RTCEngine) configure(
345362
e.pclock.Lock()
346363
defer e.pclock.Unlock()
347364

348-
// remove previous transport
365+
if subscriberPrimary != nil {
366+
e.subscriberPrimary = *subscriberPrimary
367+
}
368+
349369
if e.publisher != nil {
350-
e.publisher.Close()
351-
e.publisher = nil
370+
setConfiguration(e.publisher, configuration)
371+
} else {
372+
if err := e.createPublisherPCLocked(configuration, !e.subscriberPrimary); err != nil {
373+
return err
374+
}
352375
}
376+
353377
if e.subscriber != nil {
354-
e.subscriber.Close()
355-
e.subscriber = nil
378+
setConfiguration(e.subscriber, configuration)
379+
} else {
380+
if err := e.createSubscriberPCLocked(configuration, e.subscriberPrimary); err != nil {
381+
return err
382+
}
356383
}
357384

385+
return nil
386+
}
387+
388+
func (e *RTCEngine) createPublisherPCLocked(configuration webrtc.Configuration, isPrimary bool) error {
358389
var err error
359390
if e.publisher, err = NewPCTransport(PCTransportParams{
360391
Configuration: configuration,
@@ -366,20 +397,7 @@ func (e *RTCEngine) configure(
366397
}); err != nil {
367398
return err
368399
}
369-
if e.subscriber, err = NewPCTransport(PCTransportParams{
370-
Configuration: configuration,
371-
RetransmitBufferSize: e.connParams.RetransmitBufferSize,
372-
}); err != nil {
373-
return err
374-
}
375400
e.publisher.SetLogger(e.log)
376-
e.subscriber.SetLogger(e.log)
377-
e.log.Debugw("Using ICE servers", "servers", iceServers)
378-
379-
if subscriberPrimary != nil {
380-
e.subscriberPrimary = *subscriberPrimary
381-
}
382-
e.subscriber.OnRemoteDescriptionSettled(e.createSubscriberPCAnswerAndSend)
383401

384402
e.publisher.pc.OnICECandidate(func(candidate *webrtc.ICECandidate) {
385403
if candidate == nil {
@@ -401,61 +419,8 @@ func (e *RTCEngine) configure(
401419
}
402420
})
403421

404-
e.subscriber.pc.OnICECandidate(func(candidate *webrtc.ICECandidate) {
405-
if candidate == nil {
406-
// done
407-
return
408-
}
409-
init := candidate.ToJSON()
410-
e.log.Debugw(
411-
"local ICE candidate",
412-
"target", livekit.SignalTarget_SUBSCRIBER,
413-
"candidate", init.Candidate,
414-
)
415-
if err := e.signalTransport.SendMessage(
416-
e.signalling.SignalICECandidate(
417-
protosignalling.ToProtoTrickle(init, livekit.SignalTarget_SUBSCRIBER, false),
418-
),
419-
); err != nil {
420-
e.log.Errorw("could not send ICE candidates for subscriber", err)
421-
}
422-
})
423-
424-
primaryTransport := e.publisher
425-
if e.subscriberPrimary {
426-
primaryTransport = e.subscriber
427-
}
428-
primaryTransport.pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
429-
switch state {
430-
case webrtc.ICEConnectionStateConnected:
431-
var fields []interface{}
432-
if pair, err := primaryTransport.GetSelectedCandidatePair(); err == nil {
433-
fields = append(fields, "iceCandidatePair", pair)
434-
}
435-
e.log.Debugw("ICE connected", fields...)
436-
case webrtc.ICEConnectionStateDisconnected:
437-
e.log.Debugw("ICE disconnected")
438-
case webrtc.ICEConnectionStateFailed:
439-
e.log.Debugw("ICE failed")
440-
e.handleDisconnect(false)
441-
}
442-
})
443-
444-
e.subscriber.pc.OnTrack(func(remote *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
445-
e.engineHandler.OnMediaTrack(remote, receiver)
446-
})
447-
448-
e.subscriber.pc.OnDataChannel(func(c *webrtc.DataChannel) {
449-
e.dclock.Lock()
450-
defer e.dclock.Unlock()
451-
if c.Label() == reliableDataChannelName {
452-
e.reliableDCSub = c
453-
} else if c.Label() == lossyDataChannelName {
454-
e.lossyDCSub = c
455-
} else {
456-
return
457-
}
458-
c.OnMessage(e.handleDataPacket)
422+
e.publisher.pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
423+
e.handleICEConnectionStateChange(e.publisher, livekit.SignalTarget_PUBLISHER, isPrimary, state)
459424
})
460425

461426
e.publisher.OnOffer = func(offer webrtc.SessionDescription) {
@@ -473,7 +438,7 @@ func (e *RTCEngine) configure(
473438
falseVal := false
474439
maxRetries := uint16(1)
475440
e.dclock.Lock()
476-
e.lossyDC, err = e.publisher.PeerConnection().CreateDataChannel(lossyDataChannelName, &webrtc.DataChannelInit{
441+
e.lossyDC, err = e.publisher.pc.CreateDataChannel(lossyDataChannelName, &webrtc.DataChannelInit{
477442
Ordered: &falseVal,
478443
MaxRetransmits: &maxRetries,
479444
})
@@ -483,7 +448,7 @@ func (e *RTCEngine) configure(
483448
}
484449
e.lossyDC.OnMessage(e.handleDataPacket)
485450

486-
e.reliableDC, err = e.publisher.PeerConnection().CreateDataChannel(reliableDataChannelName, &webrtc.DataChannelInit{
451+
e.reliableDC, err = e.publisher.pc.CreateDataChannel(reliableDataChannelName, &webrtc.DataChannelInit{
487452
Ordered: &trueVal,
488453
})
489454
if err != nil {
@@ -492,10 +457,11 @@ func (e *RTCEngine) configure(
492457
}
493458
e.reliableDC.OnMessage(e.handleDataPacket)
494459

495-
// SIGNALLING-V2-TODO: instantiating this rely on signal transport strategy rather than signalling version
460+
// SIGNALLING-V2-TODO: may need a separate peer connection
461+
// SIGNALLING-V2-TODO: instantiating this should rely on signal transport strategy rather than signalling version
496462
// SIGNALLING-V2-TODO: for signalling v2 instantiate publisher PC before connect and then do just SetConfiguration in OnConnectResponse
497463
if e.signallingVersion == signalling.SignallingVersionV2 {
498-
e.signallingDC, err = e.publisher.PeerConnection().CreateDataChannel(signallingDataChannelName, &webrtc.DataChannelInit{
464+
e.signallingDC, err = e.publisher.pc.CreateDataChannel(signallingDataChannelName, &webrtc.DataChannelInit{
499465
Ordered: &trueVal,
500466
})
501467
if err != nil {
@@ -520,6 +486,100 @@ func (e *RTCEngine) configure(
520486
return nil
521487
}
522488

489+
func (e *RTCEngine) createSubscriberPCLocked(configuration webrtc.Configuration, isPrimary bool) error {
490+
var err error
491+
if e.subscriber, err = NewPCTransport(PCTransportParams{
492+
Configuration: configuration,
493+
RetransmitBufferSize: e.connParams.RetransmitBufferSize,
494+
}); err != nil {
495+
return err
496+
}
497+
e.subscriber.SetLogger(e.log)
498+
499+
e.subscriber.OnRemoteDescriptionSettled(e.createSubscriberPCAnswerAndSend)
500+
501+
e.subscriber.pc.OnICECandidate(func(candidate *webrtc.ICECandidate) {
502+
if candidate == nil {
503+
// done
504+
return
505+
}
506+
init := candidate.ToJSON()
507+
e.log.Debugw(
508+
"local ICE candidate",
509+
"target", livekit.SignalTarget_SUBSCRIBER,
510+
"candidate", init.Candidate,
511+
)
512+
if err := e.signalTransport.SendMessage(
513+
e.signalling.SignalICECandidate(
514+
protosignalling.ToProtoTrickle(init, livekit.SignalTarget_SUBSCRIBER, false),
515+
),
516+
); err != nil {
517+
e.log.Errorw("could not send ICE candidates for subscriber", err)
518+
}
519+
})
520+
521+
e.subscriber.pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
522+
e.handleICEConnectionStateChange(e.subscriber, livekit.SignalTarget_SUBSCRIBER, isPrimary, state)
523+
})
524+
525+
e.subscriber.pc.OnTrack(func(remote *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
526+
e.engineHandler.OnMediaTrack(remote, receiver)
527+
})
528+
529+
e.subscriber.pc.OnDataChannel(func(c *webrtc.DataChannel) {
530+
e.dclock.Lock()
531+
defer e.dclock.Unlock()
532+
if c.Label() == reliableDataChannelName {
533+
e.reliableDCSub = c
534+
} else if c.Label() == lossyDataChannelName {
535+
e.lossyDCSub = c
536+
} else {
537+
return
538+
}
539+
c.OnMessage(e.handleDataPacket)
540+
})
541+
542+
return nil
543+
}
544+
545+
func (e *RTCEngine) handleICEConnectionStateChange(
546+
transport *PCTransport,
547+
signalTarget livekit.SignalTarget,
548+
isPrimary bool,
549+
state webrtc.ICEConnectionState,
550+
) {
551+
switch state {
552+
case webrtc.ICEConnectionStateConnected:
553+
var fields []interface{}
554+
if pair, err := transport.GetSelectedCandidatePair(); err == nil {
555+
fields = append(fields, "transport", signalTarget, "iceCandidatePair", pair)
556+
}
557+
e.log.Debugw("ICE connected", fields...)
558+
case webrtc.ICEConnectionStateDisconnected:
559+
e.log.Debugw("ICE disconnected", "transport", signalTarget)
560+
case webrtc.ICEConnectionStateFailed:
561+
e.log.Debugw("ICE failed", "transport", signalTarget)
562+
if isPrimary {
563+
e.handleDisconnect(false)
564+
}
565+
}
566+
}
567+
568+
func (e *RTCEngine) closePeerConnections() {
569+
e.pclock.Lock()
570+
defer e.pclock.Unlock()
571+
572+
if e.publisher != nil {
573+
e.publisher.Close()
574+
e.publisher = nil
575+
}
576+
577+
if e.subscriber != nil {
578+
e.subscriber.Close()
579+
e.subscriber = nil
580+
}
581+
}
582+
523583
func (e *RTCEngine) GetDataChannel(kind livekit.DataPacket_Kind) *webrtc.DataChannel {
524584
e.dclock.RLock()
525585
defer e.dclock.RUnlock()
@@ -787,6 +847,8 @@ func (e *RTCEngine) restartConnection() error {
787847
}
788848
e.signalTransport.Close()
789849

850+
e.closePeerConnections()
851+
790852
_, err := e.JoinContext(context.TODO(), e.url, e.token.Load(), e.connParams)
791853
return err
792854
}
@@ -1388,3 +1450,11 @@ func (e *RTCEngine) OnConnectResponse(res *livekit.ConnectResponse) error {
13881450
}
13891451
return nil
13901452
}
1453+
1454+
// ------------------------------------
1455+
1456+
func setConfiguration(pcTransport *PCTransport, configuration webrtc.Configuration) {
1457+
if pcTransport != nil {
1458+
pcTransport.SetConfiguration(configuration)
1459+
}
1460+
}

signalling/interfaces.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@ type SignalTransport interface {
160160
url string,
161161
token string,
162162
connectParams ConnectParams,
163+
publisherOffer webrtc.SessionDescription,
163164
) error
164165
Reconnect(
165166
url string,

signalling/signaltransport_http.go

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ import (
2727

2828
"github.com/livekit/protocol/livekit"
2929
"github.com/livekit/protocol/logger"
30+
protosignalling "github.com/livekit/protocol/signalling"
31+
"github.com/pion/webrtc/v4"
3032
"google.golang.org/protobuf/proto"
3133
)
3234

@@ -86,8 +88,9 @@ func (s *signalTransportHttp) Join(
8688
url string,
8789
token string,
8890
connectParams ConnectParams,
91+
publisherOffer webrtc.SessionDescription,
8992
) error {
90-
msg, err := s.connect(ctx, url, token, connectParams, "")
93+
msg, err := s.connect(ctx, url, token, connectParams, publisherOffer, "")
9194
if err != nil {
9295
return err
9396
}
@@ -106,7 +109,14 @@ func (s *signalTransportHttp) Reconnect(
106109
participantSID string,
107110
) error {
108111
connectParams.Reconnect = true
109-
msg, err := s.connect(context.TODO(), url, token, connectParams, participantSID)
112+
msg, err := s.connect(
113+
context.TODO(),
114+
url,
115+
token,
116+
connectParams,
117+
webrtc.SessionDescription{},
118+
participantSID,
119+
)
110120
if err != nil {
111121
return err
112122
}
@@ -137,6 +147,7 @@ func (s *signalTransportHttp) connect(
137147
urlPrefix string,
138148
token string,
139149
connectParams ConnectParams,
150+
publisherOffer webrtc.SessionDescription,
140151
participantSID string,
141152
) (proto.Message, error) {
142153
if joinMethod := s.params.Signalling.JoinMethod(); joinMethod != joinMethodConnectRequest {
@@ -157,13 +168,17 @@ func (s *signalTransportHttp) connect(
157168
if err != nil {
158169
return nil, err
159170
}
171+
s.params.Signalling.SignalConnectRequest(connectRequest)
172+
173+
if publisherOffer.SDP != "" {
174+
s.params.Signalling.SignalSdpOffer(protosignalling.ToProtoSessionDescription(publisherOffer, 0))
175+
}
160176

161-
wireMessage := s.params.Signalling.SignalConnectRequest(connectRequest)
162177
return s.sendHttpRequest(
163178
urlPrefix+s.params.Signalling.Path(),
164179
http.MethodPost,
165180
token,
166-
wireMessage,
181+
s.params.Signalling.PendingMessages(),
167182
)
168183
}
169184

signalling/signaltransport_hybrid.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"sync"
2020

2121
"github.com/livekit/protocol/logger"
22+
"github.com/pion/webrtc/v4"
2223
"google.golang.org/protobuf/proto"
2324
)
2425

@@ -104,8 +105,9 @@ func (s *signalTransportHybrid) Join(
104105
url string,
105106
token string,
106107
connectParams ConnectParams,
108+
publisherOffer webrtc.SessionDescription,
107109
) error {
108-
return s.syncTransport.Join(ctx, url, token, connectParams)
110+
return s.syncTransport.Join(ctx, url, token, connectParams, publisherOffer)
109111
}
110112

111113
func (s *signalTransportHybrid) Reconnect(

0 commit comments

Comments
 (0)