diff --git a/client/pkg/testutil/recorder.go b/client/pkg/testutil/recorder.go index cc99914f609..388ff37249a 100644 --- a/client/pkg/testutil/recorder.go +++ b/client/pkg/testutil/recorder.go @@ -137,3 +137,38 @@ func newLenErr(expected int, actual int) error { s := fmt.Sprintf("len(actions) = %d, expected >= %d", actual, expected) return errors.New(s) } + +type recorderSync struct { + ch chan Action +} + +func NewRecorderSync() Recorder { + return &recorderSync{ch: make(chan Action)} +} + +func (r *recorderSync) Record(a Action) { + r.ch <- a +} + +func (r *recorderSync) Action() (acts []Action) { + for { + select { + case act := <-r.ch: + acts = append(acts, act) + default: + return acts + } + } +} + +func (r *recorderSync) Chan() <-chan Action { + return r.ch +} + +func (r *recorderSync) Wait(n int) ([]Action, error) { + acts := make([]Action, 0, n) + for i := 0; i < n; i++ { + acts = append(acts, <-r.ch) + } + return acts, nil +} diff --git a/server/etcdserver/api/v3compactor/periodic_test.go b/server/etcdserver/api/v3compactor/periodic_test.go index 5053482a807..26917f05e8a 100644 --- a/server/etcdserver/api/v3compactor/periodic_test.go +++ b/server/etcdserver/api/v3compactor/periodic_test.go @@ -27,26 +27,30 @@ import ( "go.etcd.io/etcd/client/pkg/v3/testutil" ) +func advanceForDuration(fc clockwork.FakeClock, rg *fakeRevGetter, duration, interval time.Duration) { + for i := 0; i < int(duration/interval); i++ { + // wait for periodic to call rev + _, _ = rg.Wait(1) + // Block until the periodic is waiting on the clock + fc.BlockUntil(1) + fc.Advance(interval) + } +} + func TestPeriodicHourly(t *testing.T) { retentionHours := 2 retentionDuration := time.Duration(retentionHours) * time.Hour fc := clockwork.NewFakeClock() - // TODO: Do not depand or real time (Recorder.Wait) in unit tests. - rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0} - compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)} + rg := &fakeRevGetter{testutil.NewRecorderSync(), 0} + compactable := &fakeCompactable{testutil.NewRecorderSync()} tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable) tb.Run() defer tb.Stop() - initialIntervals, intervalsPerPeriod := tb.getRetentions(), 10 - // compaction doesn't happen til 2 hours elapse - for i := 0; i < initialIntervals; i++ { - rg.Wait(1) - fc.Advance(tb.getRetryInterval()) - } + advanceForDuration(fc, rg, retentionDuration, tb.getRetryInterval()) // very first compaction a, err := compactable.Wait(1) @@ -62,10 +66,7 @@ func TestPeriodicHourly(t *testing.T) { // now compactor kicks in, every hour for i := 0; i < 3; i++ { // advance one hour, one revision for each interval - for j := 0; j < intervalsPerPeriod; j++ { - rg.Wait(1) - fc.Advance(tb.getRetryInterval()) - } + advanceForDuration(fc, rg, time.Hour, tb.getRetryInterval()) a, err = compactable.Wait(1) if err != nil { @@ -84,20 +85,15 @@ func TestPeriodicMinutes(t *testing.T) { retentionDuration := time.Duration(retentionMinutes) * time.Minute fc := clockwork.NewFakeClock() - rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0} - compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)} + rg := &fakeRevGetter{testutil.NewRecorderSync(), 0} + compactable := &fakeCompactable{testutil.NewRecorderSync()} tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable) tb.Run() defer tb.Stop() - initialIntervals, intervalsPerPeriod := tb.getRetentions(), 10 - // compaction doesn't happen til 5 minutes elapse - for i := 0; i < initialIntervals; i++ { - rg.Wait(1) - fc.Advance(tb.getRetryInterval()) - } + advanceForDuration(fc, rg, retentionDuration, tb.getRetryInterval()) // very first compaction a, err := compactable.Wait(1) @@ -112,10 +108,7 @@ func TestPeriodicMinutes(t *testing.T) { // compaction happens at every interval for i := 0; i < 5; i++ { // advance 5-minute, one revision for each interval - for j := 0; j < intervalsPerPeriod; j++ { - rg.Wait(1) - fc.Advance(tb.getRetryInterval()) - } + advanceForDuration(fc, rg, retentionDuration, tb.getRetryInterval()) a, err := compactable.Wait(1) if err != nil { @@ -132,21 +125,17 @@ func TestPeriodicMinutes(t *testing.T) { func TestPeriodicPause(t *testing.T) { fc := clockwork.NewFakeClock() retentionDuration := time.Hour - rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0} - compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)} + rg := &fakeRevGetter{testutil.NewRecorderSync(), 0} + compactable := &fakeCompactable{testutil.NewRecorderSync()} tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable) tb.Run() tb.Pause() - n := tb.getRetentions() - + start := fc.Now() // tb will collect 3 hours of revisions but not compact since paused - for i := 0; i < n*3; i++ { - rg.Wait(1) - fc.Advance(tb.getRetryInterval()) - } - // t.revs = [21 22 23 24 25 26 27 28 29 30] + advanceForDuration(fc, rg, 3*time.Hour, tb.getRetryInterval()) + // t.revs = [20 21 22 23 24 25 26 27 28 29 30] select { case a := <-compactable.Chan(): @@ -156,11 +145,15 @@ func TestPeriodicPause(t *testing.T) { // tb resumes to being blocked on the clock tb.Resume() - rg.Wait(1) - + _, _ = rg.Wait(1) + fc.BlockUntil(1) // unblock clock, will kick off a compaction at T=3h6m by retry fc.Advance(tb.getRetryInterval()) + if elapsed := fc.Since(start); elapsed < (3*time.Hour + 6*time.Minute) { + t.Fatalf("expected time elapsed 3h6m, elapsed %v", elapsed) + } + // T=3h6m a, err := compactable.Wait(1) if err != nil { @@ -168,7 +161,7 @@ func TestPeriodicPause(t *testing.T) { } // compact the revision from hour 2:06 - wreq := &pb.CompactionRequest{Revision: int64(1 + 2*n + 1)} + wreq := &pb.CompactionRequest{Revision: tb.revs[0]} if !reflect.DeepEqual(a[0].Params[0], wreq) { t.Errorf("compact request = %v, want %v", a[0].Params[0], wreq.Revision) } @@ -179,20 +172,21 @@ func TestPeriodicSkipRevNotChange(t *testing.T) { retentionDuration := time.Duration(retentionMinutes) * time.Minute fc := clockwork.NewFakeClock() - rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0} + rg := &fakeRevGetter{testutil.NewRecorderSync(), 0} compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)} tb := newPeriodic(zaptest.NewLogger(t), fc, retentionDuration, rg, compactable) tb.Run() defer tb.Stop() - initialIntervals, intervalsPerPeriod := tb.getRetentions(), 10 + intervalsPerPeriod := int(retentionDuration / tb.getRetryInterval()) // first compaction happens til 5 minutes elapsed - for i := 0; i < initialIntervals; i++ { + for i := 0; i < intervalsPerPeriod; i++ { // every time set the same revision with 100 rg.SetRev(int64(100)) - rg.Wait(1) + _, _ = rg.Wait(1) + fc.BlockUntil(1) fc.Advance(tb.getRetryInterval()) } @@ -212,7 +206,8 @@ func TestPeriodicSkipRevNotChange(t *testing.T) { for i := 0; i < 5; i++ { for j := 0; j < intervalsPerPeriod; j++ { rg.SetRev(int64(100)) - rg.Wait(1) + _, _ = rg.Wait(1) + fc.BlockUntil(1) fc.Advance(tb.getRetryInterval()) } @@ -223,8 +218,9 @@ func TestPeriodicSkipRevNotChange(t *testing.T) { } // when revision changed, compaction is normally - for i := 0; i < initialIntervals; i++ { - rg.Wait(1) + for i := 0; i < tb.getRetentions(); i++ { + _, _ = rg.Wait(1) + fc.BlockUntil(1) fc.Advance(tb.getRetryInterval()) }