Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reuse timer in Deadline #290

Merged
merged 1 commit into from
Apr 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 52 additions & 45 deletions deadline/deadline.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,94 +11,101 @@
"time"
)

type deadlineState uint8

const (
deadlineStopped deadlineState = iota
deadlineStarted
deadlineExceeded
)

var _ context.Context = (*Deadline)(nil)

// Deadline signals updatable deadline timer.
// Also, it implements context.Context.
type Deadline struct {
exceeded chan struct{}
stop chan struct{}
stopped chan bool
deadline time.Time
mu sync.RWMutex
timer timer
done chan struct{}
deadline time.Time
state deadlineState
pending uint8
}

// New creates new deadline timer.
func New() *Deadline {
d := &Deadline{
exceeded: make(chan struct{}),
stop: make(chan struct{}),
stopped: make(chan bool, 1),
return &Deadline{
done: make(chan struct{}),
}
d.stopped <- true
return d
}

func (d *Deadline) timeout() {
d.mu.Lock()
if d.pending--; d.pending != 0 || d.state != deadlineStarted {
d.mu.Unlock()
return

Check warning on line 46 in deadline/deadline.go

View check run for this annotation

Codecov / codecov/patch

deadline/deadline.go#L45-L46

Added lines #L45 - L46 were not covered by tests
}

d.state = deadlineExceeded
done := d.done
d.mu.Unlock()

close(done)
}

// Set new deadline. Zero value means no deadline.
func (d *Deadline) Set(t time.Time) {
d.mu.Lock()
defer d.mu.Unlock()

d.deadline = t
if d.state == deadlineStarted && d.timer.Stop() {
d.pending--
}

close(d.stop)
d.deadline = t
d.pending++

select {
case <-d.exceeded:
d.exceeded = make(chan struct{})
default:
stopped := <-d.stopped
if !stopped {
d.exceeded = make(chan struct{})
}
if d.state == deadlineExceeded {
d.done = make(chan struct{})
}
d.stop = make(chan struct{})
d.stopped = make(chan bool, 1)

if t.IsZero() {
d.stopped <- true
d.pending--
d.state = deadlineStopped
return
}

if dur := time.Until(t); dur > 0 {
exceeded := d.exceeded
stopped := d.stopped
go func() {
timer := time.NewTimer(dur)
select {
case <-timer.C:
close(exceeded)
stopped <- false
case <-d.stop:
if !timer.Stop() {
<-timer.C
}
stopped <- true
}
}()
d.state = deadlineStarted
if d.timer == nil {
d.timer = afterFunc(dur, d.timeout)
} else {
d.timer.Reset(dur)
}
return
}

close(d.exceeded)
d.stopped <- false
d.pending--
d.state = deadlineExceeded
close(d.done)
}

// Done receives deadline signal.
func (d *Deadline) Done() <-chan struct{} {
d.mu.RLock()
defer d.mu.RUnlock()
return d.exceeded
return d.done
}

// Err returns context.DeadlineExceeded if the deadline is exceeded.
// Otherwise, it returns nil.
func (d *Deadline) Err() error {
d.mu.RLock()
defer d.mu.RUnlock()
select {
case <-d.exceeded:
if d.state == deadlineExceeded {
return context.DeadlineExceeded
default:
return nil
}
return nil
}

// Deadline returns current deadline.
Expand Down
10 changes: 10 additions & 0 deletions deadline/deadline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,13 @@ func TestContext(t *testing.T) {
}
})
}

func BenchmarkDeadline(b *testing.B) {
b.Run("Set", func(b *testing.B) {
d := New()
t := time.Now().Add(time.Minute)
for i := 0; i < b.N; i++ {
d.Set(t)
}
})
}
13 changes: 13 additions & 0 deletions deadline/timer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package deadline

import (
"time"
)

type timer interface {
Stop() bool
Reset(time.Duration) bool
}
15 changes: 15 additions & 0 deletions deadline/timer_generic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

//go:build !js
// +build !js

package deadline

import (
"time"
)

func afterFunc(d time.Duration, f func()) timer {
return time.AfterFunc(d, f)
}
67 changes: 67 additions & 0 deletions deadline/timer_js.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

//go:build js
// +build js

package deadline

import (
"sync"
"time"
)

// jsTimer is a timer utility for wasm with a working Reset function.
type jsTimer struct {
f func()
mu sync.Mutex
timer *time.Timer
version uint64
started bool
}

func afterFunc(d time.Duration, f func()) timer {
t := &jsTimer{f: f}
t.Reset(d)
return t
}

func (t *jsTimer) Stop() bool {
t.mu.Lock()
defer t.mu.Unlock()

t.version++
t.timer.Stop()

started := t.started
t.started = false
return started
}

func (t *jsTimer) Reset(d time.Duration) bool {
t.mu.Lock()
defer t.mu.Unlock()

if t.timer != nil {
t.timer.Stop()
}

t.version++
version := t.version
t.timer = time.AfterFunc(d, func() {
t.mu.Lock()
if version != t.version {
t.mu.Unlock()
return

Check warning on line 55 in deadline/timer_js.go

View check run for this annotation

Codecov / codecov/patch

deadline/timer_js.go#L54-L55

Added lines #L54 - L55 were not covered by tests
}

t.started = false
t.mu.Unlock()

t.f()
})

started := t.started
t.started = true
return started
}
14 changes: 7 additions & 7 deletions packetio/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,10 +430,10 @@ func TestBufferAlloc(t *testing.T) {
}
}

t.Run("100 writes", test(w, 100, 13))
t.Run("200 writes", test(w, 200, 17))
t.Run("400 writes", test(w, 400, 19))
t.Run("1000 writes", test(w, 1000, 23))
t.Run("100 writes", test(w, 100, 10))
t.Run("200 writes", test(w, 200, 14))
t.Run("400 writes", test(w, 400, 16))
t.Run("1000 writes", test(w, 1000, 20))

wr := func(count int) func() {
return func() {
Expand All @@ -451,9 +451,9 @@ func TestBufferAlloc(t *testing.T) {
}
}

t.Run("100 writes and reads", test(wr, 100, 7))
t.Run("1000 writes and reads", test(wr, 1000, 7))
t.Run("10000 writes and reads", test(wr, 10000, 7))
t.Run("100 writes and reads", test(wr, 100, 4))
t.Run("1000 writes and reads", test(wr, 1000, 4))
t.Run("10000 writes and reads", test(wr, 10000, 4))
}

func benchmarkBufferWR(b *testing.B, size int64, write bool, grow int) { // nolint:unparam
Expand Down
Loading