diff --git a/go.mod b/go.mod index 7f22d05..d0eb0e1 100644 --- a/go.mod +++ b/go.mod @@ -10,19 +10,19 @@ require ( github.com/kelseyhightower/envconfig v1.4.0 github.com/mattermost/mattermost/server/public v0.0.12 github.com/pborman/uuid v1.2.1 - github.com/pion/ice/v2 v2.3.24 + github.com/pion/ice/v2 v2.3.25 github.com/pion/interceptor v0.1.29 github.com/pion/logging v0.2.2 github.com/pion/rtcp v1.2.14 github.com/pion/rtp v1.8.6 github.com/pion/stun v0.6.1 - github.com/pion/webrtc/v3 v3.2.40 + github.com/pion/webrtc/v3 v3.2.41 github.com/prometheus/client_golang v1.15.0 github.com/prometheus/procfs v0.9.0 github.com/stretchr/testify v1.9.0 github.com/vmihailenco/msgpack/v5 v5.4.1 - golang.org/x/crypto v0.23.0 - golang.org/x/sys v0.20.0 + golang.org/x/crypto v0.24.0 + golang.org/x/sys v0.21.0 golang.org/x/time v0.0.0-20191024005414-555d28b269f0 ) @@ -68,8 +68,8 @@ require ( github.com/wiggin77/merror v1.0.5 // indirect github.com/wiggin77/srslog v1.0.1 // indirect golang.org/x/exp v0.0.0-20200908183739-ae8ad444f925 // indirect - golang.org/x/net v0.25.0 // indirect - golang.org/x/text v0.15.0 // indirect + golang.org/x/net v0.26.0 // indirect + golang.org/x/text v0.16.0 // indirect google.golang.org/protobuf v1.31.0 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum index ffb12d3..20d2b40 100644 --- a/go.sum +++ b/go.sum @@ -320,8 +320,8 @@ github.com/pion/datachannel v1.5.6/go.mod h1:1eKT6Q85pRnr2mHiWHxJwO50SfZRtWHTsNI github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ/F9s= github.com/pion/dtls/v2 v2.2.11 h1:9U/dpCYl1ySttROPWJgqWKEylUdT0fXp/xst6JwY5Ks= github.com/pion/dtls/v2 v2.2.11/go.mod h1:d9SYc9fch0CqK90mRk1dC7AkzzpwJj6u2GU3u+9pqFE= -github.com/pion/ice/v2 v2.3.24 h1:RYgzhH/u5lH0XO+ABatVKCtRd+4U1GEaCXSMjNr13tI= -github.com/pion/ice/v2 v2.3.24/go.mod h1:KXJJcZK7E8WzrBEYnV4UtqEZsGeWfHxsNqhVcVvgjxw= +github.com/pion/ice/v2 v2.3.25 h1:M5rJA07dqhi3nobJIg+uPtcVjFECTrhcR3n0ns8kDZs= +github.com/pion/ice/v2 v2.3.25/go.mod h1:KXJJcZK7E8WzrBEYnV4UtqEZsGeWfHxsNqhVcVvgjxw= github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= github.com/pion/mdns v0.0.12 h1:CiMYlY+O0azojWDmxdNr7ADGrnZ+V6Ilfner+6mSVK8= @@ -356,8 +356,8 @@ github.com/pion/transport/v3 v3.0.2/go.mod h1:nIToODoOlb5If2jF9y2Igfx3PFYWfuXi37 github.com/pion/turn/v2 v2.1.3/go.mod h1:huEpByKKHix2/b9kmTAM3YoX6MKP+/D//0ClgUYR2fY= github.com/pion/turn/v2 v2.1.6 h1:Xr2niVsiPTB0FPtt+yAWKFUkU1eotQbGgpTIld4x1Gc= github.com/pion/turn/v2 v2.1.6/go.mod h1:huEpByKKHix2/b9kmTAM3YoX6MKP+/D//0ClgUYR2fY= -github.com/pion/webrtc/v3 v3.2.40 h1:Wtfi6AZMQg+624cvCXUuSmrKWepSB7zfgYDOYqsSOVU= -github.com/pion/webrtc/v3 v3.2.40/go.mod h1:M1RAe3TNTD1tzyvqHrbVODfwdPGSXOUo/OgpoGGJqFY= +github.com/pion/webrtc/v3 v3.2.41 h1:bz6GxA2bk247YI+uwd9m9Jw3bwSL7g7k0xkBZnl/mF4= +github.com/pion/webrtc/v3 v3.2.41/go.mod h1:M1RAe3TNTD1tzyvqHrbVODfwdPGSXOUo/OgpoGGJqFY= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -531,8 +531,8 @@ golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98y golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= -golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI= -golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= +golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI= +golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -628,8 +628,8 @@ golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= -golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= -golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= +golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= +golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181203162652-d668ce993890/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -720,8 +720,8 @@ golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= -golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= @@ -745,8 +745,8 @@ golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= -golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= -golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= +golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= diff --git a/service/client_msg.go b/service/client_msg.go index 8a12849..9745403 100644 --- a/service/client_msg.go +++ b/service/client_msg.go @@ -12,8 +12,8 @@ import ( ) type ClientMessage struct { - Type string `msgpack:"type"` - Data interface{} `msgpack:"data,omitempty"` + Type string `msgpack:"type"` + Data any `msgpack:"data,omitempty"` } const ( @@ -42,7 +42,13 @@ func (cm *ClientMessage) DecodeMsgpack(dec *msgpack.Decoder) error { cm.Type = msgType switch cm.Type { - case ClientMessageJoin, ClientMessageLeave, ClientMessageHello, ClientMessageReconnect, ClientMessageClose: + case ClientMessageJoin: + data, err := dec.DecodeMap() + if err != nil { + return fmt.Errorf("failed to decode msg.Data: %w", err) + } + cm.Data = data + case ClientMessageLeave, ClientMessageHello, ClientMessageReconnect, ClientMessageClose: data, err := dec.DecodeTypedMap() if err != nil { return fmt.Errorf("failed to decode msg.Data: %w", err) diff --git a/service/client_msg_test.go b/service/client_msg_test.go index 928c0e7..4eb7ed6 100644 --- a/service/client_msg_test.go +++ b/service/client_msg_test.go @@ -23,7 +23,7 @@ func TestClientMessage(t *testing.T) { }) t.Run("with join type", func(t *testing.T) { - msgData := map[string]string{ + msgData := map[string]any{ "connID": "conn_id", } msg := NewClientMessage(ClientMessageJoin, msgData) diff --git a/service/rtc/config.go b/service/rtc/config.go index 972ba6b..fbd2282 100644 --- a/service/rtc/config.go +++ b/service/rtc/config.go @@ -74,14 +74,19 @@ type SessionConfig struct { UserID string // SessionID specifies the unique identifier for the session. SessionID string - Props map[string]any + // Props specifies some properties for the session. + Props SessionProps } -func (c *SessionConfig) GetStringProp(key string) string { - if c == nil || c.Props == nil { - return "" - } - val, _ := c.Props[key].(string) +type SessionProps map[string]any + +func (p SessionProps) ChannelID() string { + val, _ := p["channelID"].(string) + return val +} + +func (p SessionProps) AV1Support() bool { + val, _ := p["av1Support"].(bool) return val } @@ -105,6 +110,26 @@ func (c SessionConfig) IsValid() error { return nil } +func (c *SessionConfig) FromMap(m map[string]any) error { + if c == nil { + return fmt.Errorf("invalid nil config") + } + if m == nil { + return fmt.Errorf("invalid nil map") + } + + c.GroupID, _ = m["groupID"].(string) + c.CallID, _ = m["callID"].(string) + c.UserID, _ = m["userID"].(string) + c.SessionID, _ = m["sessionID"].(string) + c.Props = SessionProps{ + "channelID": m["channelID"], + "av1Support": m["av1Support"], + } + + return nil +} + type ICEServerConfig struct { URLs []string `toml:"urls" json:"urls"` Username string `toml:"username,omitempty" json:"username,omitempty"` diff --git a/service/rtc/config_test.go b/service/rtc/config_test.go index 2ed8d5b..2e917ce 100644 --- a/service/rtc/config_test.go +++ b/service/rtc/config_test.go @@ -315,3 +315,83 @@ func TestICEHostPortOverrideParseMap(t *testing.T) { }, m) }) } + +func TestSessionConfigFromMap(t *testing.T) { + t.Run("nil config", func(t *testing.T) { + var cfg *SessionConfig + err := cfg.FromMap(map[string]any{}) + require.EqualError(t, err, "invalid nil config") + }) + + t.Run("nil map", func(t *testing.T) { + var cfg SessionConfig + err := cfg.FromMap(nil) + require.EqualError(t, err, "invalid nil map") + }) + + t.Run("missing props", func(t *testing.T) { + var cfg SessionConfig + err := cfg.FromMap(map[string]any{ + "callID": "callID", + "sessionID": "sessionID", + "groupID": "groupID", + "userID": "userID", + }) + require.NoError(t, err) + require.Equal(t, SessionConfig{ + GroupID: "groupID", + SessionID: "sessionID", + UserID: "userID", + CallID: "callID", + Props: SessionProps{ + "channelID": nil, + "av1Support": nil, + }, + }, cfg) + }) + + t.Run("complete", func(t *testing.T) { + var cfg SessionConfig + err := cfg.FromMap(map[string]any{ + "callID": "callID", + "sessionID": "sessionID", + "groupID": "groupID", + "userID": "userID", + "channelID": "channelID", + "av1Support": true, + }) + require.NoError(t, err) + require.NoError(t, cfg.IsValid()) + require.Equal(t, SessionConfig{ + GroupID: "groupID", + SessionID: "sessionID", + UserID: "userID", + CallID: "callID", + Props: SessionProps{ + "channelID": "channelID", + "av1Support": true, + }, + }, cfg) + }) +} + +func TestSessionProps(t *testing.T) { + t.Run("empty props", func(t *testing.T) { + cfg := SessionConfig{ + Props: SessionProps{}, + } + require.Empty(t, cfg.Props.ChannelID()) + require.False(t, cfg.Props.AV1Support()) + }) + + t.Run("complete props", func(t *testing.T) { + cfg := SessionConfig{ + Props: SessionProps{ + "channelID": "channelID", + "av1Support": true, + }, + } + require.Equal(t, "channelID", cfg.Props.ChannelID()) + require.True(t, cfg.Props.AV1Support()) + }) +} diff --git a/service/rtc/session.go b/service/rtc/session.go index 70d9ade..22c458d 100644 --- a/service/rtc/session.go +++ b/service/rtc/session.go @@ -170,7 +170,7 @@ func (s *session) getScreenStreamID() string { return s.screenStreamID } -func (s *session) getRemoteScreenTrack(rid string) *webrtc.TrackRemote { +func (s *session) getRemoteScreenTrack(mimeType, rid string) *webrtc.TrackRemote { s.mut.RLock() defer s.mut.RUnlock() @@ -178,10 +178,10 @@ func (s *session) getRemoteScreenTrack(rid string) *webrtc.TrackRemote { rid = SimulcastLevelDefault } - return s.remoteScreenTracks[rid] + return s.remoteScreenTracks[getTrackIndex(mimeType, rid)] } -func (s *session) getSourceRate(rid string) int { +func (s *session) getSourceRate(mimeType, rid string) int { s.mut.RLock() defer s.mut.RUnlock() @@ -189,7 +189,7 @@ func (s *session) getSourceRate(rid string) int { rid = SimulcastLevelDefault } - rm := s.screenRateMonitors[rid] + rm := s.screenRateMonitors[getTrackIndex(mimeType, rid)] if rm == nil { s.log.Warn("rate monitor should not be nil", mlog.String("sessionID", s.cfg.SessionID)) @@ -201,11 +201,11 @@ func (s *session) getSourceRate(rid string) int { return rate } -func (s *session) getOutScreenTrack(rid string) *webrtc.TrackLocalStaticRTP { +func (s *session) getOutScreenTrack(mimeType, rid string) *webrtc.TrackLocalStaticRTP { s.mut.RLock() defer s.mut.RUnlock() - return pickRandom(s.outScreenTracks[rid]) + return pickRandom(s.outScreenTracks[getTrackIndex(mimeType, rid)]) } func (s *session) getExpectedSimulcastLevel() string { @@ -291,7 +291,18 @@ func (s *session) handleSenderRTCP(sender *webrtc.RTPSender) { return } - screenTrack := screenSession.getRemoteScreenTrack(sender.Track().RID()) + senderTrack, ok := sender.Track().(*webrtc.TrackLocalStaticRTP) + if !ok { + s.log.Error("track conversion failed", mlog.String("sessionID", s.cfg.SessionID)) + return + } + + if senderTrack == nil { + s.log.Error("senderTrack should not be nil", mlog.String("sessionID", s.cfg.SessionID)) + return + } + + screenTrack := screenSession.getRemoteScreenTrack(senderTrack.Codec().MimeType, sender.Track().RID()) if screenTrack == nil { s.log.Error("screenTrack should not be nil", mlog.String("sessionID", s.cfg.SessionID)) return @@ -548,3 +559,11 @@ func (s *session) clearScreenState() { s.remoteScreenTracks = make(map[string]*webrtc.TrackRemote) s.screenRateMonitors = make(map[string]*RateMonitor) } + +func (s *session) supportsAV1() bool { + if s.cfg.Props == nil { + return false + } + + return s.cfg.Props.AV1Support() +} diff --git a/service/rtc/sfu.go b/service/rtc/sfu.go index 47633be..cfffa22 100644 --- a/service/rtc/sfu.go +++ b/service/rtc/sfu.go @@ -41,15 +41,24 @@ var ( RTCPFeedback: nil, } rtpVideoCodecs = map[string]webrtc.RTPCodecParameters{ - "video/VP8": { + webrtc.MimeTypeVP8: { RTPCodecCapability: webrtc.RTPCodecCapability{ - MimeType: "video/VP8", + MimeType: webrtc.MimeTypeVP8, ClockRate: 90000, SDPFmtpLine: "", RTCPFeedback: videoRTCPFeedback, }, PayloadType: 96, }, + webrtc.MimeTypeAV1: { + RTPCodecCapability: webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeAV1, + ClockRate: 90000, + SDPFmtpLine: "", + RTCPFeedback: videoRTCPFeedback, + }, + PayloadType: 45, + }, } rtpVideoExtensions = []string{ "urn:ietf:params:rtp-hdrext:sdes:mid", @@ -59,9 +68,10 @@ var ( ) const ( - nackResponderBufferSize = 256 - audioLevelExtensionURI = "urn:ietf:params:rtp-hdrext:ssrc-audio-level" - writerQueueSize = 200 // Enough to hold up to one second of video packets. + nackResponderBufferSize = 256 + audioLevelExtensionURI = "urn:ietf:params:rtp-hdrext:ssrc-audio-level" + writerQueueSize = 200 // Enough to hold up to one second of video packets. + ScreenTrackMimeTypeDefault = webrtc.MimeTypeVP8 ) func (s *Server) initSettingEngine() (webrtc.SettingEngine, error) { @@ -180,6 +190,10 @@ func initInterceptors(m *webrtc.MediaEngine) (*interceptor.Registry, <-chan cc.B } func (s *Server) InitSession(cfg SessionConfig, closeCb func() error) error { + if err := cfg.IsValid(); err != nil { + return fmt.Errorf("invalid session config: %w", err) + } + s.metrics.IncRTCSessions(cfg.GroupID) iceServers := make([]webrtc.ICEServer, 0, len(s.cfg.ICEServers)) @@ -519,10 +533,11 @@ func (s *Server) InitSession(cfg SessionConfig, closeCb func() error) error { return } + trackIdx := getTrackIndex(trackMimeType, rid) us.mut.Lock() - us.outScreenTracks[rid] = outScreenTracks - us.remoteScreenTracks[rid] = remoteTrack - us.screenRateMonitors[rid] = rm + us.outScreenTracks[trackIdx] = outScreenTracks + us.remoteScreenTracks[trackIdx] = remoteTrack + us.screenRateMonitors[trackIdx] = rm us.mut.Unlock() call.iterSessions(func(ss *session) { @@ -539,9 +554,22 @@ func (s *Server) InitSession(cfg SessionConfig, closeCb func() error) error { return } + if trackMimeType == ScreenTrackMimeTypeDefault && us.supportsAV1() && ss.supportsAV1() { + s.log.Debug("skipping VP8 track for AV1 supported receiver", + mlog.String("sessionID", ss.cfg.SessionID), + ) + return + } else if trackMimeType == webrtc.MimeTypeAV1 && !ss.supportsAV1() { + s.log.Debug("skipping AV1 track for unsupported receiver", + mlog.String("sessionID", ss.cfg.SessionID), + ) + return + } + s.log.Debug("received track matches expected level, sending", mlog.String("lvl", expectedLevel), mlog.String("sessionID", ss.cfg.SessionID), + mlog.String("trackMimeType", trackMimeType), ) select { @@ -622,7 +650,7 @@ func (s *Server) InitSession(cfg SessionConfig, closeCb func() error) error { s.log.Debug("session has joined call", mlog.String("userID", cfg.UserID), mlog.String("sessionID", cfg.SessionID), - mlog.String("channelID", cfg.GetStringProp("channelID")), + mlog.String("channelID", cfg.Props.ChannelID()), mlog.String("callID", cfg.CallID), ) @@ -701,7 +729,16 @@ func (s *Server) handleTracks(call *call, us *session) { ss.mut.RLock() outVoiceTrack := ss.outVoiceTrack - outScreenTracks := ss.outScreenTracks[SimulcastLevelDefault] + + // Screen track selection. Both sender and receiver support it + // in order to send out the AV1 track. + screenTrackMimeType := ScreenTrackMimeTypeDefault + if ss.supportsAV1() && us.supportsAV1() { + s.log.Debug("both sender and receiver support AV1", mlog.String("sessionID", us.cfg.SessionID)) + screenTrackMimeType = webrtc.MimeTypeAV1 + } + outScreenTracks := ss.outScreenTracks[getTrackIndex(screenTrackMimeType, SimulcastLevelDefault)] + outScreenAudioTrack := ss.outScreenAudioTrack ss.mut.RUnlock() diff --git a/service/rtc/simulcast.go b/service/rtc/simulcast.go index a28a396..952cb8a 100644 --- a/service/rtc/simulcast.go +++ b/service/rtc/simulcast.go @@ -11,6 +11,7 @@ import ( "golang.org/x/time/rate" "github.com/pion/interceptor/pkg/cc" + "github.com/pion/webrtc/v3" "github.com/mattermost/mattermost/server/public/shared/mlog" ) @@ -178,7 +179,14 @@ func (s *session) handleSenderBitrateChange(downRate int, lossRate int) (bool, i return false, 0, "" } - currSourceRate := screenSession.getSourceRate(currLevel) + localTrack, ok := currTrack.(*webrtc.TrackLocalStaticRTP) + if !ok { + s.log.Error("track conversion failed", mlog.String("sessionID", s.cfg.SessionID)) + return false, 0, "" + } + mimeType := localTrack.Codec().MimeType + + currSourceRate := screenSession.getSourceRate(mimeType, currLevel) if currSourceRate <= 0 { s.log.Warn("current source rate not available yet", mlog.String("sessionID", s.cfg.SessionID)) return false, 0, "" @@ -197,13 +205,13 @@ func (s *session) handleSenderBitrateChange(downRate int, lossRate int) (bool, i return false, 0, "" } - newTrack := screenSession.getOutScreenTrack(newLevel) + newTrack := screenSession.getOutScreenTrack(mimeType, newLevel) if newTrack == nil { // if the desired track is not available we keep the current one return false, 0, "" } - sourceRate := screenSession.getSourceRate(newLevel) + sourceRate := screenSession.getSourceRate(mimeType, newLevel) if sourceRate <= 0 { s.log.Warn("source rate not available", mlog.String("sessionID", s.cfg.SessionID)) return false, 0, "" diff --git a/service/rtc/utils.go b/service/rtc/utils.go index 95805c1..7223761 100644 --- a/service/rtc/utils.go +++ b/service/rtc/utils.go @@ -33,6 +33,10 @@ func genTrackID(tt trackType, baseID string) string { return string(tt) + "_" + baseID + "_" + random.NewID()[0:8] } +func getTrackIndex(mimeType, rid string) string { + return mimeType + "_" + rid +} + func isValidTrackID(trackID string) bool { fields := strings.Split(trackID, "_") if len(fields) != 3 { diff --git a/service/service.go b/service/service.go index 7bc617f..1bf25df 100644 --- a/service/service.go +++ b/service/service.go @@ -286,31 +286,24 @@ func (s *Service) handleClientMsg(msg ws.Message) error { var rtcMsg rtc.Message switch cm.Type { case ClientMessageJoin: - data, ok := cm.Data.(map[string]string) + data, ok := cm.Data.(map[string]any) if !ok { return fmt.Errorf("unexpected data type: %T", cm.Data) } - callID := data["callID"] - if callID == "" { - return fmt.Errorf("missing callID in client message") - } - userID := data["userID"] - if userID == "" { - return fmt.Errorf("missing userID in client message") - } - sessionID := data["sessionID"] - if sessionID == "" { - return fmt.Errorf("missing sessionID in client message") + + var cfg rtc.SessionConfig + if err := cfg.FromMap(data); err != nil { + return fmt.Errorf("failed to read session config from map: %w", err) } - channelID := data["channelID"] + cfg.GroupID = msg.ClientID closeCb := func() error { s.mut.Lock() defer s.mut.Unlock() - delete(s.connMap, sessionID) + delete(s.connMap, cfg.SessionID) data, err := NewPackedClientMessage(ClientMessageClose, map[string]string{ - "sessionID": sessionID, + "sessionID": cfg.SessionID, }) if err != nil { return fmt.Errorf("failed to pack close message: %w", err) @@ -323,16 +316,6 @@ func (s *Service) handleClientMsg(msg ws.Message) error { return nil } - cfg := rtc.SessionConfig{ - GroupID: msg.ClientID, - CallID: callID, - UserID: userID, - SessionID: sessionID, - Props: map[string]any{ - "channelID": channelID, - }, - } - s.log.Debug("join message", mlog.Any("sessionCfg", cfg)) if err := s.rtcServer.InitSession(cfg, closeCb); err != nil { @@ -340,7 +323,7 @@ func (s *Service) handleClientMsg(msg ws.Message) error { } s.mut.Lock() - s.connMap[sessionID] = msg.ConnID + s.connMap[cfg.SessionID] = msg.ConnID s.mut.Unlock() return nil