Skip to content

Commit

Permalink
core: Return from transcode loop if seg chan closed
Browse files Browse the repository at this point in the history
  • Loading branch information
yondonfu committed Sep 16, 2022
1 parent 3ff79bd commit 407fa45
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 4 deletions.
34 changes: 34 additions & 0 deletions core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/binary"
"fmt"
"io/ioutil"
"runtime"
"testing"
"time"

Expand Down Expand Up @@ -154,6 +155,39 @@ func TestTranscodeLoop_GivenNoSegmentsPastTimeout_CleansSegmentChan(t *testing.T
assert.Nil(segChan)
}

func TestTranscodeLoop_CleanupForBroadcasterEndTranscodingSession(t *testing.T) {
assert := assert.New(t)
require := require.New(t)

drivers.NodeStorage = drivers.NewMemoryDriver(nil)
oldTranscodeLoopTimeout := transcodeLoopTimeout
defer func() { transcodeLoopTimeout = oldTranscodeLoopTimeout }()
transcodeLoopTimeout = 100 * time.Millisecond

tmp := t.TempDir()

ffmpeg.InitFFmpeg()
n, _ := NewLivepeerNode(&eth.StubClient{}, tmp, nil)
n.Transcoder = NewLocalTranscoder(tmp)

md := &SegTranscodingMetadata{Profiles: videoProfiles, AuthToken: stubAuthToken()}
mid := ManifestID(md.AuthToken.SessionId)

ss := StubSegment()
_, err := n.sendToTranscodeLoop(context.TODO(), md, ss)
require.Nil(err)
require.NotNil(getSegChan(n, mid))

startRoutines := runtime.NumGoroutine()

n.endTranscodingSession(md.AuthToken.SessionId, context.TODO())
waitForTranscoderLoopTimeout(n, mid)

endRoutines := runtime.NumGoroutine()

assert.Equal(endRoutines, startRoutines-1)
}

func waitForTranscoderLoopTimeout(n *LivepeerNode, m ManifestID) {
for i := 0; i < 3; i++ {
time.Sleep(transcodeLoopTimeout * 2)
Expand Down
10 changes: 6 additions & 4 deletions core/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,11 +637,13 @@ func (n *LivepeerNode) transcodeSegmentLoop(logCtx context.Context, md *SegTrans
clog.V(common.DEBUG).Infof(logCtx, "Segment loop timed out; closing ")
n.endTranscodingSession(md.AuthToken.SessionId, logCtx)
return
case chanData := <-segChan:
// nil means channel is closed by endTranscodingSession called by B
if chanData != nil {
chanData.res <- n.transcodeSeg(chanData.ctx, *n.StorageConfig, chanData.seg, chanData.md)
case chanData, ok := <-segChan:
// Check if channel was closed due to endTranscodingSession being called by B
if !ok {
cancel()
return
}
chanData.res <- n.transcodeSeg(chanData.ctx, *n.StorageConfig, chanData.seg, chanData.md)
}
cancel()
}
Expand Down

0 comments on commit 407fa45

Please sign in to comment.