From 1945eb6889b95099ac4c4934b888a508d741146d Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Tue, 22 Dec 2015 09:30:40 -0800 Subject: [PATCH] storage: range all unsynced at once This is for https://github.com/coreos/etcd/issues/3848 to batch RangeHistory for all watchings at once. --- storage/watchable_store.go | 71 +++++++++++++++++++++++++++++++++ storage/watchable_store_test.go | 31 ++++++++++++++ 2 files changed, 102 insertions(+) diff --git a/storage/watchable_store.go b/storage/watchable_store.go index ebad2a64de9f..11182caaeba6 100644 --- a/storage/watchable_store.go +++ b/storage/watchable_store.go @@ -17,6 +17,7 @@ package storage import ( "fmt" "log" + "math" "sync" "time" @@ -295,6 +296,76 @@ func (s *watchableStore) syncWatchings() { slowWatchingGauge.Set(float64(len(s.unsynced))) } +// RangeAllUnsynced ranges on all unsynced watchings. +func (s *watchableStore) RangeAllUnsynced() { + totalLimit := 0 + minRev, maxRev := int64(math.MaxInt64), int64(math.MinInt64) + revToKey := make(map[int64][]byte) + for w := range s.unsynced { + if w.cur > 0 && w.cur <= s.store.compactMainRev { + log.Printf("storage: %v", ErrCompacted) + delete(s.unsynced, w) + continue + } + if w.cur > s.store.currentRev.main { + log.Printf("storage: %v", ErrFutureRev) + delete(s.unsynced, w) + continue + } + totalLimit += cap(w.ch) - len(w.ch) + if minRev >= w.cur { + minRev = w.cur + } + if maxRev <= w.cur { + maxRev = w.cur + } + revToKey[w.cur] = w.key + } + + min := revToKey[minRev] + max := revToKey[maxRev] + + s.store.mu.Lock() + defer s.store.mu.Unlock() + + tx := s.b.BatchTx() + tx.Lock() + + keys, vals := tx.UnsafeRange(keyBucketName, min, max, 0) + for i, key := range keys { + var kv storagepb.KeyValue + if err := kv.Unmarshal(vals[i]); err != nil { + log.Fatalf("storage: cannot unmarshal event: %v", err) + } + + rev := bytesToRev(key[:revBytesLen]) + + // restore index + switch { + case isTombstone(key): + s.store.kvindex.Tombstone(kv.Key, rev) + default: + s.store.kvindex.Restore(kv.Key, revision{kv.CreateRevision, 0}, rev, kv.Version) + } + + // update revision + s.store.currentRev = rev + } + + tx.Unlock() + + fmt.Println("min:", string(min)) + fmt.Println("max:", string(max)) + fmt.Println("keys:", keys) + fmt.Println("vals:", vals) + /* + min: foo1 + max: foo9 + keys: [] + vals: [] + */ +} + // handle handles the change of the happening event on all watchings. func (s *watchableStore) handle(rev int64, ev storagepb.Event) { s.notify(rev, ev) diff --git a/storage/watchable_store_test.go b/storage/watchable_store_test.go index 9fe9600dc63c..9c86c4dc62e6 100644 --- a/storage/watchable_store_test.go +++ b/storage/watchable_store_test.go @@ -15,6 +15,7 @@ package storage import ( + "fmt" "os" "testing" ) @@ -185,6 +186,36 @@ func TestSyncWatchings(t *testing.T) { } } +// TestRangeAllUnsynced ... +func TestRangeAllUnsynced(t *testing.T) { + s := &watchableStore{ + store: newStore(tmpPath), + unsynced: make(map[*watching]struct{}), + synced: make(map[string]map[*watching]struct{}), + } + + defer func() { + s.store.Close() + os.Remove(tmpPath) + }() + + watcherN := 10 + + for i := 1; i < watcherN; i++ { + key := []byte(fmt.Sprintf("foo%d", i)) + val := []byte(fmt.Sprintf("bar%d", i)) + + s.Put(key, val) + + w := s.NewWatcher() + + // use non-0 to keep watchers in unsynced + w.Watch(key, false, int64(i)) + } + + s.RangeAllUnsynced() +} + func TestUnsafeAddWatching(t *testing.T) { s := newWatchableStore(tmpPath) defer func() {