Skip to content

Commit

Permalink
Merge pull request #15520 from serathius/fix-issue15271-3.4
Browse files Browse the repository at this point in the history
[v3.4] Fix issue15271
  • Loading branch information
ahrtr authored Mar 20, 2023
2 parents 2eabc0b + 29ecfc0 commit 46ae7eb
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 23 deletions.
5 changes: 4 additions & 1 deletion mvcc/watchable_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,6 @@ func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {
pendingEventsGauge.Add(float64(len(eb.evs)))
} else {
// move slow watcher to victims
w.minRev = rev + 1
if victim == nil {
victim = make(watcherBatch)
}
Expand All @@ -468,6 +467,10 @@ func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {
s.synced.delete(w)
slowWatcherGauge.Inc()
}
// always update minRev
// in case 'send' returns true and watcher stays synced, this is needed for Restore when all watchers become unsynced
// in case 'send' returns false, this is needed for syncWatchers
w.minRev = rev + 1
}
s.addVictim(victim)
}
Expand Down
45 changes: 23 additions & 22 deletions mvcc/watchable_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,39 +307,40 @@ func TestWatchRestore(t *testing.T) {

testKey := []byte("foo")
testValue := []byte("bar")
rev := s.Put(testKey, testValue, lease.NoLease)

newBackend, newPath := backend.NewDefaultTmpBackend()
newStore := newWatchableStore(zap.NewExample(), newBackend, &lease.FakeLessor{}, nil, nil, StoreConfig{})
defer cleanup(newStore, newBackend, newPath)

w := newStore.NewWatchStream()
w.Watch(0, testKey, nil, rev-1)
w := s.NewWatchStream()
defer w.Close()
w.Watch(0, testKey, nil, 1)

time.Sleep(delay)
wantRev := s.Put(testKey, testValue, lease.NoLease)

newStore.Restore(b)
select {
case resp := <-w.Chan():
if resp.Revision != rev {
t.Fatalf("rev = %d, want %d", resp.Revision, rev)
}
if len(resp.Events) != 1 {
t.Fatalf("failed to get events from the response")
}
if resp.Events[0].Kv.ModRevision != rev {
t.Fatalf("kv.rev = %d, want %d", resp.Events[0].Kv.ModRevision, rev)
}
case <-time.After(time.Second):
t.Fatal("failed to receive event in 1 second.")
s.Restore(b)
events := readEventsForSecond(w.Chan())
if len(events) != 1 {
t.Errorf("Expected only one event, got %d", len(events))
}
if events[0].Kv.ModRevision != wantRev {
t.Errorf("Expected revision to match, got %d, want %d", events[0].Kv.ModRevision, wantRev)
}

}
}

t.Run("Normal", test(0))
t.Run("RunSyncWatchLoopBeforeRestore", test(time.Millisecond*120)) // longer than default waitDuration
}

func readEventsForSecond(ws <-chan WatchResponse) (events []mvccpb.Event) {
for {
select {
case resp := <-ws:
events = append(events, resp.Events...)
case <-time.After(time.Second):
return events
}
}
}

// TestWatchRestoreSyncedWatcher tests such a case that:
// 1. watcher is created with a future revision "math.MaxInt64 - 2"
// 2. watcher with a future revision is added to "synced" watcher group
Expand Down

0 comments on commit 46ae7eb

Please sign in to comment.