Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compactor: clean up #9476

Merged
merged 1 commit into from
Mar 22, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ var (
)

const (
checkCompactionInterval = 5 * time.Minute

ModePeriodic = "periodic"
ModeRevision = "revision"
)
Expand Down
33 changes: 23 additions & 10 deletions compactor/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,67 +46,80 @@ type Periodic struct {
// NewPeriodic creates a new instance of Periodic compactor that purges
// the log older than h Duration.
func NewPeriodic(h time.Duration, rg RevGetter, c Compactable) *Periodic {
return &Periodic{
clock: clockwork.NewRealClock(),
return newPeriodic(clockwork.NewRealClock(), h, rg, c)
}

func newPeriodic(clock clockwork.Clock, h time.Duration, rg RevGetter, c Compactable) *Periodic {
t := &Periodic{
clock: clock,
period: h,
rg: rg,
c: c,
revs: make([]int64, 0),
}
t.ctx, t.cancel = context.WithCancel(context.Background())
return t
}

// periodDivisor divides Periodic.period in into checkCompactInterval duration
const periodDivisor = 10

// Run runs periodic compactor.
func (t *Periodic) Run() {
t.ctx, t.cancel = context.WithCancel(context.Background())
t.revs = make([]int64, 0)
clock := t.clock
checkCompactInterval := t.period / time.Duration(periodDivisor)
interval := t.period / time.Duration(periodDivisor)
go func() {
last := clock.Now()
initialWait := t.clock.Now()
for {
t.revs = append(t.revs, t.rg.Rev())
select {
case <-t.ctx.Done():
return
case <-clock.After(checkCompactInterval):
case <-t.clock.After(interval):
t.mu.Lock()
p := t.paused
t.mu.Unlock()
if p {
continue
}
}
if clock.Now().Sub(last) < t.period {

// wait up to initial given period
if t.clock.Now().Sub(initialWait) < t.period {
continue
}

rev, remaining := t.getRev()
if rev < 0 {
continue
}

plog.Noticef("Starting auto-compaction at revision %d (retention: %v)", rev, t.period)
_, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev})
if err == nil || err == mvcc.ErrCompacted {
// move to next sliding window
t.revs = remaining
plog.Noticef("Finished auto-compaction at revision %d", rev)
} else {
plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err)
plog.Noticef("Retry after %v", checkCompactInterval)
plog.Noticef("Retry after %v", interval)
}
}
}()
}

// Stop stops periodic compactor.
func (t *Periodic) Stop() {
t.cancel()
}

// Pause pauses periodic compactor.
func (t *Periodic) Pause() {
t.mu.Lock()
defer t.mu.Unlock()
t.paused = true
}

// Resume resumes periodic compactor.
func (t *Periodic) Resume() {
t.mu.Lock()
defer t.mu.Unlock()
Expand Down
19 changes: 5 additions & 14 deletions compactor/periodic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/pkg/testutil"

"github.com/jonboulle/clockwork"
)

