Skip to content

Commit

Permalink
Update pion/interceptor for NACKs
Browse files Browse the repository at this point in the history
Generate + Respond interceptors
  • Loading branch information
tarrencev authored and Sean-Der committed Dec 15, 2020
1 parent 85ced4a commit a54b74c
Show file tree
Hide file tree
Showing 15 changed files with 162 additions and 29 deletions.
14 changes: 13 additions & 1 deletion examples/broadcast/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,23 @@ func main() { // nolint:gocognit
panic(err)
}

_, err = peerConnection.AddTrack(localTrack)
rtpSender, err := peerConnection.AddTrack(localTrack)
if err != nil {
panic(err)
}

// Read incoming RTCP packets
// Before these packets are retuned they are processed by interceptors. For things
// like NACK this needs to be called.
go func() {
rtcpBuf := make([]byte, 1500)
for {
if _, _, rtcpErr := rtpSender.Read(rtcpBuf); rtcpErr != nil {
return
}
}
}()

// Set the remote SessionDescription
err = peerConnection.SetRemoteDescription(recvOnlyOffer)
if err != nil {
Expand Down
15 changes: 14 additions & 1 deletion examples/insertable-streams/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,23 @@ func main() {
if err != nil {
panic(err)
}
if _, err = peerConnection.AddTrack(videoTrack); err != nil {
rtpSender, err := peerConnection.AddTrack(videoTrack)
if err != nil {
panic(err)
}

// Read incoming RTCP packets
// Before these packets are retuned they are processed by interceptors. For things
// like NACK this needs to be called.
go func() {
rtcpBuf := make([]byte, 1500)
for {
if _, _, rtcpErr := rtpSender.Read(rtcpBuf); rtcpErr != nil {
return
}
}
}()

iceConnectedCtx, iceConnectedCtxCancel := context.WithCancel(context.Background())
go func() {
// Open a IVF file and start reading using our IVFReader
Expand Down
15 changes: 14 additions & 1 deletion examples/play-from-disk-renegotation/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,23 @@ func addVideo(w http.ResponseWriter, r *http.Request) {
if err != nil {
panic(err)
}
if _, err = peerConnection.AddTrack(videoTrack); err != nil {
rtpSender, err := peerConnection.AddTrack(videoTrack)
if err != nil {
panic(err)
}

// Read incoming RTCP packets
// Before these packets are retuned they are processed by interceptors. For things
// like NACK this needs to be called.
go func() {
rtcpBuf := make([]byte, 1500)
for {
if _, _, rtcpErr := rtpSender.Read(rtcpBuf); rtcpErr != nil {
return
}
}
}()

go writeVideoToTrack(videoTrack)
doSignaling(w, r)
fmt.Println("Video track has been added")
Expand Down
31 changes: 29 additions & 2 deletions examples/play-from-disk/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,23 @@ func main() {
panic(videoTrackErr)
}

if _, videoTrackErr = peerConnection.AddTrack(videoTrack); videoTrackErr != nil {
rtpSender, videoTrackErr := peerConnection.AddTrack(videoTrack)
if videoTrackErr != nil {
panic(videoTrackErr)
}

// Read incoming RTCP packets
// Before these packets are retuned they are processed by interceptors. For things
// like NACK this needs to be called.
go func() {
rtcpBuf := make([]byte, 1500)
for {
if _, _, rtcpErr := rtpSender.Read(rtcpBuf); rtcpErr != nil {
return
}
}
}()

go func() {
// Open a IVF file and start reading using our IVFReader
file, ivfErr := os.Open(videoFileName)
Expand Down Expand Up @@ -100,10 +113,24 @@ func main() {
if audioTrackErr != nil {
panic(audioTrackErr)
}
if _, audioTrackErr = peerConnection.AddTrack(audioTrack); audioTrackErr != nil {

rtpSender, audioTrackErr := peerConnection.AddTrack(audioTrack)
if audioTrackErr != nil {
panic(audioTrackErr)
}

// Read incoming RTCP packets
// Before these packets are retuned they are processed by interceptors. For things
// like NACK this needs to be called.
go func() {
rtcpBuf := make([]byte, 1500)
for {
if _, _, rtcpErr := rtpSender.Read(rtcpBuf); rtcpErr != nil {
return
}
}
}()

go func() {
// Open a IVF file and start reading using our IVFReader
file, oggErr := os.Open(audioFileName)
Expand Down
15 changes: 14 additions & 1 deletion examples/reflect/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,23 @@ func main() {
}

// Add this newly created track to the PeerConnection
if _, err = peerConnection.AddTrack(outputTrack); err != nil {
rtpSender, err := peerConnection.AddTrack(outputTrack)
if err != nil {
panic(err)
}

// Read incoming RTCP packets
// Before these packets are retuned they are processed by interceptors. For things
// like NACK this needs to be called.
go func() {
rtcpBuf := make([]byte, 1500)
for {
if _, _, rtcpErr := rtpSender.Read(rtcpBuf); rtcpErr != nil {
return
}
}
}()

// Wait for the offer to be pasted
offer := webrtc.SessionDescription{}
signal.Decode(signal.MustReadStdin(), &offer)
Expand Down
15 changes: 14 additions & 1 deletion examples/rtp-to-webrtc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,23 @@ func main() {
if err != nil {
panic(err)
}
if _, err = peerConnection.AddTrack(videoTrack); err != nil {
rtpSender, err := peerConnection.AddTrack(videoTrack)
if err != nil {
panic(err)
}

// Read incoming RTCP packets
// Before these packets are retuned they are processed by interceptors. For things
// like NACK this needs to be called.
go func() {
rtcpBuf := make([]byte, 1500)
for {
if _, _, rtcpErr := rtpSender.Read(rtcpBuf); rtcpErr != nil {
return
}
}
}()

// Set the handler for ICE connection state
// This will notify you when the peer has connected/disconnected
peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) {
Expand Down
15 changes: 15 additions & 0 deletions examples/simulcast/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,21 @@ func main() {
panic(err)
}

// Read incoming RTCP packets
// Before these packets are retuned they are processed by interceptors. For things
// like NACK this needs to be called.
processRTCP := func(rtpSender *webrtc.RTPSender) {
rtcpBuf := make([]byte, 1500)
for {
if _, _, rtcpErr := rtpSender.Read(rtcpBuf); rtcpErr != nil {
return
}
}
}
for _, rtpSender := range peerConnection.GetSenders() {
go processRTCP(rtpSender)
}

// Wait for the offer to be pasted
offer := webrtc.SessionDescription{}
signal.Decode(signal.MustReadStdin(), &offer)
Expand Down
26 changes: 13 additions & 13 deletions examples/swap-tracks/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,23 +38,23 @@ func main() { // nolint:gocognit
}

// Add this newly created track to the PeerConnection
if _, err = peerConnection.AddTrack(outputTrack); err != nil {
panic(err)
}

// In addition to the implicit transceiver added by the track, we add two more
// for the other tracks
_, err = peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo,
webrtc.RtpTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly})
if err != nil {
panic(err)
}
_, err = peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo,
webrtc.RtpTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly})
rtpSender, err := peerConnection.AddTrack(outputTrack)
if err != nil {
panic(err)
}

// Read incoming RTCP packets
// Before these packets are retuned they are processed by interceptors. For things
// like NACK this needs to be called.
go func() {
rtcpBuf := make([]byte, 1500)
for {
if _, _, rtcpErr := rtpSender.Read(rtcpBuf); rtcpErr != nil {
return
}
}
}()

// Wait for the offer to be pasted
offer := webrtc.SessionDescription{}
signal.Decode(signal.MustReadStdin(), &offer)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/pion/datachannel v1.4.21
github.com/pion/dtls/v2 v2.0.4
github.com/pion/ice/v2 v2.0.14
github.com/pion/interceptor v0.0.6
github.com/pion/interceptor v0.0.8
github.com/pion/logging v0.2.2
github.com/pion/randutil v0.1.0
github.com/pion/rtcp v1.2.6
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ github.com/pion/dtls/v2 v2.0.4 h1:WuUcqi6oYMu/noNTz92QrF1DaFj4eXbhQ6dzaaAwOiI=
github.com/pion/dtls/v2 v2.0.4/go.mod h1:qAkFscX0ZHoI1E07RfYPoRw3manThveu+mlTDdOxoGI=
github.com/pion/ice/v2 v2.0.14 h1:FxXxauyykf89SWAtkQCfnHkno6G8+bhRkNguSh9zU+4=
github.com/pion/ice/v2 v2.0.14/go.mod h1:wqaUbOq5ObDNU5ox1hRsEst0rWfsKuH1zXjQFEWiZwM=
github.com/pion/interceptor v0.0.6 h1:530EdZi757pZEx510kvO25FkEuKm2mrb0p9NA+Xfj8E=
github.com/pion/interceptor v0.0.6/go.mod h1:QHkPVN5uyuw54wHqqL1KS9fxf3M3RzOlVKg/YrtK1so=
github.com/pion/interceptor v0.0.8 h1:qsVJv9RF7mPq/RUnUV5iZCzxwGizO880FuiFKkEGQaE=
github.com/pion/interceptor v0.0.8/go.mod h1:dHgEP5dtxOTf21MObuBAjJeAayPxLUAZjerGH8Xr07c=
github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms=
github.com/pion/mdns v0.0.4 h1:O4vvVqr4DGX63vzmO6Fw9vpy3lfztVWHGCQfyw0ZLSY=
Expand Down
14 changes: 13 additions & 1 deletion interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync/atomic"

"github.com/pion/interceptor"
"github.com/pion/interceptor/pkg/nack"
"github.com/pion/rtp"
)

Expand All @@ -21,9 +22,20 @@ func RegisterDefaultInterceptors(mediaEngine *MediaEngine, interceptorRegistry *

// ConfigureNack will setup everything necessary for handling generating/responding to nack messages.
func ConfigureNack(mediaEngine *MediaEngine, interceptorRegistry *interceptor.Registry) error {
generator, err := nack.NewGeneratorInterceptor()
if err != nil {
return err
}

responder, err := nack.NewResponderInterceptor()
if err != nil {
return err
}

mediaEngine.RegisterFeedback(RTCPFeedback{Type: "nack"}, RTPCodecTypeVideo)
mediaEngine.RegisterFeedback(RTCPFeedback{Type: "nack", Parameter: "pli"}, RTPCodecTypeVideo)
interceptorRegistry.Add(&interceptor.NACK{})
interceptorRegistry.Add(responder)
interceptorRegistry.Add(generator)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion mediaengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ func (m *MediaEngine) getRTPParametersByPayloadType(payloadType PayloadType) (RT

func payloaderForCodec(codec RTPCodecCapability) (rtp.Payloader, error) {
switch strings.ToLower(codec.MimeType) {
case strings.ToLower(MimeTypeH264):
case strings.ToLower(MimeTypeH264):
return &codecs.H264Payloader{}, nil
case strings.ToLower(MimeTypeOpus):
return &codecs.OpusPayloader{}, nil
Expand Down
10 changes: 8 additions & 2 deletions peerconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,17 @@ type PeerConnection struct {
// NewPeerConnection creates a peerconnection with the default
// codecs. See API.NewPeerConnection for details.
func NewPeerConnection(configuration Configuration) (*PeerConnection, error) {
m := MediaEngine{}
m := &MediaEngine{}
if err := m.RegisterDefaultCodecs(); err != nil {
return nil, err
}
api := NewAPI(WithMediaEngine(&m))

i := &interceptor.Registry{}
if err := RegisterDefaultInterceptors(m, i); err != nil {
return nil, err
}

api := NewAPI(WithMediaEngine(m), WithInterceptorRegistry(i))
return api.NewPeerConnection(configuration)
}

Expand Down
2 changes: 2 additions & 0 deletions peerconnection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func TestNew(t *testing.T) {
})
assert.NoError(t, err)
assert.NotNil(t, pc)
assert.NoError(t, pc.Close())
}

func TestPeerConnection_SetConfiguration(t *testing.T) {
Expand Down Expand Up @@ -229,6 +230,7 @@ func TestPeerConnection_GetConfiguration(t *testing.T) {
// See: https://github.com/pion/webrtc/issues/513.
// assert.Equal(t, len(expected.Certificates), len(actual.Certificates))
assert.Equal(t, expected.ICECandidatePoolSize, actual.ICECandidatePoolSize)
assert.NoError(t, pc.Close())
}

const minimalOffer = `v=0
Expand Down
11 changes: 9 additions & 2 deletions rtpreceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,15 @@ func (r *RTPReceiver) Receive(parameters RTPReceiveParameters) error {
),
}

globalParams := r.GetParameters()
codec := RTPCodecCapability{}
if len(globalParams.Codecs) != 0 {
codec = globalParams.Codecs[0].RTPCodecCapability
}

streamInfo := createStreamInfo("", parameters.Encodings[0].SSRC, 0, codec, globalParams.HeaderExtensions)
var err error
if t.rtpReadStream, t.rtpInterceptor, t.rtcpReadStream, t.rtcpInterceptor, err = r.streamsForSSRC(parameters.Encodings[0].SSRC, interceptor.StreamInfo{}); err != nil {
if t.rtpReadStream, t.rtpInterceptor, t.rtcpReadStream, t.rtcpInterceptor, err = r.streamsForSSRC(parameters.Encodings[0].SSRC, streamInfo); err != nil {
return err
}

Expand Down Expand Up @@ -271,7 +278,7 @@ func (r *RTPReceiver) receiveForRid(rid string, params RTPParameters, ssrc SSRC)
r.tracks[i].track.mu.Unlock()

var err error
if r.tracks[0].rtpReadStream, r.tracks[0].rtpInterceptor, r.tracks[0].rtcpReadStream, r.tracks[0].rtcpInterceptor, err = r.streamsForSSRC(ssrc, streamInfo); err != nil {
if r.tracks[i].rtpReadStream, r.tracks[i].rtpInterceptor, r.tracks[i].rtcpReadStream, r.tracks[i].rtcpInterceptor, err = r.streamsForSSRC(ssrc, streamInfo); err != nil {
return nil, err
}

Expand Down

0 comments on commit a54b74c

Please sign in to comment.