Skip to content

Commit

Permalink
Fix EndTranscodingSession() call and potential race (livepeer#2735)
Browse files Browse the repository at this point in the history
  • Loading branch information
cyberj0g authored and eliteprox committed Feb 3, 2023
1 parent 35c62cc commit f1373af
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 5 deletions.
7 changes: 4 additions & 3 deletions core/lb.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ func (lb *LoadBalancingTranscoder) EndTranscodingSession(sessionId string) {
lb.mu.RLock()
defer lb.mu.RUnlock()
if session, exists := lb.sessions[sessionId]; exists {
// delete session id here to avoid the race
delete(lb.sessions, sessionId)
// signal transcode loop finish for this session
close(session.stop)
clog.V(common.DEBUG).Infof(context.TODO(), "LB: Transcode session id=%s teared down", session.key)
Expand Down Expand Up @@ -130,10 +132,9 @@ func (lb *LoadBalancingTranscoder) createSession(ctx context.Context, md *SegTra
lb.mu.Lock()
defer lb.mu.Unlock()
_, exists := lb.sessions[job]
if !exists {
return
if exists {
delete(lb.sessions, job)
}
delete(lb.sessions, job)
lb.load[transcoder] -= costEstimate
clog.V(common.DEBUG).Infof(ctx, "LB: Deleted transcode session for key=%s", session.key)
}
Expand Down
2 changes: 1 addition & 1 deletion core/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -994,8 +994,8 @@ func (rtm *RemoteTranscoderManager) selectTranscoder(sessionId string, caps *Cap
// ends transcoding session and releases resources
func (node *LivepeerNode) EndTranscodingSession(sessionId string) {
logCtx := context.TODO()
clog.V(common.DEBUG).Infof(logCtx, "Transcoding session ended by the Broadcaster for sessionID=%v", sessionId)
node.endTranscodingSession(sessionId, logCtx)
clog.V(common.DEBUG).Infof(logCtx, "Transcoding session ended by the Broadcaster for sessionID=%v", sessionId)
}

func (node *RemoteTranscoderManager) EndTranscodingSession(sessionId string) {
Expand Down
2 changes: 1 addition & 1 deletion server/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func EndTranscodingSession(ctx context.Context, sess *BroadcastSession) error {
defer conn.Close()

req, err := genEndSessionRequest(sess)
_, err = c.EndTranscodingSession(ctx, req)
_, err = c.EndTranscodingSession(context.Background(), req)
if err != nil {
return errors.Wrapf(err, "Could not end orchestrator session orch=%v", sess.Transcoder())
}
Expand Down

0 comments on commit f1373af

Please sign in to comment.