Skip to content

Commit

Permalink
use sync.Cond for packetio.Buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
paulwe committed Jul 23, 2024
1 parent 8754ef1 commit 76bde92
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 77 deletions.
111 changes: 79 additions & 32 deletions deadline/deadline.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,54 +21,54 @@ const (

var _ context.Context = (*Deadline)(nil)

// Deadline signals updatable deadline timer.
// Also, it implements context.Context.
type Deadline struct {
mu sync.RWMutex
timer timer
done chan struct{}
deadline time.Time
state deadlineState
pending uint8
// VerifyFunc checks that the deadline call is still valid ie. SetDeadline has
// not been called concurrently with the DeadlineFunc.
type VerifyFunc func() bool

// DeadlineFunc is called by Func when the scheduled deadline is reached. The
// VerifyFunc argument must be called exactly once and cannot be called
// concurrently with other calls to Func member functions.
type DeadlineFunc func(verify VerifyFunc)

Check warning on line 31 in deadline/deadline.go

View workflow job for this annotation

GitHub Actions / lint / Go

exported: type name will be used as deadline.DeadlineFunc by other packages, and that stutters; consider calling this Func (revive)

// Func is a utility for building deadline handlers. Func is not safe for
// concurrent access.
type Func struct {
f DeadlineFunc
timer timer
state deadlineState
pending uint8
}

// New creates new deadline timer.
func New() *Deadline {
return &Deadline{
done: make(chan struct{}),
func NewFunc(f DeadlineFunc) *Func {

Check warning on line 42 in deadline/deadline.go

View workflow job for this annotation

GitHub Actions / lint / Go

exported: exported function NewFunc should have comment or be unexported (revive)
return &Func{
f: f,
}
}

func (d *Deadline) timeout() {
d.mu.Lock()
func (d *Func) verify() bool {
if d.pending--; d.pending != 0 || d.state != deadlineStarted {
d.mu.Unlock()
return
return false

Check warning on line 50 in deadline/deadline.go

View check run for this annotation

Codecov / codecov/patch

deadline/deadline.go#L50

Added line #L50 was not covered by tests
}

d.state = deadlineExceeded
done := d.done
d.mu.Unlock()
return true
}

close(done)
func (d *Func) timeout() {
d.f(d.verify)
}

// Set new deadline. Zero value means no deadline.
func (d *Deadline) Set(t time.Time) {
d.mu.Lock()
defer d.mu.Unlock()
// SetDeadline schedules the function to be called. If t is zero the function
// is unscheduled. The returned reset value is true when the previous deadline
// was exceeded. The returned exceeded value is true when t is in the past.
func (d *Func) SetDeadline(t time.Time) (reset, exceeded bool) {
reset = d.state == deadlineExceeded

if d.state == deadlineStarted && d.timer.Stop() {
d.pending--
}

d.deadline = t
d.pending++

if d.state == deadlineExceeded {
d.done = make(chan struct{})
}

if t.IsZero() {
d.pending--
d.state = deadlineStopped
Expand All @@ -87,7 +87,54 @@ func (d *Deadline) Set(t time.Time) {

d.pending--
d.state = deadlineExceeded
close(d.done)
return reset, true
}

// DeadlineExceeded returns true when the last set deadline has passed
func (d *Func) DeadlineExceeded() bool {
return d.state == deadlineExceeded
}

// Deadline signals updatable deadline timer.
// Also, it implements context.Context.
type Deadline struct {
mu sync.RWMutex
done chan struct{}
deadline time.Time
timer *Func
}

// New creates new deadline timer.
func New() *Deadline {
d := &Deadline{
done: make(chan struct{}),
}
d.timer = NewFunc(d.timeout)
return d
}

func (d *Deadline) timeout(verify VerifyFunc) {
d.mu.Lock()
defer d.mu.Unlock()
if verify() {
close(d.done)
}
}

// Set new deadline. Zero value means no deadline.
func (d *Deadline) Set(t time.Time) {
d.mu.Lock()
defer d.mu.Unlock()

d.deadline = t

reset, exceeded := d.timer.SetDeadline(t)
if reset {
d.done = make(chan struct{})
}
if exceeded {
close(d.done)
}
}

// Done receives deadline signal.
Expand All @@ -102,7 +149,7 @@ func (d *Deadline) Done() <-chan struct{} {
func (d *Deadline) Err() error {
d.mu.RLock()
defer d.mu.RUnlock()
if d.state == deadlineExceeded {
if d.timer.DeadlineExceeded() {
return context.DeadlineExceeded
}
return nil
Expand Down
93 changes: 48 additions & 45 deletions packetio/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ const (
// Buffer allows writing packets to an intermediate buffer, which can then be read form.
// This is verify similar to bytes.Buffer but avoids combining multiple writes into a single read.
type Buffer struct {
mutex sync.Mutex
cond sync.Cond
readDeadline *deadline.Func

// this is a circular buffer. If head <= tail, then the useful
// data is in the interval [head, tail[. If tail < head, then
Expand All @@ -38,13 +39,10 @@ type Buffer struct {
data []byte
head, tail int

notify chan struct{}
closed bool

count int
limitCount, limitSize int

readDeadline *deadline.Deadline
}

const (
Expand All @@ -55,10 +53,11 @@ const (

// NewBuffer creates a new Buffer.
func NewBuffer() *Buffer {
return &Buffer{
notify: make(chan struct{}, 1),
readDeadline: deadline.New(),
b := &Buffer{
cond: sync.Cond{L: &sync.Mutex{}},
}
b.readDeadline = deadline.NewFunc(b.timeoutReadDeadline)
return b
}

// available returns true if the buffer is large enough to fit a packet
Expand Down Expand Up @@ -128,24 +127,24 @@ func (b *Buffer) Write(packet []byte) (int, error) {
return 0, errPacketTooBig
}

b.mutex.Lock()
b.cond.L.Lock()

if b.closed {
b.mutex.Unlock()
b.cond.L.Unlock()
return 0, io.ErrClosedPipe
}

if (b.limitCount > 0 && b.count >= b.limitCount) ||
(b.limitSize > 0 && b.size()+2+len(packet) > b.limitSize) {
b.mutex.Unlock()
b.cond.L.Unlock()
return 0, ErrFull
}

// grow the buffer until the packet fits
for !b.available(len(packet)) {
err := b.grow()
if err != nil {
b.mutex.Unlock()
b.cond.L.Unlock()
return 0, err
}
}
Expand All @@ -172,11 +171,9 @@ func (b *Buffer) Write(packet []byte) (int, error) {
}
b.count++

select {
case b.notify <- struct{}{}:
default:
}
b.mutex.Unlock()
b.cond.L.Unlock()

b.cond.Signal()

return len(packet), nil
}
Expand All @@ -186,15 +183,12 @@ func (b *Buffer) Write(packet []byte) (int, error) {
// Returns io.ErrShortBuffer is the packet is too small to copy the Write.
// Returns io.EOF if the buffer is closed.
func (b *Buffer) Read(packet []byte) (n int, err error) { //nolint:gocognit
// Return immediately if the deadline is already exceeded.
select {
case <-b.readDeadline.Done():
return 0, &netError{ErrTimeout, true, true}
default:
}

b.cond.L.Lock()
for {
b.mutex.Lock()
if b.readDeadline.DeadlineExceeded() {
b.cond.L.Unlock()
return 0, &netError{ErrTimeout, true, true}
}

if b.head != b.tail {
// decode the packet size
Expand Down Expand Up @@ -238,7 +232,7 @@ func (b *Buffer) Read(packet []byte) (n int, err error) { //nolint:gocognit
}

b.count--
b.mutex.Unlock()
b.cond.L.Unlock()

if copied < count {
return copied, io.ErrShortBuffer
Expand All @@ -247,58 +241,53 @@ func (b *Buffer) Read(packet []byte) (n int, err error) { //nolint:gocognit
}

if b.closed {
b.mutex.Unlock()
b.cond.L.Unlock()
return 0, io.EOF
}
b.mutex.Unlock()

select {
case <-b.readDeadline.Done():
return 0, &netError{ErrTimeout, true, true}
case <-b.notify:
}
b.cond.Wait()
}
}

// Close the buffer, unblocking any pending reads.
// Data in the buffer can still be read, Read will return io.EOF only when empty.
func (b *Buffer) Close() (err error) {
b.mutex.Lock()
b.cond.L.Lock()

if b.closed {
b.mutex.Unlock()
b.cond.L.Unlock()
return nil
}

b.closed = true
close(b.notify)
b.mutex.Unlock()
b.cond.Broadcast()
b.cond.L.Unlock()

return nil
}

// Count returns the number of packets in the buffer.
func (b *Buffer) Count() int {
b.mutex.Lock()
defer b.mutex.Unlock()
b.cond.L.Lock()
defer b.cond.L.Unlock()
return b.count
}

// SetLimitCount controls the maximum number of packets that can be buffered.
// Causes Write to return ErrFull when this limit is reached.
// A zero value will disable this limit.
func (b *Buffer) SetLimitCount(limit int) {
b.mutex.Lock()
defer b.mutex.Unlock()
b.cond.L.Lock()
defer b.cond.L.Unlock()

b.limitCount = limit
}

// Size returns the total byte size of packets in the buffer, including
// a small amount of administrative overhead.
func (b *Buffer) Size() int {
b.mutex.Lock()
defer b.mutex.Unlock()
b.cond.L.Lock()
defer b.cond.L.Unlock()

return b.size()
}
Expand All @@ -319,15 +308,29 @@ func (b *Buffer) size() int {
// When packetioSizeHardLimit build tag is set, SetLimitSize exceeding
// the hard limit will be silently discarded.
func (b *Buffer) SetLimitSize(limit int) {
b.mutex.Lock()
defer b.mutex.Unlock()
b.cond.L.Lock()
defer b.cond.L.Unlock()

b.limitSize = limit
}

func (b *Buffer) timeoutReadDeadline(verify deadline.VerifyFunc) {
b.cond.L.Lock()
defer b.cond.L.Unlock()
if verify() {
b.cond.Broadcast()
}
}

// SetReadDeadline sets the deadline for the Read operation.
// Setting to zero means no deadline.
func (b *Buffer) SetReadDeadline(t time.Time) error {
b.readDeadline.Set(t)
b.cond.L.Lock()
defer b.cond.L.Unlock()

if _, timeout := b.readDeadline.SetDeadline(t); timeout {
b.cond.Broadcast()
}

return nil
}
10 changes: 10 additions & 0 deletions packetio/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,16 @@ func TestBuffer(t *testing.T) {
}
assert.Equal(0, n)

// Future deadline
err = buffer.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
assert.NoError(err)
time.Sleep(200 * time.Millisecond)
n, err = buffer.Read(packet)
if !errors.As(err, &e) || !e.Timeout() {
t.Errorf("Unexpected error: %v", err)
}
assert.Equal(0, n)

// Reset deadline
err = buffer.SetReadDeadline(time.Time{})
assert.NoError(err)
Expand Down

0 comments on commit 76bde92

Please sign in to comment.