Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lease: do lease pile-up reduction in the background #9699

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 66 additions & 46 deletions lease/lessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,11 @@ type lessor struct {
stopC chan struct{}
// doneC is a channel whose closure indicates that the lessor is stopped.
doneC chan struct{}

// Ready is false when promotion is starting and gets
// set to true once lease processing is done to
// let findExpiredLeases start
Ready bool
}

func NewLessor(b backend.Backend, minLeaseTTL int64) Lessor {
Expand Down Expand Up @@ -348,54 +353,69 @@ func (le *lessor) Leases() []*Lease {

func (le *lessor) Promote(extend time.Duration) {
le.mu.Lock()
defer le.mu.Unlock()

le.demotec = make(chan struct{})

// refresh the expiries of all leases.
for _, l := range le.leaseMap {
l.refresh(extend)
item := &LeaseWithTime{id: l.ID, expiration: l.expiry.UnixNano()}
heap.Push(&le.leaseHeap, item)
}

if len(le.leaseMap) < leaseRevokeRate {
// no possibility of lease pile-up
return
}
le.Ready = false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs lock around le.Ready?

le.mu.Unlock()
go func() {
le.mu.Lock()
le.demotec = make(chan struct{})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

le.demotec needs be protected by write lock.

leaseCopy := le.unsafeLeases()
le.mu.Unlock()
var updateList []*LeaseWithTime
defer func() {
le.mu.Lock()
defer le.mu.Unlock()
for _, item := range updateList {
heap.Push(&le.leaseHeap, item)
}
le.Ready = true
}()

// refresh the expiries of all leases.
for _, l := range leaseCopy {
l.refresh(extend)
// check that the lease hasn't been revoked
item := &LeaseWithTime{id: l.ID, expiration: l.expiry.UnixNano()}
updateList = append(updateList, item)
}

// adjust expiries in case of overlap
leases := le.unsafeLeases()
sort.Sort(leasesByExpiry(leases))

baseWindow := leases[0].Remaining()
nextWindow := baseWindow + time.Second
expires := 0
// have fewer expires than the total revoke rate so piled up leases
// don't consume the entire revoke limit
targetExpiresPerSecond := (3 * leaseRevokeRate) / 4
for _, l := range leases {
remaining := l.Remaining()
if remaining > nextWindow {
baseWindow = remaining
nextWindow = baseWindow + time.Second
expires = 1
continue
if len(le.leaseMap) < leaseRevokeRate {
// no possibility of lease pile-up
return
}
expires++
if expires <= targetExpiresPerSecond {
continue

// adjust expiries in case of overlap
sort.Sort(leasesByExpiry(leaseCopy))

baseWindow := leaseCopy[0].Remaining()
nextWindow := baseWindow + time.Second
expires := 0
// have fewer expires than the total revoke rate so piled up leases
// don't consume the entire revoke limit
targetExpiresPerSecond := (3 * leaseRevokeRate) / 4

for _, l := range leaseCopy {
remaining := l.Remaining()
if remaining > nextWindow {
baseWindow = remaining
nextWindow = baseWindow + time.Second
expires = 1
continue
}
expires++
if expires <= targetExpiresPerSecond {
continue
}
rateDelay := float64(time.Second) * (float64(expires) / float64(targetExpiresPerSecond))
// If leases are extended by n seconds, leases n seconds ahead of the
// base window should be extended by only one second.
rateDelay -= float64(remaining - baseWindow)
delay := time.Duration(rateDelay)
nextWindow = baseWindow + delay
l.refresh(delay + extend)
item := &LeaseWithTime{id: l.ID, expiration: l.expiry.UnixNano()}
updateList = append(updateList, item)
}
rateDelay := float64(time.Second) * (float64(expires) / float64(targetExpiresPerSecond))
// If leases are extended by n seconds, leases n seconds ahead of the
// base window should be extended by only one second.
rateDelay -= float64(remaining - baseWindow)
delay := time.Duration(rateDelay)
nextWindow = baseWindow + delay
l.refresh(delay + extend)
item := &LeaseWithTime{id: l.ID, expiration: l.expiry.UnixNano()}
heap.Push(&le.leaseHeap, item)
}
}()
}

type leasesByExpiry []*Lease
Expand Down Expand Up @@ -497,7 +517,7 @@ func (le *lessor) runLoop() {
revokeLimit := leaseRevokeRate / 2

le.mu.RLock()
if le.isPrimary() {
if le.isPrimary() && le.Ready {
ls = le.findExpiredLeases(revokeLimit)
}
le.mu.RUnlock()
Expand Down
26 changes: 26 additions & 0 deletions lease/lessor_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package lease
import (
"os"
"testing"
"time"

"github.com/coreos/etcd/mvcc/backend"
)
Expand Down Expand Up @@ -53,6 +54,14 @@ func BenchmarkLessorRevoke10000(b *testing.B) { benchmarkLessorRevoke(10000, b
func BenchmarkLessorRevoke100000(b *testing.B) { benchmarkLessorRevoke(100000, b) }
func BenchmarkLessorRevoke1000000(b *testing.B) { benchmarkLessorRevoke(1000000, b) }

func BenchmarkLessorPromote1(b *testing.B) { benchmarkLessorPromote(1, b) }
func BenchmarkLessorPromote10(b *testing.B) { benchmarkLessorPromote(10, b) }
func BenchmarkLessorPromote100(b *testing.B) { benchmarkLessorPromote(100, b) }
func BenchmarkLessorPromote1000(b *testing.B) { benchmarkLessorPromote(1000, b) }
func BenchmarkLessorPromote10000(b *testing.B) { benchmarkLessorPromote(10000, b) }
func BenchmarkLessorPromote100000(b *testing.B) { benchmarkLessorPromote(100000, b) }
func BenchmarkLessorPromote1000000(b *testing.B) { benchmarkLessorPromote(1000000, b) }

func benchmarkLessorFindExpired(size int, b *testing.B) {
be, tmpPath := backend.NewDefaultTmpBackend()
le := newLessor(be, minLeaseTTL)
Expand Down Expand Up @@ -115,6 +124,23 @@ func benchmarkLessorRenew(size int, b *testing.B) {
}
}

func benchmarkLessorPromote(size int, b *testing.B) {
be, tmpPath := backend.NewDefaultTmpBackend()
le := newLessor(be, minLeaseTTL)
defer le.Stop()
defer cleanup(be, tmpPath)
for i := 0; i < size; i++ {
le.Grant(LeaseID(i), int64(100+i))
}

b.ResetTimer()

go func() { le.Promote(100 * time.Second) }()
for i := 0; i < b.N; i++ {
le.Grant(LeaseID(i+size), int64(100+i+size))
}
}

func cleanup(b backend.Backend, path string) {
b.Close()
os.Remove(path)
Expand Down
19 changes: 18 additions & 1 deletion lease/lessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"os"
"path/filepath"
"reflect"
"runtime"
"sort"
"sync"
"testing"
Expand All @@ -44,6 +45,7 @@ func TestLessorGrant(t *testing.T) {
le := newLessor(be, minLeaseTTL)
defer le.Stop()
le.Promote(0)
waitForPromotion(le)

l, err := le.Grant(1, 1)
if err != nil {
Expand Down Expand Up @@ -205,7 +207,7 @@ func TestLessorRenew(t *testing.T) {
le := newLessor(be, minLeaseTTL)
defer le.Stop()
le.Promote(0)

waitForPromotion(le)
l, err := le.Grant(1, minLeaseTTL)
if err != nil {
t.Fatalf("failed to grant lease (%v)", err)
Expand Down Expand Up @@ -263,6 +265,7 @@ func TestLessorRenewExtendPileup(t *testing.T) {

// extend after recovery should extend expiration on lease pile-up
le.Promote(0)
waitForPromotion(le)

windowCounts := make(map[int64]int)
for _, l := range le.leaseMap {
Expand Down Expand Up @@ -360,6 +363,7 @@ func TestLessorExpire(t *testing.T) {
defer le.Stop()

le.Promote(1 * time.Second)
waitForPromotion(le)
l, err := le.Grant(1, testMinTTL)
if err != nil {
t.Fatalf("failed to create lease: %v", err)
Expand Down Expand Up @@ -412,6 +416,7 @@ func TestLessorExpireAndDemote(t *testing.T) {
defer le.Stop()

le.Promote(1 * time.Second)
waitForPromotion(le)
l, err := le.Grant(1, testMinTTL)
if err != nil {
t.Fatalf("failed to create lease: %v", err)
Expand Down Expand Up @@ -492,3 +497,15 @@ func NewTestBackend(t *testing.T) (string, backend.Backend) {
bcfg.Path = filepath.Join(tmpPath, "be")
return tmpPath, backend.New(bcfg)
}
func waitForPromotion(le *lessor) {
for {
le.mu.RLock()
ready := le.Ready
le.mu.RUnlock()
if ready {
return
} else {
runtime.Gosched()
}
}
}