From e12aacfd1a440bd0d920055f4d845f55098d09cd Mon Sep 17 00:00:00 2001 From: streamer45 Date: Fri, 17 Nov 2023 15:02:50 -0600 Subject: [PATCH 1/2] Fix race condition on stop --- service/rtc/server_test.go | 2 +- service/rtc/sfu.go | 14 +++++++++++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/service/rtc/server_test.go b/service/rtc/server_test.go index 75e501a..65d710b 100644 --- a/service/rtc/server_test.go +++ b/service/rtc/server_test.go @@ -461,7 +461,7 @@ func TestCalls(t *testing.T) { func TestTCPCandidates(t *testing.T) { log, err := logger.New(logger.Config{ EnableConsole: true, - ConsoleLevel: "INFO", + ConsoleLevel: "DEBUG", }) require.NoError(t, err) defer func() { diff --git a/service/rtc/sfu.go b/service/rtc/sfu.go index 10016ee..4f01e65 100644 --- a/service/rtc/sfu.go +++ b/service/rtc/sfu.go @@ -244,6 +244,8 @@ func (s *Server) InitSession(cfg SessionConfig, closeCb func() error) error { us.initBWEstimator(<-bwEstimatorCh) peerConn.OnICECandidate(func(candidate *webrtc.ICECandidate) { + us.mut.RLock() + defer us.mut.RUnlock() if candidate == nil { return } @@ -252,6 +254,14 @@ func (s *Server) InitSession(cfg SessionConfig, closeCb func() error) error { s.log.Error("failed to create ICE message", mlog.Err(err), mlog.String("sessionID", cfg.SessionID)) return } + + select { + case <-us.closeCh: + s.log.Debug("closeCh closed during ICE gathering", mlog.Any("sessionCfg", us.cfg)) + return + default: + } + select { case s.receiveCh <- msg: default: @@ -629,8 +639,10 @@ func (s *Server) CloseSession(sessionID string) error { } call.mut.Unlock() - us.rtcConn.Close() + us.mut.Lock() close(us.closeCh) + us.mut.Unlock() + us.rtcConn.Close() if us.closeCb != nil { return us.closeCb() From 876fb8ead7d6f26e3fa593719c3487699cf30fd6 Mon Sep 17 00:00:00 2001 From: streamer45 Date: Fri, 17 Nov 2023 15:24:09 -0600 Subject: [PATCH 2/2] Wait for signaling goroutines to exit before returning on close --- service/rtc/session.go | 1 + service/rtc/sfu.go | 11 ++++++++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/service/rtc/session.go b/service/rtc/session.go index d5490b2..a1492b8 100644 --- a/service/rtc/session.go +++ b/service/rtc/session.go @@ -51,6 +51,7 @@ type session struct { closeCh chan struct{} closeCb func() error + doneWg sync.WaitGroup vadMonitor *vad.Monitor diff --git a/service/rtc/sfu.go b/service/rtc/sfu.go index 4f01e65..e94420c 100644 --- a/service/rtc/sfu.go +++ b/service/rtc/sfu.go @@ -560,7 +560,9 @@ func (s *Server) InitSession(cfg SessionConfig, closeCb func() error) error { } }) + us.doneWg.Add(1) go func() { + defer us.doneWg.Done() select { case offer, ok := <-us.sdpOfferInCh: if !ok { @@ -583,7 +585,11 @@ func (s *Server) InitSession(cfg SessionConfig, closeCb func() error) error { return } - go us.handleICE(s.metrics) + us.doneWg.Add(1) + go func() { + defer us.doneWg.Done() + us.handleICE(s.metrics) + }() s.handleTracks(call, us) }() @@ -644,6 +650,9 @@ func (s *Server) CloseSession(sessionID string) error { us.mut.Unlock() us.rtcConn.Close() + // Wait for the signaling goroutines to be done. + us.doneWg.Wait() + if us.closeCb != nil { return us.closeCb() }