diff --git a/compactor/periodic.go b/compactor/periodic.go index 09953528ef3..9d9164e9c5c 100644 --- a/compactor/periodic.go +++ b/compactor/periodic.go @@ -61,81 +61,85 @@ func newPeriodic(clock clockwork.Clock, h time.Duration, rg RevGetter, c Compact return t } +/* +Compaction period 1-hour: + 1. compute compaction period, which is 1-hour + 2. record revisions for every 1/10 of 1-hour (6-minute) + 3. keep recording revisions with no compaction for first 1-hour + 4. do compact with revs[0] + - success? contiue on for-loop and move sliding window; revs = revs[1:] + - failure? update revs, and retry after 1/10 of 1-hour (6-minute) + +Compaction period 24-hour: + 1. compute compaction period, which is 1-hour + 2. record revisions for every 1/10 of 1-hour (6-minute) + 3. keep recording revisions with no compaction for first 24-hour + 4. do compact with revs[0] + - success? contiue on for-loop and move sliding window; revs = revs[1:] + - failure? update revs, and retry after 1/10 of 1-hour (6-minute) + +Compaction period 59-min: + 1. compute compaction period, which is 59-min + 2. record revisions for every 1/10 of 59-min (5.9-min) + 3. keep recording revisions with no compaction for first 59-min + 4. do compact with revs[0] + - success? contiue on for-loop and move sliding window; revs = revs[1:] + - failure? update revs, and retry after 1/10 of 59-min (5.9-min) + +Compaction period 5-sec: + 1. compute compaction period, which is 5-sec + 2. record revisions for every 1/10 of 5-sec (0.5-sec) + 3. keep recording revisions with no compaction for first 5-sec + 4. do compact with revs[0] + - success? contiue on for-loop and move sliding window; revs = revs[1:] + - failure? update revs, and retry after 1/10 of 5-sec (0.5-sec) +*/ + +// Run runs periodic compactor. func (t *Periodic) Run() { - fetchInterval := t.getFetchInterval() + compactInterval := t.getCompactInterval() retryInterval := t.getRetryInterval() - retentions := int(t.period/fetchInterval) + 1 // number of revs to keep for t.period - notify := make(chan struct{}, 1) + retentions := t.getRetentions() - // periodically updates t.revs and notify to the other goroutine go func() { + lastSuccess := t.clock.Now() + baseInterval := t.period for { - rev := t.rg.Rev() - t.mu.Lock() - t.revs = append(t.revs, rev) + t.revs = append(t.revs, t.rg.Rev()) if len(t.revs) > retentions { t.revs = t.revs[1:] // t.revs[0] is always the rev at t.period ago } - t.mu.Unlock() - - select { - case notify <- struct{}{}: - default: - // compaction can take time more than interval - } select { case <-t.ctx.Done(): return - case <-t.clock.After(fetchInterval): - } - } - }() - - // run compaction triggered by the other goroutine thorough the notify channel - // or internal periodic retry - go func() { - var lastCompactedRev int64 - for { - select { - case <-t.ctx.Done(): - return - case <-notify: - // from the other goroutine case <-t.clock.After(retryInterval): - // for retry - // when t.rev is not updated, this event will be ignored later, - // so we don't need to think about race with <-notify. + t.mu.Lock() + p := t.paused + t.mu.Unlock() + if p { + continue + } } - t.mu.Lock() - p := t.paused - rev := t.revs[0] - len := len(t.revs) - t.mu.Unlock() - if p { + if t.clock.Now().Sub(lastSuccess) < baseInterval { continue } - // it's too early to start working - if len != retentions { - continue - } - - // if t.revs is not updated, we can ignore the event. - // it's not the first time to try comapction in this interval. - if rev == lastCompactedRev { - continue + // wait up to initial given period + if baseInterval == t.period { + baseInterval = compactInterval } + rev := t.revs[0] plog.Noticef("Starting auto-compaction at revision %d (retention: %v)", rev, t.period) _, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev}) if err == nil || err == mvcc.ErrCompacted { + lastSuccess = t.clock.Now() plog.Noticef("Finished auto-compaction at revision %d", rev) - lastCompactedRev = rev } else { plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err) - plog.Noticef("Retry after %s", retryInterval) + plog.Noticef("Retry after %v", retryInterval) } } }() @@ -145,7 +149,7 @@ func (t *Periodic) Run() { // (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='10m', then compact every 10-minute) // if given compaction period x is >1-hour, compact every hour. // (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='2h', then compact every 1-hour) -func (t *Periodic) getFetchInterval() time.Duration { +func (t *Periodic) getCompactInterval() time.Duration { itv := t.period if itv > time.Hour { itv = time.Hour @@ -153,35 +157,33 @@ func (t *Periodic) getFetchInterval() time.Duration { return itv } +func (t *Periodic) getRetentions() int { + return int(t.period/t.getRetryInterval()) + 1 +} + const retryDivisor = 10 func (t *Periodic) getRetryInterval() time.Duration { - itv := t.period / retryDivisor - // we don't want to too aggressive retries - // and also jump between 6-minute through 60-minute - if itv < (6 * time.Minute) { // t.period is less than hour - // if t.period is less than 6-minute, - // retry interval is t.period. - // if we divide byretryDivisor, it's too aggressive - if t.period < 6*time.Minute { - itv = t.period - } else { - itv = 6 * time.Minute - } + itv := t.period + if itv > time.Hour { + itv = time.Hour } - return itv + return itv / retryDivisor } +// Stop stops periodic compactor. func (t *Periodic) Stop() { t.cancel() } +// Pause pauses periodic compactor. func (t *Periodic) Pause() { t.mu.Lock() defer t.mu.Unlock() t.paused = true } +// Resume resumes periodic compactor. func (t *Periodic) Resume() { t.mu.Lock() defer t.mu.Unlock() diff --git a/compactor/periodic_test.go b/compactor/periodic_test.go index d4ad5cd0306..21e539e765d 100644 --- a/compactor/periodic_test.go +++ b/compactor/periodic_test.go @@ -36,22 +36,40 @@ func TestPeriodicHourly(t *testing.T) { tb.Run() defer tb.Stop() - // simulate 5 hours - for i := 0; i < 5; i++ { + initialIntervals, intervalsPerPeriod := tb.getRetentions(), 10 + + // compaction doesn't happen til 2 hours elapse + for i := 0; i < initialIntervals; i++ { rg.Wait(1) - fc.Advance(time.Hour) - // compaction doesn't happen til 2 hours elapses. - if i < retentionHours { - continue + fc.Advance(tb.getRetryInterval()) + } + + // very first compaction + a, err := compactable.Wait(1) + if err != nil { + t.Fatal(err) + } + expectedRevision := int64(1) + if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) { + t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) + } + + // simulate 3 hours + // now compactor kicks in, every hour + for i := 0; i < 3; i++ { + // advance one hour, one revision for each interval + for j := 0; j < intervalsPerPeriod; j++ { + rg.Wait(1) + fc.Advance(tb.getRetryInterval()) } - // after 2 hours, compaction happens at every interval. - // at i = 3, t.revs = [1(2h-ago,T=0h), 2(1h-ago,T=1h), 3(now,T=2h)] (len=3) (rev starts from 1) - a, err := compactable.Wait(1) + + a, err = compactable.Wait(1) if err != nil { t.Fatal(err) } - expectedRevision := int64(i - 1) + + expectedRevision = int64((i + 1) * 10) if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) { t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) } @@ -59,7 +77,7 @@ func TestPeriodicHourly(t *testing.T) { } func TestPeriodicMinutes(t *testing.T) { - retentionMinutes := 23 + retentionMinutes := 5 retentionDuration := time.Duration(retentionMinutes) * time.Minute fc := clockwork.NewFakeClock() @@ -70,25 +88,41 @@ func TestPeriodicMinutes(t *testing.T) { tb.Run() defer tb.Stop() - // simulate 115 (23 * 5) minutes - for i := 0; i < 5; i++ { + initialIntervals, intervalsPerPeriod := tb.getRetentions(), 10 + + // compaction doesn't happen til 5 minutes elapse + for i := 0; i < initialIntervals; i++ { rg.Wait(1) - fc.Advance(retentionDuration) + fc.Advance(tb.getRetryInterval()) + } - // notting happens at T=0 - if i == 0 { - continue + // very first compaction + a, err := compactable.Wait(1) + if err != nil { + t.Fatal(err) + } + expectedRevision := int64(1) + if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) { + t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) + } + + // compaction happens at every interval + for i := 0; i < 5; i++ { + // advance 5-minute, one revision for each interval + for j := 0; j < intervalsPerPeriod; j++ { + rg.Wait(1) + fc.Advance(tb.getRetryInterval()) } - // from T=23m (i=1), compaction happens at every interval + a, err := compactable.Wait(1) if err != nil { t.Fatal(err) } - expectedRevision := int64(i) + + expectedRevision = int64((i + 1) * 10) if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) { t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) } - } } @@ -102,18 +136,14 @@ func TestPeriodicPause(t *testing.T) { tb.Run() tb.Pause() + n := tb.getRetentions() + // tb will collect 3 hours of revisions but not compact since paused - // T=0 - rg.Wait(1) // t.revs = [1] - fc.Advance(time.Hour) - // T=1h - rg.Wait(1) // t.revs = [1, 2] - fc.Advance(time.Hour) - // T=2h - rg.Wait(1) // t.revs = [2, 3] - fc.Advance(time.Hour) - // T=3h - rg.Wait(1) // t.revs = [3, 4] + for i := 0; i < n*3; i++ { + rg.Wait(1) + fc.Advance(tb.getRetryInterval()) + } + // t.revs = [21 22 23 24 25 26 27 28 29 30] select { case a := <-compactable.Chan(): @@ -123,16 +153,19 @@ func TestPeriodicPause(t *testing.T) { // tb resumes to being blocked on the clock tb.Resume() + rg.Wait(1) // unblock clock, will kick off a compaction at T=3h6m by retry - fc.Advance(time.Minute * 6) + fc.Advance(tb.getRetryInterval()) + // T=3h6m a, err := compactable.Wait(1) if err != nil { t.Fatal(err) } - // compact the revision from T=3h - wreq := &pb.CompactionRequest{Revision: int64(3)} + + // compact the revision from hour 2:06 + wreq := &pb.CompactionRequest{Revision: int64(1 + 2*n + 1)} if !reflect.DeepEqual(a[0].Params[0], wreq) { t.Errorf("compact request = %v, want %v", a[0].Params[0], wreq.Revision) }