Skip to content

Commit

Permalink
compactor: clean up
Browse files Browse the repository at this point in the history
Signed-off-by: Gyuho Lee <[email protected]>
  • Loading branch information
gyuho committed Mar 22, 2018
1 parent f7714e2 commit ba7651e
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 25 deletions.
27 changes: 18 additions & 9 deletions compactor/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,67 +46,76 @@ type Periodic struct {
// NewPeriodic creates a new instance of Periodic compactor that purges
// the log older than h Duration.
func NewPeriodic(h time.Duration, rg RevGetter, c Compactable) *Periodic {
return &Periodic{
t := &Periodic{
clock: clockwork.NewRealClock(),
period: h,
rg: rg,
c: c,
revs: make([]int64, 0),
}
t.ctx, t.cancel = context.WithCancel(context.Background())
return t
}

// periodDivisor divides Periodic.period in into checkCompactInterval duration
const periodDivisor = 10

// Run runs periodic compactor.
func (t *Periodic) Run() {
t.ctx, t.cancel = context.WithCancel(context.Background())
t.revs = make([]int64, 0)
clock := t.clock
checkCompactInterval := t.period / time.Duration(periodDivisor)
interval := t.period / time.Duration(periodDivisor)
go func() {
last := clock.Now()
initialWait := t.clock.Now()
for {
t.revs = append(t.revs, t.rg.Rev())
select {
case <-t.ctx.Done():
return
case <-clock.After(checkCompactInterval):
case <-t.clock.After(interval):
t.mu.Lock()
p := t.paused
t.mu.Unlock()
if p {
continue
}
}
if clock.Now().Sub(last) < t.period {

// wait up to initial given period
if t.clock.Now().Sub(initialWait) < t.period {
continue
}

rev, remaining := t.getRev()
if rev < 0 {
continue
}

plog.Noticef("Starting auto-compaction at revision %d (retention: %v)", rev, t.period)
_, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev})
if err == nil || err == mvcc.ErrCompacted {
// move to next sliding window
t.revs = remaining
plog.Noticef("Finished auto-compaction at revision %d", rev)
} else {
plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err)
plog.Noticef("Retry after %v", checkCompactInterval)
plog.Noticef("Retry after %v", interval)
}
}
}()
}

// Stop stops periodic compactor.
func (t *Periodic) Stop() {
t.cancel()
}

// Pause pauses periodic compactor.
func (t *Periodic) Pause() {
t.mu.Lock()
defer t.mu.Unlock()
t.paused = true
}

// Resume resumes periodic compactor.
func (t *Periodic) Resume() {
t.mu.Lock()
defer t.mu.Unlock()
Expand Down
6 changes: 6 additions & 0 deletions compactor/periodic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
package compactor

import (
"context"
"reflect"
"testing"
"time"

pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/pkg/testutil"

"github.com/jonboulle/clockwork"
)

Expand All @@ -36,7 +38,9 @@ func TestPeriodic(t *testing.T) {
period: retentionDuration,
rg: rg,
c: compactable,
revs: make([]int64, 0),
}
tb.ctx, tb.cancel = context.WithCancel(context.Background())

tb.Run()
defer tb.Stop()
Expand Down Expand Up @@ -78,7 +82,9 @@ func TestPeriodicPause(t *testing.T) {
period: retentionDuration,
rg: rg,
c: compactable,
revs: make([]int64, 0),
}
tb.ctx, tb.cancel = context.WithCancel(context.Background())

tb.Run()
tb.Pause()
Expand Down
25 changes: 15 additions & 10 deletions compactor/revision.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package compactor
import (
"context"
"sync"
"time"

pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/mvcc"
Expand All @@ -43,25 +44,27 @@ type Revision struct {
// NewRevision creates a new instance of Revisonal compactor that purges
// the log older than retention revisions from the current revision.
func NewRevision(retention int64, rg RevGetter, c Compactable) *Revision {
return &Revision{
t := &Revision{
clock: clockwork.NewRealClock(),
retention: retention,
rg: rg,
c: c,
}
t.ctx, t.cancel = context.WithCancel(context.Background())
return t
}

func (t *Revision) Run() {
t.ctx, t.cancel = context.WithCancel(context.Background())
clock := t.clock
previous := int64(0)
const revInterval = 5 * time.Minute

// Run runs revision-based compactor.
func (t *Revision) Run() {
prev := int64(0)
go func() {
for {
select {
case <-t.ctx.Done():
return
case <-clock.After(checkCompactionInterval):
case <-t.clock.After(revInterval):
t.mu.Lock()
p := t.paused
t.mu.Unlock()
Expand All @@ -71,34 +74,36 @@ func (t *Revision) Run() {
}

rev := t.rg.Rev() - t.retention

if rev <= 0 || rev == previous {
if rev <= 0 || rev == prev {
continue
}

plog.Noticef("Starting auto-compaction at revision %d (retention: %d revisions)", rev, t.retention)
_, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev})
if err == nil || err == mvcc.ErrCompacted {
previous = rev
prev = rev
plog.Noticef("Finished auto-compaction at revision %d", rev)
} else {
plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err)
plog.Noticef("Retry after %v", checkCompactionInterval)
plog.Noticef("Retry after %v", revInterval)
}
}
}()
}

// Stop stops revision-based compactor.
func (t *Revision) Stop() {
t.cancel()
}

// Pause pauses revision-based compactor.
func (t *Revision) Pause() {
t.mu.Lock()
defer t.mu.Unlock()
t.paused = true
}

// Resume resumes revision-based compactor.
func (t *Revision) Resume() {
t.mu.Lock()
defer t.mu.Unlock()
Expand Down
15 changes: 9 additions & 6 deletions compactor/revision_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package compactor

import (
"context"
"reflect"
"testing"
"time"
Expand All @@ -34,17 +35,18 @@ func TestRevision(t *testing.T) {
rg: rg,
c: compactable,
}
tb.ctx, tb.cancel = context.WithCancel(context.Background())

tb.Run()
defer tb.Stop()

fc.Advance(checkCompactionInterval)
fc.Advance(revInterval)
rg.Wait(1)
// nothing happens

rg.SetRev(99) // will be 100
expectedRevision := int64(90)
fc.Advance(checkCompactionInterval)
fc.Advance(revInterval)
rg.Wait(1)
a, err := compactable.Wait(1)
if err != nil {
Expand All @@ -61,7 +63,7 @@ func TestRevision(t *testing.T) {

rg.SetRev(199) // will be 200
expectedRevision = int64(190)
fc.Advance(checkCompactionInterval)
fc.Advance(revInterval)
rg.Wait(1)
a, err = compactable.Wait(1)
if err != nil {
Expand All @@ -82,14 +84,15 @@ func TestRevisionPause(t *testing.T) {
rg: rg,
c: compactable,
}
tb.ctx, tb.cancel = context.WithCancel(context.Background())

tb.Run()
tb.Pause()

// tb will collect 3 hours of revisions but not compact since paused
n := int(time.Hour / checkCompactionInterval)
n := int(time.Hour / revInterval)
for i := 0; i < 3*n; i++ {
fc.Advance(checkCompactionInterval)
fc.Advance(revInterval)
}
// tb ends up waiting for the clock

Expand All @@ -103,7 +106,7 @@ func TestRevisionPause(t *testing.T) {
tb.Resume()

// unblock clock, will kick off a compaction at hour 3:05
fc.Advance(checkCompactionInterval)
fc.Advance(revInterval)
rg.Wait(1)
a, err := compactable.Wait(1)
if err != nil {
Expand Down

0 comments on commit ba7651e

Please sign in to comment.