diff --git a/compactor/compactor.go b/compactor/compactor.go index 5a83d13f8333..c057225174cc 100644 --- a/compactor/compactor.go +++ b/compactor/compactor.go @@ -29,8 +29,7 @@ var ( ) const ( - checkCompactionInterval = 5 * time.Minute - executeCompactionInterval = time.Hour + checkCompactionInterval = 5 * time.Minute ModePeriodic = "periodic" ModeRevision = "revision" @@ -57,7 +56,7 @@ type RevGetter interface { Rev() int64 } -func New(mode string, retention int, rg RevGetter, c Compactable) (Compactor, error) { +func New(mode string, retention time.Duration, rg RevGetter, c Compactable) (Compactor, error) { switch mode { case ModePeriodic: return NewPeriodic(retention, rg, c), nil diff --git a/compactor/periodic.go b/compactor/periodic.go index 784cef7c1663..447352ec3beb 100644 --- a/compactor/periodic.go +++ b/compactor/periodic.go @@ -26,10 +26,10 @@ import ( ) // Periodic compacts the log by purging revisions older than -// the configured retention time. Compaction happens hourly. +// the configured retention time. type Periodic struct { - clock clockwork.Clock - periodInHour int + clock clockwork.Clock + period time.Duration rg RevGetter c Compactable @@ -38,26 +38,30 @@ type Periodic struct { ctx context.Context cancel context.CancelFunc - mu sync.Mutex + // mu protects paused + mu sync.RWMutex paused bool } // NewPeriodic creates a new instance of Periodic compactor that purges -// the log older than h hours. -func NewPeriodic(h int, rg RevGetter, c Compactable) *Periodic { +// the log older than h Duration. +func NewPeriodic(h time.Duration, rg RevGetter, c Compactable) *Periodic { return &Periodic{ - clock: clockwork.NewRealClock(), - periodInHour: h, - rg: rg, - c: c, + clock: clockwork.NewRealClock(), + period: h, + rg: rg, + c: c, } } +// periodDivisor divides Periodic.period in into checkCompactInterval duration +const periodDivisor = 10 + func (t *Periodic) Run() { t.ctx, t.cancel = context.WithCancel(context.Background()) t.revs = make([]int64, 0) clock := t.clock - + checkCompactInterval := t.period / time.Duration(periodDivisor) go func() { last := clock.Now() for { @@ -65,7 +69,7 @@ func (t *Periodic) Run() { select { case <-t.ctx.Done(): return - case <-clock.After(checkCompactionInterval): + case <-clock.After(checkCompactInterval): t.mu.Lock() p := t.paused t.mu.Unlock() @@ -73,25 +77,21 @@ func (t *Periodic) Run() { continue } } - - if clock.Now().Sub(last) < executeCompactionInterval { + if clock.Now().Sub(last) < t.period { continue } - - rev, remaining := t.getRev(t.periodInHour) + rev, remaining := t.getRev() if rev < 0 { continue } - - plog.Noticef("Starting auto-compaction at revision %d (retention: %d hours)", rev, t.periodInHour) + plog.Noticef("Starting auto-compaction at revision %d (retention: %v)", rev, t.period) _, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev}) if err == nil || err == mvcc.ErrCompacted { t.revs = remaining - last = clock.Now() plog.Noticef("Finished auto-compaction at revision %d", rev) } else { - plog.Noticef("Failed auto-compaction at revision %d (%v)", err, rev) - plog.Noticef("Retry after %v", checkCompactionInterval) + plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err) + plog.Noticef("Retry after %v", checkCompactInterval) } } }() @@ -113,8 +113,8 @@ func (t *Periodic) Resume() { t.paused = false } -func (t *Periodic) getRev(h int) (int64, []int64) { - i := len(t.revs) - int(time.Duration(h)*time.Hour/checkCompactionInterval) +func (t *Periodic) getRev() (int64, []int64) { + i := len(t.revs) - periodDivisor if i < 0 { return -1, t.revs } diff --git a/compactor/periodic_test.go b/compactor/periodic_test.go index d0bb7f6eef3c..19abd4fdbc59 100644 --- a/compactor/periodic_test.go +++ b/compactor/periodic_test.go @@ -26,39 +26,36 @@ import ( func TestPeriodic(t *testing.T) { retentionHours := 2 + retentionDuration := time.Duration(retentionHours) * time.Hour fc := clockwork.NewFakeClock() rg := &fakeRevGetter{testutil.NewRecorderStream(), 0} compactable := &fakeCompactable{testutil.NewRecorderStream()} tb := &Periodic{ - clock: fc, - periodInHour: retentionHours, - rg: rg, - c: compactable, + clock: fc, + period: retentionDuration, + rg: rg, + c: compactable, } tb.Run() defer tb.Stop() - - n := int(time.Hour / checkCompactionInterval) - // collect 5 hours of revisions - for i := 0; i < 5; i++ { - // advance one hour, one revision for each interval - for j := 0; j < n; j++ { - rg.Wait(1) - fc.Advance(checkCompactionInterval) - } - - // compaction doesn't happen til 2 hours elapses - if i+1 < retentionHours { + checkCompactInterval := retentionDuration / time.Duration(periodDivisor) + n := periodDivisor + // simulate 5 hours worth of intervals. + for i := 0; i < n/retentionHours*5; i++ { + rg.Wait(1) + fc.Advance(checkCompactInterval) + // compaction doesn't happen til 2 hours elapses. + if i < n { continue } - + // after 2 hours, compaction happens at every checkCompactInterval. a, err := compactable.Wait(1) if err != nil { t.Fatal(err) } - expectedRevision := int64(1 + (i+1)*n - retentionHours*n) + expectedRevision := int64(i + 1 - n) if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) { t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) } @@ -75,21 +72,23 @@ func TestPeriodicPause(t *testing.T) { fc := clockwork.NewFakeClock() compactable := &fakeCompactable{testutil.NewRecorderStream()} rg := &fakeRevGetter{testutil.NewRecorderStream(), 0} + retentionDuration := time.Hour tb := &Periodic{ - clock: fc, - periodInHour: 1, - rg: rg, - c: compactable, + clock: fc, + period: retentionDuration, + rg: rg, + c: compactable, } tb.Run() tb.Pause() // tb will collect 3 hours of revisions but not compact since paused - n := int(time.Hour / checkCompactionInterval) + checkCompactInterval := retentionDuration / time.Duration(periodDivisor) + n := periodDivisor for i := 0; i < 3*n; i++ { rg.Wait(1) - fc.Advance(checkCompactionInterval) + fc.Advance(checkCompactInterval) } // tb ends up waiting for the clock @@ -102,14 +101,14 @@ func TestPeriodicPause(t *testing.T) { // tb resumes to being blocked on the clock tb.Resume() - // unblock clock, will kick off a compaction at hour 3:05 + // unblock clock, will kick off a compaction at hour 3:06 rg.Wait(1) - fc.Advance(checkCompactionInterval) + fc.Advance(checkCompactInterval) a, err := compactable.Wait(1) if err != nil { t.Fatal(err) } - // compact the revision from hour 2:05 + // compact the revision from hour 2:06 wreq := &pb.CompactionRequest{Revision: int64(1 + 2*n + 1)} if !reflect.DeepEqual(a[0].Params[0], wreq) { t.Errorf("compact request = %v, want %v", a[0].Params[0], wreq.Revision)