Expand All @@ -31,12 +32,7 @@ func TestPeriodic(t *testing.T) {
fc := clockwork.NewFakeClock()
rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
compactable := &fakeCompactable{testutil.NewRecorderStream()}
tb := &Periodic{
clock: fc,
period: retentionDuration,
rg: rg,
c: compactable,
}
tb := newPeriodic(fc, retentionDuration, rg, compactable)

tb.Run()
defer tb.Stop()
Expand Down Expand Up @@ -70,15 +66,10 @@ func TestPeriodic(t *testing.T) {

func TestPeriodicPause(t *testing.T) {
fc := clockwork.NewFakeClock()
compactable := &fakeCompactable{testutil.NewRecorderStream()}
rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
retentionDuration := time.Hour
tb := &Periodic{
clock: fc,
period: retentionDuration,
rg: rg,
c: compactable,
}
rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
compactable := &fakeCompactable{testutil.NewRecorderStream()}
tb := newPeriodic(fc, retentionDuration, rg, compactable)

tb.Run()
tb.Pause()
Expand Down
31 changes: 20 additions & 11 deletions compactor/revision.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package compactor
import (
"context"
"sync"
"time"

pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/mvcc"
Expand All @@ -43,25 +44,31 @@ type Revision struct {
// NewRevision creates a new instance of Revisonal compactor that purges
// the log older than retention revisions from the current revision.
func NewRevision(retention int64, rg RevGetter, c Compactable) *Revision {
return &Revision{
clock: clockwork.NewRealClock(),
return newRevision(clockwork.NewRealClock(), retention, rg, c)
}

func newRevision(clock clockwork.Clock, retention int64, rg RevGetter, c Compactable) *Revision {
t := &Revision{
clock: clock,
retention: retention,
rg: rg,
c: c,
}
t.ctx, t.cancel = context.WithCancel(context.Background())
return t
}

func (t *Revision) Run() {
t.ctx, t.cancel = context.WithCancel(context.Background())
clock := t.clock
previous := int64(0)
const revInterval = 5 * time.Minute

// Run runs revision-based compactor.
func (t *Revision) Run() {
prev := int64(0)
go func() {
for {
select {
case <-t.ctx.Done():
return
case <-clock.After(checkCompactionInterval):
case <-t.clock.After(revInterval):
t.mu.Lock()
p := t.paused
t.mu.Unlock()
Expand All @@ -71,34 +78,36 @@ func (t *Revision) Run() {
}

rev := t.rg.Rev() - t.retention

if rev <= 0 || rev == previous {
if rev <= 0 || rev == prev {
continue
}

plog.Noticef("Starting auto-compaction at revision %d (retention: %d revisions)", rev, t.retention)
_, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev})
if err == nil || err == mvcc.ErrCompacted {
previous = rev
prev = rev
plog.Noticef("Finished auto-compaction at revision %d", rev)
} else {
plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err)
plog.Noticef("Retry after %v", checkCompactionInterval)
plog.Noticef("Retry after %v", revInterval)
}
}
}()
}

// Stop stops revision-based compactor.
func (t *Revision) Stop() {
t.cancel()
}

// Pause pauses revision-based compactor.
func (t *Revision) Pause() {
t.mu.Lock()
defer t.mu.Unlock()
t.paused = true
}

// Resume resumes revision-based compactor.
func (t *Revision) Resume() {
t.mu.Lock()
defer t.mu.Unlock()
Expand Down
29 changes: 10 additions & 19 deletions compactor/revision_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,30 +21,26 @@ import (

pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/pkg/testutil"

"github.com/jonboulle/clockwork"
)

func TestRevision(t *testing.T) {
fc := clockwork.NewFakeClock()
rg := &fakeRevGetter{testutil.NewRecorderStream(), 0}
compactable := &fakeCompactable{testutil.NewRecorderStream()}
tb := &Revision{
clock: fc,
retention: 10,
rg: rg,
c: compactable,
}
tb := newRevision(fc, 10, rg, compactable)

tb.Run()
defer tb.Stop()

fc.Advance(checkCompactionInterval)
fc.Advance(revInterval)
rg.Wait(1)
// nothing happens

rg.SetRev(99) // will be 100
expectedRevision := int64(90)
fc.Advance(checkCompactionInterval)
fc.Advance(revInterval)
rg.Wait(1)
a, err := compactable.Wait(1)
if err != nil {
Expand All @@ -61,7 +57,7 @@ func TestRevision(t *testing.T) {

rg.SetRev(199) // will be 200
expectedRevision = int64(190)
fc.Advance(checkCompactionInterval)
fc.Advance(revInterval)
rg.Wait(1)
a, err = compactable.Wait(1)
if err != nil {
Expand All @@ -74,22 +70,17 @@ func TestRevision(t *testing.T) {

func TestRevisionPause(t *testing.T) {
fc := clockwork.NewFakeClock()
compactable := &fakeCompactable{testutil.NewRecorderStream()}
rg := &fakeRevGetter{testutil.NewRecorderStream(), 99} // will be 100
tb := &Revision{
clock: fc,
retention: 10,
rg: rg,
c: compactable,
}
compactable := &fakeCompactable{testutil.NewRecorderStream()}
tb := newRevision(fc, 10, rg, compactable)

tb.Run()
tb.Pause()

// tb will collect 3 hours of revisions but not compact since paused
n := int(time.Hour / checkCompactionInterval)
n := int(time.Hour / revInterval)
for i := 0; i < 3*n; i++ {
fc.Advance(checkCompactionInterval)
fc.Advance(revInterval)
}
// tb ends up waiting for the clock

Expand All @@ -103,7 +94,7 @@ func TestRevisionPause(t *testing.T) {
tb.Resume()

// unblock clock, will kick off a compaction at hour 3:05
fc.Advance(checkCompactionInterval)
fc.Advance(revInterval)
rg.Wait(1)
a, err := compactable.Wait(1)
if err != nil {
Expand Down