Skip to content

Commit d9aac9a

Browse files
move LivePipeline StreamCtx to not exported and add function to get Context
1 parent 17357af commit d9aac9a

File tree

2 files changed

+11
-6
lines changed

2 files changed

+11
-6
lines changed

core/livepeernode.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ type LivePipeline struct {
191191

192192
DataWriter *media.SegmentWriter
193193

194-
StreamCtx context.Context
194+
streamCtx context.Context
195195
streamCancel context.CancelCauseFunc
196196
streamParams interface{}
197197
streamRequest []byte
@@ -202,7 +202,8 @@ func (n *LivepeerNode) NewLivePipeline(requestID, streamID, pipeline string, str
202202
n.LiveMu.Lock()
203203
defer n.LiveMu.Unlock()
204204

205-
//ensure streamRequest is not nil or empty
205+
//ensure streamRequest is not nil or empty to avoid json unmarshal issues on Orchestrator failover
206+
//sends the request bytes to next Orchestrator
206207
if streamRequest == nil || len(streamRequest) == 0 {
207208
streamRequest = []byte("{}")
208209
}
@@ -211,7 +212,7 @@ func (n *LivepeerNode) NewLivePipeline(requestID, streamID, pipeline string, str
211212
RequestID: requestID,
212213
StreamID: streamID,
213214
Pipeline: pipeline,
214-
StreamCtx: streamCtx,
215+
streamCtx: streamCtx,
215216
streamParams: streamParams,
216217
streamCancel: streamCancel,
217218
streamRequest: streamRequest,
@@ -226,6 +227,10 @@ func (n *LivepeerNode) RemoveLivePipeline(streamID string) {
226227
delete(n.LivePipelines, streamID)
227228
}
228229

230+
func (n *LivePipeline) GetContext() context.Context {
231+
return n.streamCtx
232+
}
233+
229234
func (p *LivePipeline) StreamParams() interface{} {
230235
return p.streamParams
231236
}

server/job_stream.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ func (ls *LivepeerServer) runStream(gatewayJob *gatewayJob) {
152152
return
153153
}
154154
//this context passes to all channels that will close when stream is canceled
155-
ctx := stream.StreamCtx
155+
ctx := stream.GetContext()
156156
ctx = clog.AddVal(ctx, "stream_id", streamID)
157157

158158
params, err := getStreamRequestParams(stream)
@@ -290,10 +290,10 @@ func (ls *LivepeerServer) monitorStream(streamId string) {
290290

291291
//ensure live pipeline is cleaned up if monitoring ends
292292
defer ls.LivepeerNode.RemoveLivePipeline(streamId)
293-
293+
streamCtx := stream.GetContext()
294294
for {
295295
select {
296-
case <-stream.StreamCtx.Done():
296+
case <-streamCtx.Done():
297297
clog.Infof(ctx, "Stream %s stopped, ending monitoring", streamId)
298298
return
299299
case <-pmtTicker.C:

0 commit comments

Comments
 (0)