From b4d9703385f662081063b8f1ee423cbfd53a76ca Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Wed, 21 Mar 2018 19:52:44 -0700 Subject: [PATCH] compactor: adjust interval for period <1-hour Signed-off-by: Gyuho Lee --- compactor/periodic.go | 119 +++++++++++++++++++++++++------------ compactor/periodic_test.go | 114 ++++++++++++++++++++--------------- 2 files changed, 147 insertions(+), 86 deletions(-) diff --git a/compactor/periodic.go b/compactor/periodic.go index 9e83191faf49..1ecd72c11e4f 100644 --- a/compactor/periodic.go +++ b/compactor/periodic.go @@ -61,50 +61,99 @@ func newPeriodic(clock clockwork.Clock, h time.Duration, rg RevGetter, c Compact return t } -// periodDivisor divides Periodic.period in into checkCompactInterval duration -const periodDivisor = 10 - // Run runs periodic compactor. func (t *Periodic) Run() { - interval := t.period / time.Duration(periodDivisor) - go func() { - initialWait := t.clock.Now() - for { - t.revs = append(t.revs, t.rg.Rev()) - select { - case <-t.ctx.Done(): - return - case <-t.clock.After(interval): - t.mu.Lock() - p := t.paused - t.mu.Unlock() - if p { - continue - } - } + go t.run() +} - // wait up to initial given period - if t.clock.Now().Sub(initialWait) < t.period { - continue - } +// periodically fetches revisions and ensures that +// first element is always up-to-date for retention window +func (t *Periodic) run() { + initialWait := t.clock.Now() + fetchInterval := t.getInterval() + retryInterval := t.getRetryInterval() + + // e.g. period 9h with compaction period 1h, then retain up-to 9 revs + // e.g. period 12h with compaction period 1h, then retain up-to 12 revs + // e.g. period 20m with compaction period 20m, then retain up-to 1 rev + retentions := int(t.period / fetchInterval) + for { + t.revs = append(t.revs, t.rg.Rev()) + if len(t.revs) > retentions { + t.revs = t.revs[1:] + } - rev, remaining := t.getRev() - if rev < 0 { + select { + case <-t.ctx.Done(): + return + case <-t.clock.After(fetchInterval): + t.mu.RLock() + p := t.paused + t.mu.RUnlock() + if p { continue } + } + + // no compaction until initial wait period + if t.clock.Now().Sub(initialWait) < t.period { + continue + } + rev := t.revs[0] + for i := 0; i < retryDivisor; i++ { plog.Noticef("Starting auto-compaction at revision %d (retention: %v)", rev, t.period) _, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev}) if err == nil || err == mvcc.ErrCompacted { - // move to next sliding window - t.revs = remaining + // compactor succeeds at revs[0], move sliding window + t.revs = t.revs[1:] plog.Noticef("Finished auto-compaction at revision %d", rev) - } else { - plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err) - plog.Noticef("Retry after %v", interval) + break + } + + // compactor fails at revs[0]: + // 1. retry revs[0], so long as revs[0] is up-to-date + // 2. retry revs[1], when revs[0] becomes stale + plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err) + plog.Noticef("Retry after %v", retryInterval) + paused := false + select { + case <-t.ctx.Done(): + return + case <-t.clock.After(retryInterval): + t.mu.RLock() + paused = t.paused + t.mu.RUnlock() + } + if paused { + break } } - }() + } +} + +// If given compaction period x is <1-hour, compact every x duration, with x retention window +// (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='10m', then compact every 10-minute). +// If given compaction period x is >1-hour, compact every 1-hour, with x retention window +// (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='72h', then compact every 1-hour). +func (t *Periodic) getInterval() time.Duration { + itv := t.period + if itv > time.Hour { + itv = time.Hour + } + return itv +} + +const retryDivisor = 10 + +// divide by 10 to retry faster +// e.g. given period 2-hour, retry in 12-min rather than 1-hour (compaction period) +func (t *Periodic) getRetryInterval() time.Duration { + itv := t.period + if itv > time.Hour { + itv /= retryDivisor + } + return itv } // Stop stops periodic compactor. @@ -125,11 +174,3 @@ func (t *Periodic) Resume() { defer t.mu.Unlock() t.paused = false } - -func (t *Periodic) getRev() (int64, []int64) { - i := len(t.revs) - periodDivisor - if i < 0 { - return -1, t.revs - } - return t.revs[i], t.revs[i+1:] -} diff --git a/compactor/periodic_test.go b/compactor/periodic_test.go index f039a8a7617d..cd2898520229 100644 --- a/compactor/periodic_test.go +++ b/compactor/periodic_test.go @@ -25,83 +25,103 @@ import ( "github.com/jonboulle/clockwork" ) -func TestPeriodic(t *testing.T) { - retentionHours := 2 - retentionDuration := time.Duration(retentionHours) * time.Hour +func TestPeriodicHourly(t *testing.T) { + fc := clockwork.NewFakeClock() + rg := &fakeRevGetter{testutil.NewRecorderStream(), 0} + compactable := &fakeCompactable{testutil.NewRecorderStream()} + tb := newPeriodic(fc, 12*time.Hour, rg, compactable) + + tb.Run() + defer tb.Stop() + + for i := 0; i < 24; i++ { + // first 12-hour only with rev gets + if _, err := rg.Wait(1); err != nil { + t.Fatal(err) + } + fc.Advance(tb.getInterval()) + + // after 12-hour, periodic compact begins, every hour + // with 12-hour retention window + if i >= 11 { + ca, err := compactable.Wait(1) + if err != nil { + t.Fatal(err) + } + expectedRevision := int64(i + 2 - int(tb.period/time.Hour)) + if !reflect.DeepEqual(ca[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) { + t.Fatalf("compact request = %v, want %v", ca[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) + } + } + } +} +func TestPeriodicEveryMinute(t *testing.T) { fc := clockwork.NewFakeClock() rg := &fakeRevGetter{testutil.NewRecorderStream(), 0} compactable := &fakeCompactable{testutil.NewRecorderStream()} - tb := newPeriodic(fc, retentionDuration, rg, compactable) + tb := newPeriodic(fc, time.Minute, rg, compactable) tb.Run() defer tb.Stop() - checkCompactInterval := retentionDuration / time.Duration(periodDivisor) - n := periodDivisor - // simulate 5 hours worth of intervals. - for i := 0; i < n/retentionHours*5; i++ { - rg.Wait(1) - fc.Advance(checkCompactInterval) - // compaction doesn't happen til 2 hours elapses. - if i < n { - continue + + // expect compact every minute + for i := 0; i < 10; i++ { + if _, err := rg.Wait(1); err != nil { + t.Fatal(err) } - // after 2 hours, compaction happens at every checkCompactInterval. - a, err := compactable.Wait(1) + fc.Advance(time.Minute) + + ca, err := compactable.Wait(1) if err != nil { t.Fatal(err) } - expectedRevision := int64(i + 1 - n) - if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) { - t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) + expectedRevision := int64(i + 1) + if !reflect.DeepEqual(ca[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) { + t.Errorf("compact request = %v, want %v", ca[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) } } - - // unblock the rev getter, so we can stop the compactor routine. - _, err := rg.Wait(1) - if err != nil { - t.Fatal(err) - } } -func TestPeriodicPause(t *testing.T) { +func TestPeriodicPauseHourly(t *testing.T) { fc := clockwork.NewFakeClock() - retentionDuration := time.Hour rg := &fakeRevGetter{testutil.NewRecorderStream(), 0} compactable := &fakeCompactable{testutil.NewRecorderStream()} - tb := newPeriodic(fc, retentionDuration, rg, compactable) + tb := newPeriodic(fc, 15*time.Hour, rg, compactable) tb.Run() + defer tb.Stop() + tb.Pause() - // tb will collect 3 hours of revisions but not compact since paused - checkCompactInterval := retentionDuration / time.Duration(periodDivisor) - n := periodDivisor - for i := 0; i < 3*n; i++ { - rg.Wait(1) - fc.Advance(checkCompactInterval) + // collect 15*2 hours of revisions with no compaction + for i := 0; i < 15*2; i++ { + if _, err := rg.Wait(1); err != nil { + t.Fatal(err) + } + fc.Advance(tb.getInterval()) } - // tb ends up waiting for the clock - select { case a := <-compactable.Chan(): t.Fatalf("unexpected action %v", a) case <-time.After(10 * time.Millisecond): } - // tb resumes to being blocked on the clock tb.Resume() - // unblock clock, will kick off a compaction at hour 3:06 - rg.Wait(1) - fc.Advance(checkCompactInterval) - a, err := compactable.Wait(1) - if err != nil { - t.Fatal(err) - } - // compact the revision from hour 2:06 - wreq := &pb.CompactionRequest{Revision: int64(1 + 2*n + 1)} - if !reflect.DeepEqual(a[0].Params[0], wreq) { - t.Errorf("compact request = %v, want %v", a[0].Params[0], wreq.Revision) + for i := 0; i < 20; i++ { + if _, err := rg.Wait(1); err != nil { + t.Fatal(err) + } + fc.Advance(tb.getInterval()) + + ca, err := compactable.Wait(1) + if err != nil { + t.Fatal(err) + } + expectedRevision := int64(i + 17) + if !reflect.DeepEqual(ca[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) { + t.Errorf("compact request = %v, want %v", ca[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) + } } }