From d0b23de7597bdf40c21de9d6e38efcf64208f01d Mon Sep 17 00:00:00 2001 From: Ivan Tivonenko Date: Tue, 17 Aug 2021 08:35:56 +0000 Subject: [PATCH 1/3] server: Eliminate data races --- server/mediaserver.go | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/server/mediaserver.go b/server/mediaserver.go index 848b45a583..c950d17165 100644 --- a/server/mediaserver.go +++ b/server/mediaserver.go @@ -779,10 +779,12 @@ func (s *LivepeerServer) HandlePush(w http.ResponseWriter, r *http.Request) { mid = intmid } cxn, exists := s.rtmpConnections[mid] + s.connectionLock.RUnlock() if exists && cxn != nil { + s.connectionLock.Lock() cxn.lastUsed = now + s.connectionLock.Unlock() } - s.connectionLock.RUnlock() // Check for presence and register if a fresh cxn if !exists { @@ -1118,7 +1120,7 @@ func (s *LivepeerServer) streamMP4(w http.ResponseWriter, r *http.Request, jpl * ow.Close() <-done glog.Infof("Completed mp4 request=%s manifestID=%s sourceBytes=%d destBytes=%d", r.URL.String(), - manifestID, sourceBytesSent, resultBytesSent) + manifestID, atomic.LoadInt64(&sourceBytesSent), resultBytesSent) }() oname := fmt.Sprintf("pipe:%d", ow.Fd()) out := []ffmpeg.TranscodeOptions{ @@ -1152,25 +1154,25 @@ func (s *LivepeerServer) streamMP4(w http.ResponseWriter, r *http.Request, jpl * fname := fmt.Sprintf("pipe:%d", ir.Fd()) in := &ffmpeg.TranscodeOptionsIn{Fname: fname, Transmuxing: true} - go func() { + go func(segUri string, iw *os.File) { defer iw.Close() - glog.V(common.VERBOSE).Infof("Adding manifestID=%s track=%s uri=%s to mp4", manifestID, track, seg.URI) - resp, err := http.Get(seg.URI) + glog.V(common.VERBOSE).Infof("Adding manifestID=%s track=%s uri=%s to mp4", manifestID, track, segUri) + resp, err := http.Get(segUri) if err != nil { - glog.Errorf("Error getting HTTP uri=%s manifestID=%s err=%v", seg.URI, manifestID, err) + glog.Errorf("Error getting HTTP uri=%s manifestID=%s err=%v", segUri, manifestID, err) return } defer resp.Body.Close() if resp.StatusCode != 200 { - glog.Errorf("Non-200 response for status=%v uri=%s manifestID=%s request=%s", resp.Status, seg.URI, manifestID, r.URL.String()) + glog.Errorf("Non-200 response for status=%v uri=%s manifestID=%s request=%s", resp.Status, segUri, manifestID, r.URL.String()) return } wn, err := io.Copy(iw, resp.Body) if err != nil { - glog.Errorf("Error transmuxing to mp4 request=%s uri=%s manifestID=%s err=%v", r.URL.String(), seg.URI, manifestID, err) + glog.Errorf("Error transmuxing to mp4 request=%s uri=%s manifestID=%s err=%v", r.URL.String(), segUri, manifestID, err) } - sourceBytesSent += wn - }() + atomic.AddInt64(&sourceBytesSent, wn) + }(seg.URI, iw) _, err = tc.Transcode(in, out) ir.Close() From 811048d5ee250da84ebb9319dff5637c1066aae8 Mon Sep 17 00:00:00 2001 From: Ivan Tivonenko Date: Wed, 18 Aug 2021 16:59:24 +0000 Subject: [PATCH 2/3] .gitignore: add livepeer_bench to .gitignore --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 40d6b85204..3f288f4450 100644 --- a/.gitignore +++ b/.gitignore @@ -20,6 +20,7 @@ eth/protocol /livepeer /livepeer_cli /livepeer_router +/livepeer_bench /.git.describe /livepeer-windows-amd64.zip From a14a401deafad71aa434d7333d9b3be4557ec7d6 Mon Sep 17 00:00:00 2001 From: Ivan Tivonenko Date: Wed, 18 Aug 2021 17:01:36 +0000 Subject: [PATCH 3/3] CHANGELOG_PENDING: Eliminate data races in mediaserver.go --- CHANGELOG_PENDING.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 4b1d053b72..81bbe7cc2b 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -17,6 +17,8 @@ ### Bug Fixes 🐞 +- \#1992 Eliminate data races in mediaserver.go (@darkdarkdragon) + #### General #### Broadcaster