Skip to content

Commit

Permalink
Broadcaster: fix nil pointer panic (#2586), fix race conditions in RT…
Browse files Browse the repository at this point in the history
…MP connection management

* Don't pass a nil context into grpc call or it panics

* Update changelog

* refactor connection initialization to remove duplicate checks in registerConnection

* refactor connection initialization to remove duplicate checks in registerConnection

* refactor connection initialization to remove duplicate checks in registerConnection

* Add lock for LivepeerServer fields

* go fmt

* address review comments

* Update CHANGELOG_PENDING.md

Co-authored-by: Ivan Poleshchuk <[email protected]>
  • Loading branch information
thomshutt and cyberj0g authored Sep 16, 2022
1 parent 3ff79bd commit 5d4e145
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 36 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#### Transcoder

### Bug Fixes 🐞
- [#2586](https://github.com/livepeer/go-livepeer/pull/2586) Broadcaster: Don't pass a nil context into grpc call or it panics

#### CLI

Expand All @@ -23,6 +24,7 @@

#### Broadcaster
- [#2573](https://github.com/livepeer/go-livepeer/pull/2573) server: Fix timeout for stream recording background jobs (@victorges)
- [#2586](https://github.com/livepeer/go-livepeer/pull/2586) Refactor RTMP connection object management to prevent race conditions (@cyberj0g)

#### Orchestrator

Expand Down
93 changes: 57 additions & 36 deletions server/mediaserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ var httpPushResetTimer = func() (context.Context, context.CancelFunc) {
}

type rtmpConnection struct {
initializing chan struct{}
mid core.ManifestID
nonce uint64
stream stream.RTMPVideoStream
Expand All @@ -90,6 +91,16 @@ type rtmpConnection struct {
transcodedBytes uint64
}

// getActiveRtmpConnectionUnsafe looks up the connection stored under mid in
// s.rtmpConnections and reports whether an entry was found.
//
// When the entry carries a non-nil initializing channel, the call blocks on a
// receive from that channel before returning, so the caller only sees the
// connection after that receive completes (for example, once the channel has
// been closed).
//
// The function performs no locking of its own; the caller is expected to
// already hold s.connectionLock (presumably the reason for the "Unsafe"
// suffix).
func (s *LivepeerServer) getActiveRtmpConnectionUnsafe(mid core.ManifestID) (*rtmpConnection, bool) {
	conn, found := s.rtmpConnections[mid]
	if found && conn.initializing != nil {
		// Wait for the connection to finish initializing.
		<-conn.initializing
	}
	return conn, found
}

type LivepeerServer struct {
RTMPSegmenter lpmscore.RTMPSegmenter
LPMS *lpmscore.LPMS
Expand All @@ -106,6 +117,7 @@ type LivepeerServer struct {
lastManifestID core.ManifestID
context context.Context
connectionLock *sync.RWMutex
serverLock *sync.RWMutex
}

func (s *LivepeerServer) SetContextFromUnitTest(c context.Context) {
Expand Down Expand Up @@ -172,6 +184,7 @@ func NewLivepeerServer(rtmpAddr string, lpNode *core.LivepeerNode, httpIngest bo
}
server := lpmscore.New(&opts)
ls := &LivepeerServer{RTMPSegmenter: server, LPMS: server, LivepeerNode: lpNode, HTTPMux: opts.HttpMux, connectionLock: &sync.RWMutex{},
serverLock: &sync.RWMutex{},
rtmpConnections: make(map[core.ManifestID]*rtmpConnection),
internalManifests: make(map[core.ManifestID]core.ManifestID),
recordingsAuthResponses: cache.New(time.Hour, 2*time.Hour),
Expand Down Expand Up @@ -515,50 +528,57 @@ func (s *LivepeerServer) registerConnection(ctx context.Context, rtmpStrm stream
Format: params.Format,
}
hlsStrmID := core.MakeStreamID(mid, &vProfile)
s.connectionLock.RLock()
// Fast path - check early if session exists - creating new session can take time
oldCxn, exists := s.rtmpConnections[mid]
s.connectionLock.RUnlock()
playlist := core.NewBasicPlaylistManager(mid, storage, recordStorage)

// First, register the connection without its SessionManager: creating the SessionManager sets up
// O and T sessions, which could keep connectionLock held for a significant amount of time.
cxn := &rtmpConnection{
mid: mid,
initializing: make(chan struct{}),
nonce: params.Nonce,
stream: rtmpStrm,
pl: playlist,
profile: &vProfile,
params: params,
lastUsed: time.Now(),
}
s.connectionLock.Lock()
oldCxn, exists := s.getActiveRtmpConnectionUnsafe(mid)
if exists {
// We can only have one concurrent stream per ManifestID
s.connectionLock.Unlock()
return oldCxn, errAlreadyExists
}
s.rtmpConnections[mid] = cxn
// Do not acquire connectionLock again while the initializing channel is still open: that deadlocks if another goroutine already holds the lock and is waiting inside getActiveRtmpConnectionUnsafe().
s.connectionLock.Unlock()

playlist := core.NewBasicPlaylistManager(mid, storage, recordStorage)
// initialize session manager
var stakeRdr stakeReader
if s.LivepeerNode.Eth != nil {
stakeRdr = &storeStakeReader{store: s.LivepeerNode.Database}
}
selFactory := func() BroadcastSessionsSelector {
return NewMinLSSelectorWithRandFreq(stakeRdr, 1.0, SelectRandFreq)
}
cxn := &rtmpConnection{
mid: mid,
nonce: params.Nonce,
stream: rtmpStrm,
pl: playlist,
profile: &vProfile,
params: params,
sessManager: NewSessionManager(ctx, s.LivepeerNode, params, selFactory),
lastUsed: time.Now(),
}

s.connectionLock.Lock()
oldCxn, exists = s.rtmpConnections[mid]
// Check again whether the session exists - two sessions can potentially be created simultaneously,
// so we don't want to overwrite one that was already created
if exists {
// We can only have one concurrent stream per ManifestID
s.connectionLock.Unlock()
cxn.sessManager.cleanup(nil)
return oldCxn, errAlreadyExists
}
s.rtmpConnections[mid] = cxn
// safe, because other goroutines should be waiting on initializing channel
cxn.sessManager = NewSessionManager(ctx, s.LivepeerNode, params, selFactory)

// populate fields and signal initializing channel
s.serverLock.Lock()
s.lastManifestID = mid
s.lastHLSStreamID = hlsStrmID
s.serverLock.Unlock()

// connection is ready, only monitoring below
close(cxn.initializing)

// need lock to access rtmpConnections
s.connectionLock.RLock()
defer s.connectionLock.RUnlock()
sessionsNumber := len(s.rtmpConnections)
fastVerificationEnabled, fastVerificationUsing := countStreamsWithFastVerificationEnabled(s.rtmpConnections)
s.connectionLock.Unlock()

if monitor.Enabled {
monitor.CurrentSessions(sessionsNumber)
Expand Down Expand Up @@ -590,7 +610,7 @@ func removeRTMPStream(ctx context.Context, s *LivepeerServer, extmid core.Manife
// to index into rtmpConnections
intmid = _intmid
}
cxn, ok := s.rtmpConnections[intmid]
cxn, ok := s.getActiveRtmpConnectionUnsafe(intmid)
if !ok || cxn.pl == nil {
clog.Warningf(ctx, "Attempted to end unknown stream with manifestID=%s", extmid)
return errUnknownStream
Expand Down Expand Up @@ -630,7 +650,7 @@ func getHLSMasterPlaylistHandler(s *LivepeerServer) func(url *url.URL) (*m3u8.Ma

s.connectionLock.RLock()
defer s.connectionLock.RUnlock()
cxn, ok := s.rtmpConnections[manifestID]
cxn, ok := s.getActiveRtmpConnectionUnsafe(manifestID)
if !ok || cxn.pl == nil {
return nil, vidplayer.ErrNotFound
}
Expand All @@ -649,7 +669,7 @@ func getHLSMediaPlaylistHandler(s *LivepeerServer) func(url *url.URL) (*m3u8.Med
mid := strmID.ManifestID
s.connectionLock.RLock()
defer s.connectionLock.RUnlock()
cxn, ok := s.rtmpConnections[mid]
cxn, ok := s.getActiveRtmpConnectionUnsafe(mid)
if !ok || cxn.pl == nil {
return nil, vidplayer.ErrNotFound
}
Expand Down Expand Up @@ -701,7 +721,7 @@ func getRTMPStreamHandler(s *LivepeerServer) func(url *url.URL) (stream.RTMPVide
return func(url *url.URL) (stream.RTMPVideoStream, error) {
mid := parseManifestID(url.Path)
s.connectionLock.RLock()
cxn, ok := s.rtmpConnections[mid]
cxn, ok := s.getActiveRtmpConnectionUnsafe(mid)
defer s.connectionLock.RUnlock()
if !ok {
glog.Error("Cannot find RTMP stream for ManifestID ", mid)
Expand Down Expand Up @@ -803,7 +823,7 @@ func (s *LivepeerServer) HandlePush(w http.ResponseWriter, r *http.Request) {
if intmid, exists := s.internalManifests[mid]; exists {
mid = intmid
}
cxn, exists := s.rtmpConnections[mid]
cxn, exists := s.getActiveRtmpConnectionUnsafe(mid)
if monitor.Enabled {
fastVerificationEnabled, fastVerificationUsing := countStreamsWithFastVerificationEnabled(s.rtmpConnections)
monitor.FastVerificationEnabledAndUsingCurrentSessions(fastVerificationEnabled, fastVerificationUsing)
Expand Down Expand Up @@ -847,7 +867,8 @@ func (s *LivepeerServer) HandlePush(w http.ResponseWriter, r *http.Request) {
params.Resolution = r.Header.Get("Content-Resolution")
params.Format = format
s.connectionLock.RLock()
if mid != params.ManifestID && s.rtmpConnections[params.ManifestID] != nil && s.internalManifests[mid] == "" {
_, cxnExists := s.getActiveRtmpConnectionUnsafe(params.ManifestID)
if mid != params.ManifestID && cxnExists && s.internalManifests[mid] == "" {
// Pre-existing connection found for this new stream with the same underlying manifestID
var oldStreamID core.ManifestID
for k, v := range s.internalManifests {
Expand Down Expand Up @@ -890,7 +911,7 @@ func (s *LivepeerServer) HandlePush(w http.ResponseWriter, r *http.Request) {
runCheck := func() BreakOperation {
var lastUsed time.Time
s.connectionLock.RLock()
if cxn, exists := s.rtmpConnections[intmid]; exists {
if cxn, exists := s.getActiveRtmpConnectionUnsafe(intmid); exists {
lastUsed = cxn.lastUsed
}
if _, exists := s.internalManifests[extmid]; !exists && intmid != extmid {
Expand Down Expand Up @@ -975,7 +996,7 @@ func (s *LivepeerServer) HandlePush(w http.ResponseWriter, r *http.Request) {
case <-tick.Done():
clog.V(common.VERBOSE).Infof(ctx, "watchdog reset seq=%d dur=%v started=%v", seq, duration, now)
s.connectionLock.RLock()
if cxn, exists := s.rtmpConnections[mid]; exists {
if cxn, exists := s.getActiveRtmpConnectionUnsafe(mid); exists {
cxn.lastUsed = time.Now()
}
s.connectionLock.RUnlock()
Expand Down Expand Up @@ -1612,7 +1633,7 @@ func (s *LivepeerServer) GetNodeStatus() *common.NodeStatus {
func (s *LivepeerServer) LatestPlaylist() core.PlaylistManager {
s.connectionLock.RLock()
defer s.connectionLock.RUnlock()
cxn, ok := s.rtmpConnections[s.lastManifestID]
cxn, ok := s.getActiveRtmpConnectionUnsafe(s.lastManifestID)
if !ok || cxn.pl == nil {
return nil
}
Expand Down

0 comments on commit 5d4e145

Please sign in to comment.