Skip to content

Commit

Permalink
review fixes, rename receiver -> generator, sender -> responder, lint…
Browse files Browse the repository at this point in the history
…er fixes
  • Loading branch information
masterada authored and Sean-Der committed Dec 4, 2020
1 parent 499eb2c commit b93c4d4
Show file tree
Hide file tree
Showing 15 changed files with 255 additions and 180 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.15

require (
github.com/pion/logging v0.2.2
github.com/pion/rtcp v1.2.4
github.com/pion/rtcp v1.2.6
github.com/pion/rtp v1.6.1
github.com/stretchr/testify v1.6.1
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms=
github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA=
github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8=
github.com/pion/rtcp v1.2.4 h1:NT3H5LkUGgaEapvp0HGik+a+CpflRF7KTD7H+o7OWIM=
github.com/pion/rtcp v1.2.4/go.mod h1:52rMNPWFsjr39z9B9MhnkqhPLoeHTv1aN63o/42bWE0=
github.com/pion/rtcp v1.2.6 h1:1zvwBbyd0TeEuuWftrd/4d++m+/kZSeiguxU61LFWpo=
github.com/pion/rtcp v1.2.6/go.mod h1:52rMNPWFsjr39z9B9MhnkqhPLoeHTv1aN63o/42bWE0=
github.com/pion/rtp v1.6.1 h1:2Y2elcVBrahYnHKN2X7rMHX/r1R4TEBMP1LaVu/wNhk=
github.com/pion/rtp v1.6.1/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down
14 changes: 14 additions & 0 deletions test/stream.go → internal/test/stream.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// Package test provides helpers for testing interceptors
package test

import (
Expand All @@ -8,6 +9,7 @@ import (
"github.com/pion/rtp"
)

// Stream is a helper struct for testing interceptors.
type Stream struct {
interceptor interceptor.Interceptor

Expand All @@ -26,16 +28,19 @@ type Stream struct {
rtpInModified chan RTPWithError
}

// RTPWithError is used to send an rtp packet or an error on a channel
type RTPWithError struct {
Packet *rtp.Packet
Err error
}

// RTCPWithError is used to send a batch of rtcp packets or an error on a channel
type RTCPWithError struct {
Packets []rtcp.Packet
Err error
}

// NewStream creates a new Stream
func NewStream(info *interceptor.StreamInfo, i interceptor.Interceptor) *Stream {
s := &Stream{
interceptor: i,
Expand Down Expand Up @@ -107,40 +112,49 @@ func NewStream(info *interceptor.StreamInfo, i interceptor.Interceptor) *Stream
return s
}

// WriteRTCP writes a batch of rtcp packet to the stream, using the interceptor
func (s *Stream) WriteRTCP(pkts []rtcp.Packet) error {
_, err := s.rtcpWriter.Write(pkts, interceptor.Attributes{})
return err
}

// WriteRTP writes an rtp packet to the stream, using the interceptor
func (s *Stream) WriteRTP(p *rtp.Packet) error {
_, err := s.rtpWriter.Write(p, interceptor.Attributes{})
return err
}

// ReceiveRTCP schedules a new rtcp batch, so it can be read be the stream
func (s *Stream) ReceiveRTCP(pkts []rtcp.Packet) {
s.rtcpIn <- pkts
}

// ReceiveRTP schedules a rtp packet, so it can be read be the stream
func (s *Stream) ReceiveRTP(packet *rtp.Packet) {
s.rtpIn <- packet
}

// WrittenRTCP returns a channel containing the rtcp batches written, modified by the interceptor
func (s *Stream) WrittenRTCP() chan []rtcp.Packet {
return s.rtcpOutModified
}

// WrittenRTP returns a channel containing rtp packets written, modified by the interceptor
func (s *Stream) WrittenRTP() chan *rtp.Packet {
return s.rtpOutModified
}

// ReadRTCP returns a channel containing the rtcp batched read, modified by the interceptor
func (s *Stream) ReadRTCP() chan RTCPWithError {
return s.rtcpInModified
}

// ReadRTP returns a channel containing the rtp packets read, modified by the interceptor
func (s *Stream) ReadRTP() chan RTPWithError {
return s.rtpInModified
}

// Close closes the stream and the underlying interceptor
func (s *Stream) Close() error {
close(s.rtcpIn)
close(s.rtpIn)
Expand Down
File renamed without changes.
7 changes: 7 additions & 0 deletions pkg/nack/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
// Package nack provides interceptors to implement sending and receiving negative acknowledgements
package nack

import "errors"

// ErrInvalidSize is returned by newReceiveLog/newSendBuffer, when an incorrect buffer size is supplied.
var ErrInvalidSize = errors.New("invalid buffer size")
88 changes: 35 additions & 53 deletions nack/receiver_interceptor.go → pkg/nack/generator_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
"github.com/pion/rtp"
)

// ReceiverInterceptor interceptor generates nack messages.
type ReceiverInterceptor struct {
// GeneratorInterceptor interceptor generates nack feedback messages.
type GeneratorInterceptor struct {
interceptor.NoOp
size uint16
skipLastN uint16
Expand All @@ -24,38 +24,42 @@ type ReceiverInterceptor struct {
log logging.LeveledLogger
}

// NewReceiverInterceptor returns a new ReceiverInterceptor interceptor
func NewReceiverInterceptor(size uint16, skipLastN uint16, interval time.Duration, log logging.LeveledLogger) (*ReceiverInterceptor, error) {
_, err := NewReceiveLog(size)
if err != nil {
return nil, err
}

return &ReceiverInterceptor{
// NewGeneratorInterceptor returns a new GeneratorInterceptor interceptor
func NewGeneratorInterceptor(opts ...GeneratorOption) (*GeneratorInterceptor, error) {
r := &GeneratorInterceptor{
NoOp: interceptor.NoOp{},
size: size,
skipLastN: skipLastN,
interval: interval,
size: 8192,
skipLastN: 0,
interval: time.Millisecond * 100,
receiveLogs: &sync.Map{},
close: make(chan struct{}),
log: log,
}, nil
log: logging.NewDefaultLoggerFactory().NewLogger("nack_generator"),
}

for _, opt := range opts {
opt(r)
}

if _, err := newReceiveLog(r.size); err != nil {
return nil, err
}

return r, nil
}

// BindRTCPWriter lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method
// will be called once per packet batch.
func (n *ReceiverInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter {
func (n *GeneratorInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter {
n.m.Lock()
defer n.m.Unlock()
select {
case <-n.close:
// already closed
n.m.Unlock()
return writer
default:
}

n.wg.Add(1)
n.m.Unlock()

go n.loop(writer)

Expand All @@ -64,7 +68,7 @@ func (n *ReceiverInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) inte

// BindRemoteStream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method
// will be called once per rtp packet.
func (n *ReceiverInterceptor) BindRemoteStream(info *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader {
func (n *GeneratorInterceptor) BindRemoteStream(info *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader {
hasNack := false
for _, fb := range info.RTCPFeedback {
if fb.Type == "nack" && fb.Parameter == "" {
Expand All @@ -76,8 +80,8 @@ func (n *ReceiverInterceptor) BindRemoteStream(info *interceptor.StreamInfo, rea
return reader
}

// error is already checked in NewReceiverInterceptor
receiveLog, _ := NewReceiveLog(n.size)
// error is already checked in NewGeneratorInterceptor
receiveLog, _ := newReceiveLog(n.size)
n.receiveLogs.Store(info.SSRC, receiveLog)

return interceptor.RTPReaderFunc(func() (*rtp.Packet, interceptor.Attributes, error) {
Expand All @@ -86,18 +90,19 @@ func (n *ReceiverInterceptor) BindRemoteStream(info *interceptor.StreamInfo, rea
return nil, nil, err
}

receiveLog.Add(p.SequenceNumber)
receiveLog.add(p.SequenceNumber)

return p, attr, nil
})
}

// UnbindLocalStream is called when the Stream is removed. It can be used to clean up any data related to that track.
func (n *ReceiverInterceptor) UnbindLocalStream(info *interceptor.StreamInfo) {
func (n *GeneratorInterceptor) UnbindLocalStream(info *interceptor.StreamInfo) {
n.receiveLogs.Delete(info.SSRC)
}

func (n *ReceiverInterceptor) Close() error {
// Close closes the interceptor
func (n *GeneratorInterceptor) Close() error {
defer n.wg.Wait()
n.m.Lock()
defer n.m.Unlock()
Expand All @@ -114,32 +119,31 @@ func (n *ReceiverInterceptor) Close() error {
return nil
}

func (n *ReceiverInterceptor) loop(rtcpWriter interceptor.RTCPWriter) {
func (n *GeneratorInterceptor) loop(rtcpWriter interceptor.RTCPWriter) {
defer n.wg.Done()

senderSSRC := rand.Uint32()
senderSSRC := rand.Uint32() // #nosec

ticker := time.NewTicker(n.interval)
for {
select {
case <-ticker.C:
n.receiveLogs.Range(func(key, value interface{}) bool {
ssrc := key.(uint32)
receiveLog := value.(*ReceiveLog)
receiveLog := value.(*receiveLog)

missing := receiveLog.MissingSeqNumbers(n.skipLastN)
missing := receiveLog.missingSeqNumbers(n.skipLastN)
if len(missing) == 0 {
return true
}

nack := &rtcp.TransportLayerNack{
SenderSSRC: senderSSRC,
MediaSSRC: ssrc,
Nacks: nackPairs(missing),
Nacks: rtcp.NackPairsFromSequenceNumbers(missing),
}

_, err := rtcpWriter.Write([]rtcp.Packet{nack}, interceptor.Attributes{})
if err != nil {
if _, err := rtcpWriter.Write([]rtcp.Packet{nack}, interceptor.Attributes{}); err != nil {
n.log.Warnf("failed sending nack: %+v", err)
}

Expand All @@ -151,25 +155,3 @@ func (n *ReceiverInterceptor) loop(rtcpWriter interceptor.RTCPWriter) {
}
}
}

func nackPairs(seqNums []uint16) []rtcp.NackPair {
// TODO: I think this shoud be moved to rtcp package
pairs := make([]rtcp.NackPair, 0)
startSeq := seqNums[0]
nackPair := &rtcp.NackPair{PacketID: startSeq}
for i := 1; i < len(seqNums); i++ {
m := seqNums[i]

if m-nackPair.PacketID > 16 {
pairs = append(pairs, *nackPair)
nackPair = &rtcp.NackPair{PacketID: m}
continue
}

nackPair.LostPackets |= 1 << (m - nackPair.PacketID - 1)
}

pairs = append(pairs, *nackPair)

return pairs
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,21 @@ import (
"time"

"github.com/pion/interceptor"
"github.com/pion/interceptor/test"
"github.com/pion/interceptor/internal/test"
"github.com/pion/logging"
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/stretchr/testify/assert"
)

func TestReceiverInterceptor(t *testing.T) {
func TestGeneratorInterceptor(t *testing.T) {
const interval = time.Millisecond * 10
i, err := NewReceiverInterceptor(64, 2, interval, logging.NewDefaultLoggerFactory().NewLogger("test"))
i, err := NewGeneratorInterceptor(
GeneratorSize(64),
GeneratorSkipLastN(2),
GeneratorInterval(interval),
GeneratorLog(logging.NewDefaultLoggerFactory().NewLogger("test")),
)
if err != nil {
t.Fatal(err)
}
Expand All @@ -24,20 +29,15 @@ func TestReceiverInterceptor(t *testing.T) {
RTCPFeedback: []interceptor.RTCPFeedback{{Type: "nack"}},
}, i)
defer func() {
err := stream.Close()
if err != nil {
t.Errorf("error closing stream: %v", err)
}
assert.NoError(t, stream.Close())
}()

for _, seqNum := range []uint16{10, 11, 12, 14, 16, 18} {
stream.ReceiveRTP(&rtp.Packet{Header: rtp.Header{SequenceNumber: seqNum}})

select {
case r := <-stream.ReadRTP():
if r.Err != nil {
t.Fatal(r.Err)
}
assert.NoError(t, r.Err)
assert.Equal(t, seqNum, r.Packet.SequenceNumber)
case <-time.After(10 * time.Millisecond):
t.Fatal("receiver rtp packet not found")
Expand All @@ -54,17 +54,19 @@ func TestReceiverInterceptor(t *testing.T) {

select {
case pkts := <-stream.WrittenRTCP():
if len(pkts) != 1 {
t.Fatalf("single packet rtcp batch expected, found: %v", pkts)
}
assert.Equal(t, len(pkts), 1, "single packet RTCP Compound Packet expected")

p, ok := pkts[0].(*rtcp.TransportLayerNack)
if !ok {
t.Fatalf("TransportLayerNack rtcp packet expected, found: %T", pkts[0])
}
assert.True(t, ok, "TransportLayerNack rtcp packet expected, found: %T", pkts[0])

assert.Equal(t, uint16(13), p.Nacks[0].PacketID)
assert.Equal(t, rtcp.PacketBitmap(0b10), p.Nacks[0].LostPackets) // we want packets: 13, 15 (not packet 17, because skipLastN is set to 2)
assert.Equal(t, rtcp.PacketBitmap(0b10), p.Nacks[0].LostPackets) // we want packets: 13, 15 (not packet 17, because skipLastN is setReceived to 2)
case <-time.After(10 * time.Millisecond):
t.Fatal("written rtcp packet not found")
}
}

func TestGeneratorInterceptor_InvalidSize(t *testing.T) {
_, err := NewGeneratorInterceptor(GeneratorSize(5))
assert.Error(t, err, ErrInvalidSize)
}
Loading

0 comments on commit b93c4d4

Please sign in to comment.