Skip to content

Commit

Permalink
Only write session end log message if session exists (#2777)
Browse files Browse the repository at this point in the history
* Only write session end log if session exists

* pending changelog
  • Loading branch information
mjh1 authored Mar 24, 2023
1 parent 8c7ec01 commit db28a19
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 8 deletions.
5 changes: 3 additions & 2 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
#### Transcoder

### Bug Fixes 🐞
- \# 2759 Parse keystore address without 0x prefix, fix parse error logging
- \# 2764 Call session end asynchronously to avoid unnecessary blocking
- \#2759 Parse keystore address without 0x prefix, fix parse error logging
- \#2764 Call session end asynchronously to avoid unnecessary blocking (@mjh1)
- \#2777 Only write session end log message if session exists (@mjh1)

#### CLI

Expand Down
18 changes: 12 additions & 6 deletions core/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,8 +660,13 @@ func (n *LivepeerNode) transcodeSegmentLoop(logCtx context.Context, md *SegTrans

func (n *LivepeerNode) endTranscodingSession(sessionId string, logCtx context.Context) {
// timeout; clean up goroutine here
var (
exists bool
storage *transcodeConfig
sess *RemoteTranscoder
)
n.storageMutex.Lock()
if storage, exists := n.StorageConfigs[sessionId]; exists {
if storage, exists = n.StorageConfigs[sessionId]; exists {
storage.OS.EndSession()
storage.LocalOS.EndSession()
delete(n.StorageConfigs, sessionId)
Expand All @@ -671,7 +676,7 @@ func (n *LivepeerNode) endTranscodingSession(sessionId string, logCtx context.Co
if n.TranscoderManager != nil {
n.TranscoderManager.RTmutex.Lock()
// send empty segment to signal transcoder internal session teardown if session exist
if sess, exists := n.TranscoderManager.streamSessions[sessionId]; exists {
if sess, exists = n.TranscoderManager.streamSessions[sessionId]; exists {
segData := &net.SegData{
AuthToken: &net.AuthToken{SessionId: sessionId},
}
Expand All @@ -685,14 +690,17 @@ func (n *LivepeerNode) endTranscodingSession(sessionId string, logCtx context.Co
}
n.segmentMutex.Lock()
mid := ManifestID(sessionId)
if _, ok := n.SegmentChans[mid]; ok {
if _, exists = n.SegmentChans[mid]; exists {
close(n.SegmentChans[mid])
delete(n.SegmentChans, mid)
if lpmon.Enabled {
lpmon.CurrentSessions(len(n.SegmentChans))
}
}
n.segmentMutex.Unlock()
if exists {
clog.V(common.DEBUG).Infof(logCtx, "Transcoding session ended by the Broadcaster for sessionID=%v", sessionId)
}
}

func (n *LivepeerNode) serveTranscoder(stream net.Transcoder_RegisterTranscoderServer, capacity int, capabilities *net.Capabilities) {
Expand Down Expand Up @@ -980,9 +988,7 @@ func (rtm *RemoteTranscoderManager) selectTranscoder(sessionId string, caps *Cap

// ends transcoding session and releases resources
func (node *LivepeerNode) EndTranscodingSession(sessionId string) {
logCtx := context.TODO()
node.endTranscodingSession(sessionId, logCtx)
clog.V(common.DEBUG).Infof(logCtx, "Transcoding session ended by the Broadcaster for sessionID=%v", sessionId)
node.endTranscodingSession(sessionId, context.TODO())
}

func (node *RemoteTranscoderManager) EndTranscodingSession(sessionId string) {
Expand Down

0 comments on commit db28a19

Please sign in to comment.