Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make read notify use sync.Cond instead of chan #304

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 52 additions & 24 deletions packetio/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,15 @@ type Buffer struct {
data []byte
head, tail int

notify chan struct{}
waiting bool
closed bool
closed bool

count int
limitCount, limitSize int

readDeadline *deadline.Deadline
readDeadline *deadline.Deadline
nextDeadline chan struct{}
readNotifier *sync.Cond
readChannelWatcherRunning sync.WaitGroup
}

const (
Expand All @@ -56,9 +57,35 @@ const (

// NewBuffer creates a new Buffer.
func NewBuffer() *Buffer {
return &Buffer{
notify: make(chan struct{}, 1),
buffer := &Buffer{
readDeadline: deadline.New(),
nextDeadline: make(chan struct{}, 1),
}
buffer.readNotifier = sync.NewCond(&buffer.mutex)
buffer.readChannelWatcherRunning.Add(1)
go buffer.readDeadlineWatcher()
return buffer
}

func (b *Buffer) readDeadlineWatcher() {
defer b.readChannelWatcherRunning.Done()
for {
select {
case <-b.readDeadline.Done():
b.mutex.Lock()
b.readNotifier.Broadcast()
b.mutex.Unlock()
case _, ok := <-b.nextDeadline:
if ok {
continue
}
return
}

_, ok := <-b.nextDeadline
if !ok {
return
}
}
}

Expand Down Expand Up @@ -173,15 +200,7 @@ func (b *Buffer) Write(packet []byte) (int, error) {
}
b.count++

waiting := b.waiting
b.waiting = false

if waiting {
select {
case b.notify <- struct{}{}:
default:
}
}
b.readNotifier.Signal()
b.mutex.Unlock()

return len(packet), nil
Expand All @@ -199,9 +218,8 @@ func (b *Buffer) Read(packet []byte) (n int, err error) { //nolint:gocognit
default:
}

b.mutex.Lock()
for {
b.mutex.Lock()

if b.head != b.tail {
// decode the packet size
n1 := b.data[b.head]
Expand Down Expand Up @@ -244,7 +262,6 @@ func (b *Buffer) Read(packet []byte) (n int, err error) { //nolint:gocognit
}

b.count--
b.waiting = false
b.mutex.Unlock()

if copied < count {
Expand All @@ -258,19 +275,19 @@ func (b *Buffer) Read(packet []byte) (n int, err error) { //nolint:gocognit
return 0, io.EOF
}

b.waiting = true
b.mutex.Unlock()

b.readNotifier.Wait()
select {
case <-b.readDeadline.Done():
b.mutex.Unlock()
return 0, &netError{ErrTimeout, true, true}
case <-b.notify:
default:
}
}
}

// Close the buffer, unblocking any pending reads.
// Data in the buffer can still be read, Read will return io.EOF only when empty.
// It returns when any goroutines Buffer started have completed.
func (b *Buffer) Close() (err error) {
b.mutex.Lock()

Expand All @@ -279,11 +296,12 @@ func (b *Buffer) Close() (err error) {
return nil
}

b.waiting = false
b.closed = true
close(b.notify)
close(b.nextDeadline)
b.readNotifier.Broadcast()
b.mutex.Unlock()

b.readChannelWatcherRunning.Wait()
return nil
}

Expand Down Expand Up @@ -338,6 +356,16 @@ func (b *Buffer) SetLimitSize(limit int) {
// SetReadDeadline sets the deadline for the Read operation.
// Setting to zero means no deadline.
func (b *Buffer) SetReadDeadline(t time.Time) error {
b.mutex.Lock()
defer b.mutex.Unlock()

b.readDeadline.Set(t)
select {
case b.nextDeadline <- struct{}{}:
default:
// if there is no receiver, then we know that readDeadlineWatcher
// is about to receive the buffered value in the channel. otherwise
// we communicated the next deadline to the receiver directly.
}
return nil
}
75 changes: 68 additions & 7 deletions packetio/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"testing"
"time"

"github.com/pion/transport/v3/test"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -430,10 +431,10 @@ func TestBufferAlloc(t *testing.T) {
}
}

t.Run("100 writes", test(w, 100, 11))
t.Run("200 writes", test(w, 200, 14))
t.Run("400 writes", test(w, 400, 17))
t.Run("1000 writes", test(w, 1000, 21))
t.Run("100 writes", test(w, 100, 14))
t.Run("200 writes", test(w, 200, 20))
t.Run("400 writes", test(w, 400, 26))
t.Run("1000 writes", test(w, 1000, 32))

wr := func(count int) func() {
return func() {
Expand All @@ -451,9 +452,9 @@ func TestBufferAlloc(t *testing.T) {
}
}

t.Run("100 writes and reads", test(wr, 100, 5))
t.Run("1000 writes and reads", test(wr, 1000, 5))
t.Run("10000 writes and reads", test(wr, 10000, 5))
t.Run("100 writes and reads", test(wr, 100, 18))
t.Run("1000 writes and reads", test(wr, 1000, 18))
t.Run("10000 writes and reads", test(wr, 10000, 18))
}

func benchmarkBufferWR(b *testing.B, size int64, write bool, grow int) { // nolint:unparam
Expand Down Expand Up @@ -584,6 +585,8 @@ func BenchmarkBuffer1400(b *testing.B) {
}

func TestBufferConcurrentRead(t *testing.T) {
defer test.TimeOut(time.Second * 5).Stop()

assert := assert.New(t)

buffer := NewBuffer()
Expand Down Expand Up @@ -626,3 +629,61 @@ func TestBufferConcurrentRead(t *testing.T) {
err = <-errCh
assert.Equal(io.EOF, err)
}

func TestBufferConcurrentReadWrite(t *testing.T) {
defer test.TimeOut(time.Second * 5).Stop()

assert := assert.New(t)

buffer := NewBuffer()
packet := make([]byte, 4)

errCh := make(chan error, 4)
readIntoErr := func() {
_, readErr := buffer.Read(packet)
errCh <- readErr
}
writeIntoErr := func() {
_, writeErr := buffer.Write([]byte{2, 3, 4})
errCh <- writeErr
}
go readIntoErr()
go readIntoErr()
go writeIntoErr()
go writeIntoErr()

// Close
err := buffer.Close()
assert.NoError(err)

// we just care that the reads and writes happen
for i := 0; i < 4; i++ {
<-errCh
}
}

func TestBufferReadDeadlineInSyncCond(t *testing.T) {
defer test.TimeOut(time.Second * 10).Stop()

assert := assert.New(t)

buffer := NewBuffer()

assert.NoError(buffer.SetReadDeadline(time.Now().Add(5 * time.Second))) // Set deadline to avoid test deadlock

// Start up a goroutine to start a blocking read.
readErr := make(chan error)
go func() {
packet := make([]byte, 4)
_, err := buffer.Read(packet)
readErr <- err
}()

err := <-readErr
var e net.Error
if !errors.As(err, &e) || !e.Timeout() {
t.Errorf("Unexpected error: %v", err)
}

assert.NoError(buffer.Close())
}
16 changes: 15 additions & 1 deletion udp/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"sync/atomic"
"time"

"github.com/pion/logging"
"github.com/pion/transport/v3/deadline"
"github.com/pion/transport/v3/packetio"
"golang.org/x/net/ipv4"
Expand Down Expand Up @@ -51,6 +52,8 @@

readDoneCh chan struct{}
errRead atomic.Value // error

log logging.LeveledLogger
}

// Accept waits for and returns the next connection to the listener.
Expand Down Expand Up @@ -83,9 +86,13 @@
for {
select {
case c := <-l.acceptCh:
// don't call Close directly since it will deadlock; instead,
// manually clean up resources that Close normally would.
close(c.doneCh)
if closeErr := c.buffer.Close(); closeErr != nil {
l.log.Tracef("error closing unaccepted conn: %v", closeErr)

Check warning on line 93 in udp/conn.go

View check run for this annotation

Codecov / codecov/patch

udp/conn.go#L93

Added line #L93 was not covered by tests
}
delete(l.conns, c.rAddr.String())

default:
break lclose
}
Expand Down Expand Up @@ -183,6 +190,7 @@
acceptFilter: lc.AcceptFilter,
connWG: &sync.WaitGroup{},
readDoneCh: make(chan struct{}),
log: logging.NewDefaultLoggerFactory().NewLogger("udp_listener"),
}

if lc.Batch.Enable {
Expand Down Expand Up @@ -285,6 +293,12 @@
case l.acceptCh <- conn:
l.conns[raddr.String()] = conn
default:
// don't call Close directly since it will deadlock; instead,
// manually clean up resources that Close normally would.
close(conn.doneCh)
if closeErr := conn.buffer.Close(); closeErr != nil {
l.log.Tracef("error closing unaccepted conn: %v", closeErr)

Check warning on line 300 in udp/conn.go

View check run for this annotation

Codecov / codecov/patch

udp/conn.go#L300

Added line #L300 was not covered by tests
}
return nil, false, ErrListenQueueExceeded
}
}
Expand Down
Loading