Skip to content

Commit

Permalink
Fix tracks feeding goroutine getting stuck due to missing signaling m…
Browse files Browse the repository at this point in the history
…essage (#9)
  • Loading branch information
streamer45 authored Feb 25, 2022
1 parent 2532e83 commit 9635108
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 20 deletions.
8 changes: 5 additions & 3 deletions server/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import (
)

const (
msgChSize = 20
msgChSize = 20
tracksChSize = 10
signalingTimeout = 10 * time.Second
)

type session struct {
Expand Down Expand Up @@ -51,11 +53,11 @@ func newUserSession(userID, channelID, connID string) *session {
signalOutCh: make(chan []byte, msgChSize),
wsMsgCh: make(chan clientMessage, msgChSize*2),
wsCloseCh: make(chan struct{}),
tracksCh: make(chan *webrtc.TrackLocalStaticRTP, 5),
tracksCh: make(chan *webrtc.TrackLocalStaticRTP, tracksChSize),
iceCh: make(chan []byte, msgChSize*2),
closeCh: make(chan struct{}),
doneCh: make(chan struct{}),
trackEnableCh: make(chan bool, 5),
trackEnableCh: make(chan bool, tracksChSize),
rtpSendersMap: map[*webrtc.TrackLocalStaticRTP]*webrtc.RTPSender{},
}
}
Expand Down
49 changes: 32 additions & 17 deletions server/sfu.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,14 +182,18 @@ func (p *Plugin) addTrack(userSession *session, track *webrtc.TrackLocalStaticRT
userSession.signalOutCh <- sdp

var answer webrtc.SessionDescription
msg, ok := <-userSession.signalInCh
if !ok {
return
}
p.LogDebug(string(msg))
if err := json.Unmarshal(msg, &answer); err != nil {
p.LogError(err.Error())
return
select {
case msg, ok := <-userSession.signalInCh:
if !ok {
return
}
p.LogDebug(string(msg))
if err := json.Unmarshal(msg, &answer); err != nil {
p.LogError(err.Error())
return
}
case <-time.After(signalingTimeout):
p.LogError("timed out waiting for signaling message", "userID", userSession.userID)
}

if err := peerConn.SetRemoteDescription(answer); err != nil {
Expand Down Expand Up @@ -366,7 +370,11 @@ func (p *Plugin) initRTCConn(userID string) {
if s.userID == userSession.userID {
return
}
s.tracksCh <- outVoiceTrack
select {
case s.tracksCh <- outVoiceTrack:
default:
p.LogError("failed to send voice track, channel is full", "userID", userID, "trackUserID", s.userID)
}
})

for {
Expand Down Expand Up @@ -433,7 +441,11 @@ func (p *Plugin) initRTCConn(userID string) {
if s.userID == userSession.userID {
return
}
s.tracksCh <- outScreenTrack
select {
case s.tracksCh <- outScreenTrack:
default:
p.LogError("failed to send screen track, channel is full", "userID", userID, "trackUserID", s.userID)
}
})

for {
Expand Down Expand Up @@ -461,13 +473,16 @@ func (p *Plugin) initRTCConn(userID string) {
}
})

msg, ok := <-userSession.signalInCh
if !ok {
return
}

if err := p.handleSignaling(userSession, msg); err != nil {
p.LogError(err.Error())
select {
case msg, ok := <-userSession.signalInCh:
if !ok {
return
}
if err := p.handleSignaling(userSession, msg); err != nil {
p.LogError(err.Error())
}
case <-time.After(signalingTimeout):
p.LogError("timed out waiting for signaling message", "userID", userID)
}

go p.handleICE(userSession)
Expand Down

0 comments on commit 9635108

Please sign in to comment.