Skip to content

Commit

Permalink
storage: range all unsynced at once
Browse files Browse the repository at this point in the history
This is for #3848
to batch RangeHistory for all watchings at once.
  • Loading branch information
gyuho committed Dec 23, 2015
1 parent b79dae2 commit 1945eb6
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 0 deletions.
71 changes: 71 additions & 0 deletions storage/watchable_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package storage
import (
"fmt"
"log"
"math"
"sync"
"time"

Expand Down Expand Up @@ -295,6 +296,76 @@ func (s *watchableStore) syncWatchings() {
slowWatchingGauge.Set(float64(len(s.unsynced)))
}

// RangeAllUnsynced ranges on all unsynced watchings.
func (s *watchableStore) RangeAllUnsynced() {
totalLimit := 0
minRev, maxRev := int64(math.MaxInt64), int64(math.MinInt64)
revToKey := make(map[int64][]byte)
for w := range s.unsynced {
if w.cur > 0 && w.cur <= s.store.compactMainRev {
log.Printf("storage: %v", ErrCompacted)
delete(s.unsynced, w)
continue
}
if w.cur > s.store.currentRev.main {
log.Printf("storage: %v", ErrFutureRev)
delete(s.unsynced, w)
continue
}
totalLimit += cap(w.ch) - len(w.ch)
if minRev >= w.cur {
minRev = w.cur
}
if maxRev <= w.cur {
maxRev = w.cur
}
revToKey[w.cur] = w.key
}

min := revToKey[minRev]
max := revToKey[maxRev]

s.store.mu.Lock()
defer s.store.mu.Unlock()

tx := s.b.BatchTx()
tx.Lock()

keys, vals := tx.UnsafeRange(keyBucketName, min, max, 0)
for i, key := range keys {
var kv storagepb.KeyValue
if err := kv.Unmarshal(vals[i]); err != nil {
log.Fatalf("storage: cannot unmarshal event: %v", err)
}

rev := bytesToRev(key[:revBytesLen])

// restore index
switch {
case isTombstone(key):
s.store.kvindex.Tombstone(kv.Key, rev)
default:
s.store.kvindex.Restore(kv.Key, revision{kv.CreateRevision, 0}, rev, kv.Version)
}

// update revision
s.store.currentRev = rev
}

tx.Unlock()

fmt.Println("min:", string(min))
fmt.Println("max:", string(max))
fmt.Println("keys:", keys)
fmt.Println("vals:", vals)
/*
min: foo1
max: foo9
keys: []
vals: []
*/
}

// handle handles the change of the happening event on all watchings.
func (s *watchableStore) handle(rev int64, ev storagepb.Event) {
s.notify(rev, ev)
Expand Down
31 changes: 31 additions & 0 deletions storage/watchable_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package storage

import (
"fmt"
"os"
"testing"
)
Expand Down Expand Up @@ -185,6 +186,36 @@ func TestSyncWatchings(t *testing.T) {
}
}

// TestRangeAllUnsynced ...
func TestRangeAllUnsynced(t *testing.T) {
s := &watchableStore{
store: newStore(tmpPath),
unsynced: make(map[*watching]struct{}),
synced: make(map[string]map[*watching]struct{}),
}

defer func() {
s.store.Close()
os.Remove(tmpPath)
}()

watcherN := 10

for i := 1; i < watcherN; i++ {
key := []byte(fmt.Sprintf("foo%d", i))
val := []byte(fmt.Sprintf("bar%d", i))

s.Put(key, val)

w := s.NewWatcher()

// use non-0 to keep watchers in unsynced
w.Watch(key, false, int64(i))
}

s.RangeAllUnsynced()
}

func TestUnsafeAddWatching(t *testing.T) {
s := newWatchableStore(tmpPath)
defer func() {
Expand Down

0 comments on commit 1945eb6

Please sign in to comment.