Skip to content

Commit

Permalink
storage: range all unsynced at once
Browse files Browse the repository at this point in the history
This is for #3848
to batch RangeHistory for all watchings at once.
  • Loading branch information
gyuho committed Dec 24, 2015
1 parent b79dae2 commit 41d8b62
Show file tree
Hide file tree
Showing 4 changed files with 227 additions and 83 deletions.
138 changes: 108 additions & 30 deletions storage/watchable_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package storage
import (
"fmt"
"log"
"math"
"sync"
"time"

Expand Down Expand Up @@ -174,6 +175,7 @@ func (s *watchableStore) NewWatcher() Watcher {
}

func (s *watchableStore) watch(key []byte, prefix bool, startRev, id int64, ch chan<- storagepb.Event) (*watching, CancelFunc) {

s.mu.Lock()
defer s.mu.Unlock()

Expand Down Expand Up @@ -241,57 +243,133 @@ func (s *watchableStore) syncWatchingsLoop() {
}
}

// syncWatchings syncs the watchings in the unsyncd map.
func (s *watchableStore) syncWatchings() {
// rangeUnsynced ranges on all unsynced watchings and returns all
// revision bytes, key-value pairs and next revision.
func (s *watchableStore) rangeUnsynced() ([][]byte, []storagepb.KeyValue, int64, map[string]map[*watching]struct{}, error) {
// get minimum and maximum revision to get all key-value pairs in unsynced
minRev, maxRev := int64(math.MaxInt64), int64(math.MinInt64)
keyToWatchings := make(map[string]map[*watching]struct{})
_, curRev, _ := s.store.Range(nil, nil, 0, 0)
for w := range s.unsynced {
var end []byte
if w.prefix {
end = make([]byte, len(w.key))
copy(end, w.key)
end[len(w.key)-1]++
}
limit := cap(w.ch) - len(w.ch)
// the channel is full, try it in the next round
if limit == 0 {
if w.cur > 0 && w.cur <= s.store.compactMainRev {
// TODO: handle this ErrCompacted error
log.Printf("storage: %v", ErrCompacted)
delete(s.unsynced, w)
continue
}
revbs, kvs, nextRev, err := s.store.RangeHistory(w.key, end, int64(limit), w.cur)
if err != nil {
// TODO: send error event to watching
if w.cur > s.store.currentRev.main {
// TODO: handle this ErrFutureRev error
log.Printf("storage: %v", ErrFutureRev)
delete(s.unsynced, w)
continue
}

// push events to the channel
for i, kv := range kvs {
var evt storagepb.Event_EventType
switch {
case isTombstone(revbs[i]):
evt = storagepb.DELETE
default:
evt = storagepb.PUT
k := string(w.key)

if w.cur > curRev {
if err := unsafeAddWatching(&s.synced, k, w); err != nil {
log.Panicf("error unsafeAddWatching (%v) for key %s", err, k)
}
delete(s.unsynced, w)
continue
}

if minRev >= w.cur {
minRev = w.cur
}
if maxRev <= w.cur {
maxRev = w.cur
}

if v, ok := keyToWatchings[k]; !ok {
keyToWatchings[k] = make(map[*watching]struct{})
keyToWatchings[k][w] = struct{}{}
} else {
v[w] = struct{}{}
}
}

// to prevent returning nothing
if minRev == maxRev {
maxRev++
}

min, max := newRevBytes(), newRevBytes()
revToBytes(revision{main: minRev}, min)
revToBytes(revision{main: maxRev, sub: maxRev}, max)

s.store.mu.Lock()
defer s.store.mu.Unlock()

// UnsafeRange returns keys and values. And in boltdb, keys are revisions.
tx := s.store.b.BatchTx()
tx.Lock()
ks, vs := tx.UnsafeRange(keyBucketName, min, max, 0)
tx.Unlock()

revbs := [][]byte{}
kvs := []storagepb.KeyValue{}
for i, v := range vs {
var kv storagepb.KeyValue
if err := kv.Unmarshal(v); err != nil {
return nil, nil, 0, nil, fmt.Errorf("storage: cannot unmarshal event: %v", err)
}
if _, ok := keyToWatchings[string(kv.Key)]; !ok {
continue
}
revbs = append(revbs, ks[i])
kvs = append(kvs, kv)
}

return revbs, kvs, s.store.currentRev.main + 1, keyToWatchings, nil
}

// syncWatchings syncs the watchings in the unsyncd map.
func (s *watchableStore) syncWatchings() {

if len(s.unsynced) == 0 {
return
}

revbs, kvs, _, keyToWatchings, err := s.rangeUnsynced()
if err != nil {
log.Panicf("error s.rangeUnsynced: %v", err)
}

// push events to the channel
for i, kv := range kvs {
var evt storagepb.Event_EventType
switch {
case isTombstone(revbs[i]):
evt = storagepb.DELETE
default:
evt = storagepb.PUT
}

k := string(kv.Key)

wm := make(map[*watching]struct{})
if v, ok := keyToWatchings[k]; ok {
wm = v
} else {
continue // in synced
}

for w := range wm {
w.ch <- storagepb.Event{
Type: evt,
Kv: &kv,
WatchID: w.id,
}
pendingEventsGauge.Inc()
}
// switch to tracking future events if needed
if nextRev > curRev {
k := string(w.key)
if err := unsafeAddWatching(&s.synced, k, w); err != nil {
log.Panicf("error unsafeAddWatching (%v) for key %s", err, k)
}
delete(s.unsynced, w)
continue

pendingEventsGauge.Inc()
}
// put it back to try it in the next round
w.cur = nextRev
}

slowWatchingGauge.Set(float64(len(s.unsynced)))
}

Expand Down
82 changes: 42 additions & 40 deletions storage/watchable_store_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,43 @@ import (
"testing"
)

// Benchmarks on cancel function performance for synced watchers.
func BenchmarkWatchableStoreSyncedCancel(b *testing.B) {
// arbitrary number of watchers
watcherN := 1000000

s := newWatchableStore(tmpPath)
defer func() {
s.store.Close()
os.Remove(tmpPath)
}()

// Put a key so that we can spawn watchers on that key
testKey := []byte("foo")
testValue := []byte("bar")
s.Put(testKey, testValue)

w := s.NewWatcher()

cancels := make([]CancelFunc, watcherN)
for i := 0; i < watcherN; i++ {
// 0 for startRev to keep watchers in synced
_, cancel := w.Watch(testKey, true, 0)
cancels[i] = cancel
}

// randomly cancel watchers to make it not biased towards
// data structures with an order, such as slice.
ix := rand.Perm(watcherN)

b.ResetTimer()
b.ReportAllocs()

for _, idx := range ix {
cancels[idx]()
}
}

// Benchmarks on cancel function performance for unsynced watchers
// in a WatchableStore. It creates k*N watchers to populate unsynced
// with a reasonably large number of watchers. And measures the time it
Expand All @@ -30,7 +67,8 @@ import (
func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
const k int = 2
benchSampleSize := b.N
watcherSize := k * benchSampleSize
watcherN := k * benchSampleSize

// manually create watchableStore instead of newWatchableStore
// because newWatchableStore periodically calls syncWatchersLoop
// method to sync watchers in unsynced map. We want to keep watchers
Expand Down Expand Up @@ -62,16 +100,16 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {

w := s.NewWatcher()

cancels := make([]CancelFunc, watcherSize)
for i := 0; i < watcherSize; i++ {
cancels := make([]CancelFunc, watcherN)
for i := 0; i < watcherN; i++ {
// non-0 value to keep watchers in unsynced
_, cancel := w.Watch(testKey, true, 1)
cancels[i] = cancel
}

// random-cancel N watchers to make it not biased towards
// data structures with an order, such as slice.
ix := rand.Perm(watcherSize)
ix := rand.Perm(watcherN)

b.ResetTimer()
b.ReportAllocs()
Expand All @@ -81,39 +119,3 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
cancels[idx]()
}
}

func BenchmarkWatchableStoreSyncedCancel(b *testing.B) {
s := newWatchableStore(tmpPath)
defer func() {
s.store.Close()
os.Remove(tmpPath)
}()

// Put a key so that we can spawn watchers on that key
testKey := []byte("foo")
testValue := []byte("bar")
s.Put(testKey, testValue)

w := s.NewWatcher()

// put 1 million watchers on the same key
const watcherSize = 1000000

cancels := make([]CancelFunc, watcherSize)
for i := 0; i < watcherSize; i++ {
// 0 for startRev to keep watchers in synced
_, cancel := w.Watch(testKey, true, 0)
cancels[i] = cancel
}

// randomly cancel watchers to make it not biased towards
// data structures with an order, such as slice.
ix := rand.Perm(watcherSize)

b.ResetTimer()
b.ReportAllocs()

for _, idx := range ix {
cancels[idx]()
}
}
Loading

0 comments on commit 41d8b62

Please sign in to comment.