Skip to content

Commit b6e42bc

Browse files
authored
ai/live: Store and forward parameter updates. (#3682)
This allows us to correctly handle cases where: * We haven't selected an orchestrator yet but the client sent a control API update. * We've swapped orchestrators. * We're mid-swap without an orchestrator. In all these cases, the newly selected orchestrator will receive the most recent parameter update as soon as it's selected. Also: * Add a 1 MB limit for reading parameters. * Unblock processStream if selecting the first orchestrator fails.
1 parent 7ce6ca5 commit b6e42bc

File tree

3 files changed

+73
-14
lines changed

3 files changed

+73
-14
lines changed

core/livepeernode.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@ type LivepeerNode struct {
170170

171171
type LivePipeline struct {
172172
RequestID string
173+
Params string
173174
ControlPub *trickle.TricklePublisher
174175
StopControl func()
175176
}

server/ai_live_video.go

Lines changed: 45 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -452,6 +452,24 @@ func copySegment(ctx context.Context, segment *http.Response, w io.Writer, seq i
452452
}
453453
}
454454

455+
func registerControl(ctx context.Context, params aiRequestParams) {
456+
params.node.LiveMu.Lock()
457+
defer params.node.LiveMu.Unlock()
458+
459+
stream := params.liveParams.stream
460+
if sess, exists := params.node.LivePipelines[stream]; exists && sess.ControlPub != nil {
461+
if sess.ControlPub != nil {
462+
clog.Info(ctx, "Stopping existing control loop", "existing_request_id", sess.RequestID)
463+
sess.ControlPub.Close()
464+
// TODO better solution than allowing existing streams to stomp over one another
465+
}
466+
}
467+
468+
params.node.LivePipelines[stream] = &core.LivePipeline{
469+
RequestID: params.liveParams.requestID,
470+
}
471+
}
472+
455473
func startControlPublish(ctx context.Context, control *url.URL, params aiRequestParams) {
456474
stream := params.liveParams.stream
457475
controlPub, err := trickle.NewTricklePublisher(control.String())
@@ -472,22 +490,40 @@ func startControlPublish(ctx context.Context, control *url.URL, params aiRequest
472490
})
473491
}
474492

475-
if control, exists := params.node.LivePipelines[stream]; exists {
476-
clog.Info(ctx, "Stopping existing control loop", "existing_request_id", control.RequestID)
477-
control.ControlPub.Close()
478-
// TODO better solution than allowing existing streams to stomp over one another
493+
sess, exists := params.node.LivePipelines[stream]
494+
if !exists || sess.RequestID != params.liveParams.requestID {
495+
stopProcessing(ctx, params, fmt.Errorf("control session did not exist"))
496+
return
479497
}
480-
481-
params.node.LivePipelines[stream] = &core.LivePipeline{
482-
ControlPub: controlPub,
483-
StopControl: stop,
484-
RequestID: params.liveParams.requestID,
498+
if sess.ControlPub != nil {
499+
// clean up from existing orchestrator
500+
go sess.ControlPub.Close()
485501
}
502+
sess.ControlPub = controlPub
503+
sess.StopControl = stop
504+
486505
if monitor.Enabled {
487506
monitor.AICurrentLiveSessions(len(params.node.LivePipelines))
488507
logCurrentLiveSessions(params.node.LivePipelines)
489508
}
490509

510+
// Send any cached control params in a goroutine outside the lock.
511+
msg := sess.Params
512+
go func() {
513+
if msg == "" {
514+
return
515+
}
516+
var err error
517+
for i := 0; i < 3; i++ {
518+
err = controlPub.Write(strings.NewReader(msg))
519+
if err == nil {
520+
return
521+
}
522+
time.Sleep(100 * time.Millisecond)
523+
}
524+
stopProcessing(ctx, params, fmt.Errorf("control write failed: %w", err))
525+
}()
526+
491527
// send a keepalive periodically to keep both ends of the connection alive
492528
go func() {
493529
for {

server/ai_mediaserver.go

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -633,6 +633,8 @@ func (ls *LivepeerServer) StartLiveVideo() http.Handler {
633633
},
634634
}
635635

636+
registerControl(ctx, params)
637+
636638
// Create a special parent context for orchestrator cancellation
637639
orchCtx, orchCancel := context.WithCancel(ctx)
638640

@@ -721,6 +723,10 @@ func processStream(ctx context.Context, params aiRequestParams, req worker.GenLi
721723
}
722724
params.liveParams.sendErrorEvent(err)
723725
}
726+
if isFirst {
727+
// failed before selecting an orchestrator
728+
firstProcessed <- struct{}{}
729+
}
724730
params.liveParams.kickInput(err)
725731
}()
726732
<-firstProcessed
@@ -820,22 +826,36 @@ func (ls *LivepeerServer) UpdateLiveVideo() http.Handler {
820826
return
821827
}
822828
ls.LivepeerNode.LiveMu.RLock()
823-
defer ls.LivepeerNode.LiveMu.RUnlock()
829+
// NB: LiveMu is a global lock, avoid holding it
830+
// during blocking network actions ... can't defer
824831
p, ok := ls.LivepeerNode.LivePipelines[stream]
832+
ls.LivepeerNode.LiveMu.RUnlock()
825833
if !ok {
826834
// Stream not found
827835
http.Error(w, "Stream not found", http.StatusNotFound)
828836
return
829837
}
830-
defer r.Body.Close()
831-
params, err := io.ReadAll(r.Body)
838+
reader := http.MaxBytesReader(w, r.Body, 1*1024*104) // 1 MB
839+
defer reader.Close()
840+
data, err := io.ReadAll(r.Body)
832841
if err != nil {
833842
http.Error(w, err.Error(), http.StatusBadRequest)
834843
return
835844
}
836845

837-
clog.V(6).Infof(ctx, "Sending Live Video Update Control API stream=%s, params=%s", stream, string(params))
838-
if err := p.ControlPub.Write(strings.NewReader(string(params))); err != nil {
846+
params := string(data)
847+
ls.LivepeerNode.LiveMu.Lock()
848+
p.Params = params
849+
controlPub := p.ControlPub
850+
ls.LivepeerNode.LiveMu.Unlock()
851+
852+
if controlPub == nil {
853+
clog.Info(ctx, "No orchestrator available, caching params", "stream", stream, "params", params)
854+
return
855+
}
856+
857+
clog.V(6).Infof(ctx, "Sending Live Video Update Control API stream=%s, params=%s", stream, params)
858+
if err := controlPub.Write(strings.NewReader(params)); err != nil {
839859
http.Error(w, err.Error(), http.StatusInternalServerError)
840860
return
841861
}
@@ -1069,6 +1089,8 @@ func (ls *LivepeerServer) CreateWhip(server *media.WHIPServer) http.Handler {
10691089
},
10701090
}
10711091

1092+
registerControl(ctx, params)
1093+
10721094
req := worker.GenLiveVideoToVideoJSONRequestBody{
10731095
ModelId: &pipeline,
10741096
Params: &pipelineParams,

0 commit comments

Comments
 (0)