Skip to content
/ etcd Public
forked from etcd-io/etcd

Commit

Permalink
storage: range all unsynced at once
Browse files Browse the repository at this point in the history
This is for etcd-io#3848
to batch RangeHistory for all watchings at once.
  • Loading branch information
gyuho committed Dec 28, 2015
1 parent b79dae2 commit 0ce6de1
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 82 deletions.
112 changes: 84 additions & 28 deletions storage/watchable_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package storage
import (
"fmt"
"log"
"math"
"sync"
"time"

Expand Down Expand Up @@ -241,57 +242,112 @@ func (s *watchableStore) syncWatchingsLoop() {
}
}

// syncWatchings syncs the watchings in the unsyncd map.
// syncWatchings periodically syncs unsynced watchings. A watching becomes
// unsynced if its chan buffer is full and it is behind the current revision.
func (s *watchableStore) syncWatchings() {
if len(s.unsynced) == 0 {
return
}

// in order to find key-value pairs from unsynced watchings, we need to
// find min/maximum revision index, and these revisions can be used to
// query the backend store of key-value pairs
minRev, maxRev := int64(math.MaxInt64), int64(math.MinInt64)

// get the main revision of the last compaction.
// if unsynced watching's channel is not full and its
// watching revision is beyond current revision, move
// it to synced and watch its future events.
_, curRev, _ := s.store.Range(nil, nil, 0, 0)

keyToUnsynced := make(map[string]map[*watching]struct{})

for w := range s.unsynced {
var end []byte
if w.prefix {
end = make([]byte, len(w.key))
copy(end, w.key)
end[len(w.key)-1]++
}
limit := cap(w.ch) - len(w.ch)
// the channel is full, try it in the next round
if limit == 0 {
// channel is full, so try at next round
continue
}
revbs, kvs, nextRev, err := s.store.RangeHistory(w.key, end, int64(limit), w.cur)
if err != nil {
// TODO: send error event to watching

k := string(w.key)

if w.cur > curRev {
if err := unsafeAddWatching(&s.synced, k, w); err != nil {
log.Panicf("error unsafeAddWatching (%v) for key %s", err, k)
}
delete(s.unsynced, w)
continue
}

// push events to the channel
for i, kv := range kvs {
var evt storagepb.Event_EventType
switch {
case isTombstone(revbs[i]):
evt = storagepb.DELETE
default:
evt = storagepb.PUT
}
if minRev >= w.cur {
minRev = w.cur
}
if maxRev <= w.cur {
maxRev = w.cur
}

if _, ok := keyToUnsynced[k]; !ok {
keyToUnsynced[k] = make(map[*watching]struct{})
}
keyToUnsynced[k][w] = struct{}{}
}

// to prevent returning nothing
if minRev == maxRev {
maxRev = maxRev + curRev
}

min, max := newRevBytes(), newRevBytes()
revToBytes(revision{main: minRev}, min)
revToBytes(revision{main: maxRev, sub: maxRev}, max)

s.store.mu.Lock()
defer s.store.mu.Unlock()

// UnsafeRange returns keys and values. And in boltdb, keys are revisions.
// values are actual key-value pairs in backend.
tx := s.store.b.BatchTx()
tx.Lock()
ks, vs := tx.UnsafeRange(keyBucketName, min, max, 0)
tx.Unlock()

for i, v := range vs {
var kv storagepb.KeyValue
if err := kv.Unmarshal(v); err != nil {
log.Panicf("storage: cannot unmarshal event: %v", err)
}

k := string(kv.Key)
wm := make(map[*watching]struct{})
if v, ok := keyToUnsynced[k]; !ok {
continue // synced
} else {
wm = v
}

var evt storagepb.Event_EventType
switch {
case isTombstone(ks[i]):
evt = storagepb.DELETE
default:
evt = storagepb.PUT
}

for w := range wm {
w.ch <- storagepb.Event{
Type: evt,
Kv: &kv,
WatchID: w.id,
}
pendingEventsGauge.Inc()
}
// switch to tracking future events if needed
if nextRev > curRev {
k := string(w.key)
if err := unsafeAddWatching(&s.synced, k, w); err != nil {
log.Panicf("error unsafeAddWatching (%v) for key %s", err, k)
}
delete(s.unsynced, w)
continue

pendingEventsGauge.Inc()
}
// put it back to try it in the next round
w.cur = nextRev
}

slowWatchingGauge.Set(float64(len(s.unsynced)))
}

Expand Down
82 changes: 42 additions & 40 deletions storage/watchable_store_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,43 @@ import (
"testing"
)

// Benchmarks on cancel function performance for synced watchers.
func BenchmarkWatchableStoreSyncedCancel(b *testing.B) {
// arbitrary number of watchers
watcherN := 1000000

s := newWatchableStore(tmpPath)
defer func() {
s.store.Close()
os.Remove(tmpPath)
}()

// Put a key so that we can spawn watchers on that key
testKey := []byte("foo")
testValue := []byte("bar")
s.Put(testKey, testValue)

w := s.NewWatcher()

cancels := make([]CancelFunc, watcherN)
for i := 0; i < watcherN; i++ {
// 0 for startRev to keep watchers in synced
_, cancel := w.Watch(testKey, true, 0)
cancels[i] = cancel
}

// randomly cancel watchers to make it not biased towards
// data structures with an order, such as slice.
ix := rand.Perm(watcherN)

b.ResetTimer()
b.ReportAllocs()

for _, idx := range ix {
cancels[idx]()
}
}

// Benchmarks on cancel function performance for unsynced watchers
// in a WatchableStore. It creates k*N watchers to populate unsynced
// with a reasonably large number of watchers. And measures the time it
Expand All @@ -30,7 +67,8 @@ import (
func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
const k int = 2
benchSampleSize := b.N
watcherSize := k * benchSampleSize
watcherN := k * benchSampleSize

// manually create watchableStore instead of newWatchableStore
// because newWatchableStore periodically calls syncWatchersLoop
// method to sync watchers in unsynced map. We want to keep watchers
Expand Down Expand Up @@ -62,16 +100,16 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {

w := s.NewWatcher()

cancels := make([]CancelFunc, watcherSize)
for i := 0; i < watcherSize; i++ {
cancels := make([]CancelFunc, watcherN)
for i := 0; i < watcherN; i++ {
// non-0 value to keep watchers in unsynced
_, cancel := w.Watch(testKey, true, 1)
cancels[i] = cancel
}

// random-cancel N watchers to make it not biased towards
// data structures with an order, such as slice.
ix := rand.Perm(watcherSize)
ix := rand.Perm(watcherN)

b.ResetTimer()
b.ReportAllocs()
Expand All @@ -81,39 +119,3 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) {
cancels[idx]()
}
}

func BenchmarkWatchableStoreSyncedCancel(b *testing.B) {
s := newWatchableStore(tmpPath)
defer func() {
s.store.Close()
os.Remove(tmpPath)
}()

// Put a key so that we can spawn watchers on that key
testKey := []byte("foo")
testValue := []byte("bar")
s.Put(testKey, testValue)

w := s.NewWatcher()

// put 1 million watchers on the same key
const watcherSize = 1000000

cancels := make([]CancelFunc, watcherSize)
for i := 0; i < watcherSize; i++ {
// 0 for startRev to keep watchers in synced
_, cancel := w.Watch(testKey, true, 0)
cancels[i] = cancel
}

// randomly cancel watchers to make it not biased towards
// data structures with an order, such as slice.
ix := rand.Perm(watcherSize)

b.ResetTimer()
b.ReportAllocs()

for _, idx := range ix {
cancels[idx]()
}
}
26 changes: 12 additions & 14 deletions storage/watchable_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ func TestNewWatcherCancel(t *testing.T) {

// TestCancelUnsynced tests if running CancelFunc removes watchings from unsynced.
func TestCancelUnsynced(t *testing.T) {
// arbitrary number for watchers
watcherN := 100

// manually create watchableStore instead of newWatchableStore
// because newWatchableStore automatically calls syncWatchers
// method to sync watchers in unsynced map. We want to keep watchers
Expand Down Expand Up @@ -89,9 +92,6 @@ func TestCancelUnsynced(t *testing.T) {

w := s.NewWatcher()

// arbitrary number for watcher
watcherN := 100

// create watcherN of CancelFunc of
// synced and unsynced
cancels := make([]CancelFunc, watcherN)
Expand Down Expand Up @@ -119,6 +119,9 @@ func TestCancelUnsynced(t *testing.T) {
// events to channel of unsynced watchings and moves these
// watchings to synced.
func TestSyncWatchings(t *testing.T) {
// arbitrary number for watchers
watcherN := 150

s := &watchableStore{
store: newStore(tmpPath),
unsynced: make(map[*watching]struct{}),
Expand All @@ -136,9 +139,6 @@ func TestSyncWatchings(t *testing.T) {

w := s.NewWatcher()

// arbitrary number for watcher
watcherN := 100

for i := 0; i < watcherN; i++ {
// use 1 to keep watchers in unsynced
w.Watch(testKey, true, 1)
Expand All @@ -149,12 +149,12 @@ func TestSyncWatchings(t *testing.T) {
// synced should be empty
// because we manually populate unsynced only
if len(s.synced[string(testKey)]) != 0 {
t.Fatalf("synced[string(testKey)] size = %d, want 0", len(s.synced[string(testKey)]))
t.Fatalf("[BEFORE syncWatchings] synced[string(testKey)] size = %d, want 0", len(s.synced[string(testKey)]))
}
// unsynced should not be empty
// because we manually populated unsynced only
if len(s.unsynced) == 0 {
t.Errorf("unsynced size = %d, want %d", len(s.unsynced), watcherN)
t.Errorf("[BEFORE syncWatchings] unsynced size = %d, want %d", len(s.unsynced), watcherN)
}

// this should move all unsynced watchings
Expand All @@ -167,14 +167,12 @@ func TestSyncWatchings(t *testing.T) {
// because syncWatchings populates synced
// in this test case
if len(s.synced[string(testKey)]) == 0 {
t.Errorf("synced[string(testKey)] size = 0, want %d", len(s.synced[string(testKey)]))
t.Errorf("[AFTER syncWatching] synced[string(testKey)] size = 0, want %d", len(s.synced[string(testKey)]))
}
// unsynced should be empty
// because syncWatchings is expected to move
// all watchings from unsynced to synced
// in this test case
// unsynced should be empty because syncWatchings should have moved
// all watchings from unsynced to synced in this test case
if len(s.unsynced) != 0 {
t.Errorf("unsynced size = %d, want 0", len(s.unsynced))
t.Errorf("[AFTER syncWatchings] unsynced size = %d, want 0", len(s.unsynced))
}

// All of the watchings actually share one channel
Expand Down

0 comments on commit 0ce6de1

Please sign in to comment.