Skip to content

Commit

Permalink
storage: delete RangeHistory
Browse files Browse the repository at this point in the history
This has been replaced by operations inside `syncWatchings`.
  • Loading branch information
gyuho committed Dec 28, 2015
1 parent 78b0b8a commit ecc3e15
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 322 deletions.
59 changes: 0 additions & 59 deletions storage/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,65 +189,6 @@ func (s *store) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err
return n, rev, nil
}

// RangeHistory ranges the history from key to end starting from startRev.
// If `end` is nil, the request only observes the events on key.
// If `end` is not nil, it observes the events on key range [key, range_end).
// Limit limits the number of events returned.
// If startRev <=0, rangeEvents returns events from the beginning of uncompacted history.
//
// If the required start rev is compacted, ErrCompacted will be returned.
// If the required start rev has not happened, ErrFutureRev will be returned.
//
// RangeHistory returns revision bytes slice and key-values that satisfy the requirement (0 <= n <= limit).
// If history in the revision range has not all happened, it returns immeidately
// what is available.
// It also returns nextRev which indicates the start revision used for the following
// RangeEvents call. The nextRev could be smaller than the given endRev if the store
// has not progressed so far or it hits the event limit.
//
// TODO: return byte slices instead of keyValues to avoid meaningless encode and decode.
// This also helps to return raw (key, val) pair directly to make API consistent.
func (s *store) RangeHistory(key, end []byte, limit, startRev int64) (revbs [][]byte, kvs []storagepb.KeyValue, nextRev int64, err error) {
s.mu.Lock()
defer s.mu.Unlock()

if startRev > 0 && startRev <= s.compactMainRev {
return nil, nil, 0, ErrCompacted
}
if startRev > s.currentRev.main {
return nil, nil, 0, ErrFutureRev
}

revs := s.kvindex.RangeSince(key, end, startRev)
if len(revs) == 0 {
return nil, nil, s.currentRev.main + 1, nil
}

tx := s.b.BatchTx()
tx.Lock()
defer tx.Unlock()
// fetch events from the backend using revisions
for _, rev := range revs {
start, end := revBytesRange(rev)

ks, vs := tx.UnsafeRange(keyBucketName, start, end, 0)
if len(vs) != 1 {
log.Fatalf("storage: range cannot find rev (%d,%d)", rev.main, rev.sub)
}

var kv storagepb.KeyValue
if err := kv.Unmarshal(vs[0]); err != nil {
log.Fatalf("storage: cannot unmarshal event: %v", err)
}
revbs = append(revbs, ks[0])
kvs = append(kvs, kv)
if limit > 0 && len(kvs) >= int(limit) {
return revbs, kvs, rev.main + 1, nil
}
}
return revbs, kvs, s.currentRev.main + 1, nil
}

func (s *store) Compact(rev int64) error {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down
263 changes: 0 additions & 263 deletions storage/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,73 +263,6 @@ func TestStoreDeleteRange(t *testing.T) {
}
}

func TestStoreRangeHistory(t *testing.T) {
key := newTestKeyBytes(revision{2, 0}, false)
kv := storagepb.KeyValue{
Key: []byte("foo"),
Value: []byte("bar"),
CreateRevision: 1,
ModRevision: 2,
Version: 1,
}
kvb, err := kv.Marshal()
if err != nil {
t.Fatal(err)
}
currev := revision{2, 0}

tests := []struct {
idxr indexRangeEventsResp
r rangeResp
}{
{
indexRangeEventsResp{[]revision{{2, 0}}},
rangeResp{[][]byte{key}, [][]byte{kvb}},
},
{
indexRangeEventsResp{[]revision{{2, 0}, {3, 0}}},
rangeResp{[][]byte{key}, [][]byte{kvb}},
},
}
for i, tt := range tests {
s := newFakeStore()
b := s.b.(*fakeBackend)
fi := s.kvindex.(*fakeIndex)

s.currentRev = currev
fi.indexRangeEventsRespc <- tt.idxr
b.tx.rangeRespc <- tt.r

keys, kvs, _, err := s.RangeHistory([]byte("foo"), []byte("goo"), 1, 1)
if err != nil {
t.Errorf("#%d: err = %v, want nil", i, err)
}
if w := [][]byte{key}; !reflect.DeepEqual(keys, w) {
t.Errorf("#%d: keys = %+v, want %+v", i, keys, w)
}
if w := []storagepb.KeyValue{kv}; !reflect.DeepEqual(kvs, w) {
t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, w)
}

wact := []testutil.Action{
{"rangeEvents", []interface{}{[]byte("foo"), []byte("goo"), int64(1)}},
}
if g := fi.Action(); !reflect.DeepEqual(g, wact) {
t.Errorf("#%d: index action = %+v, want %+v", i, g, wact)
}
wstart, wend := revBytesRange(tt.idxr.revs[0])
wact = []testutil.Action{
{"range", []interface{}{keyBucketName, wstart, wend, int64(0)}},
}
if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact)
}
if s.currentRev != currev {
t.Errorf("#%d: current rev = %+v, want %+v", i, s.currentRev, currev)
}
}
}

func TestStoreCompact(t *testing.T) {
s := newFakeStore()
b := s.b.(*fakeBackend)
Expand Down Expand Up @@ -421,202 +354,6 @@ func TestStoreRestore(t *testing.T) {
}
}

// tests end parameter works well
func TestStoreRangeHistoryEnd(t *testing.T) {
s := newStore(tmpPath)
defer cleanup(s, tmpPath)

s.Put([]byte("foo"), []byte("bar"))
s.Put([]byte("foo1"), []byte("bar1"))
s.Put([]byte("foo2"), []byte("bar2"))
keys := [][]byte{
newTestKeyBytes(revision{1, 0}, false),
newTestKeyBytes(revision{2, 0}, false),
newTestKeyBytes(revision{3, 0}, false),
}
kvs := []storagepb.KeyValue{
{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1},
{Key: []byte("foo1"), Value: []byte("bar1"), CreateRevision: 2, ModRevision: 2, Version: 1},
{Key: []byte("foo2"), Value: []byte("bar2"), CreateRevision: 3, ModRevision: 3, Version: 1},
}

tests := []struct {
key, end []byte
wkeys [][]byte
wkvs []storagepb.KeyValue
}{
// get no keys
{
[]byte("doo"), []byte("foo"),
nil, nil,
},
// get no keys when key == end
{
[]byte("foo"), []byte("foo"),
nil, nil,
},
// get no keys when ranging single key
{
[]byte("doo"), nil,
nil, nil,
},
// get all keys
{
[]byte("foo"), []byte("foo3"),
keys, kvs,
},
// get partial keys
{
[]byte("foo"), []byte("foo1"),
keys[:1], kvs[:1],
},
// get single key
{
[]byte("foo"), nil,
keys[:1], kvs[:1],
},
}

for i, tt := range tests {
keys, kvs, rev, err := s.RangeHistory(tt.key, tt.end, 0, 1)
if err != nil {
t.Fatal(err)
}
if rev != 4 {
t.Errorf("#%d: rev = %d, want %d", i, rev, 4)
}
if !reflect.DeepEqual(keys, tt.wkeys) {
t.Errorf("#%d: actions = %+v, want %+v", i, keys, tt.wkeys)
}
if !reflect.DeepEqual(kvs, tt.wkvs) {
t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, tt.wkvs)
}
}
}

func TestStoreRangeHistoryRev(t *testing.T) {
s := newStore(tmpPath)
defer cleanup(s, tmpPath)

s.Put([]byte("foo"), []byte("bar"))
s.DeleteRange([]byte("foo"), nil)
s.Put([]byte("foo"), []byte("bar"))
s.Put([]byte("unrelated"), []byte("unrelated"))
keys := [][]byte{
newTestKeyBytes(revision{1, 0}, false),
newTestKeyBytes(revision{2, 0}, true),
newTestKeyBytes(revision{3, 0}, false),
}
kvs := []storagepb.KeyValue{
{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1},
{Key: []byte("foo")},
{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 3, ModRevision: 3, Version: 1},
}

tests := []struct {
start int64

wkeys [][]byte
wkvs []storagepb.KeyValue
wnext int64
}{
{0, keys, kvs, 5},
{1, keys, kvs, 5},
{3, keys[2:], kvs[2:], 5},
}

for i, tt := range tests {
keys, kvs, next, err := s.RangeHistory([]byte("foo"), nil, 0, tt.start)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(keys, tt.wkeys) {
t.Errorf("#%d: acts = %+v, want %+v", i, keys, tt.wkeys)
}
if !reflect.DeepEqual(kvs, tt.wkvs) {
t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, tt.wkvs)
}
if next != tt.wnext {
t.Errorf("#%d: next = %d, want %d", i, next, tt.wnext)
}
}
}

func TestStoreRangeHistoryBad(t *testing.T) {
s := newStore(tmpPath)
defer cleanup(s, tmpPath)

s.Put([]byte("foo"), []byte("bar"))
s.Put([]byte("foo"), []byte("bar1"))
s.Put([]byte("foo"), []byte("bar2"))
if err := s.Compact(3); err != nil {
t.Fatalf("compact error (%v)", err)
}

tests := []struct {
rev int64
werr error
}{
{1, ErrCompacted},
{2, ErrCompacted},
{3, ErrCompacted},
{4, ErrFutureRev},
{10, ErrFutureRev},
}
for i, tt := range tests {
_, _, _, err := s.RangeHistory([]byte("foo"), nil, 0, tt.rev)
if err != tt.werr {
t.Errorf("#%d: error = %v, want %v", i, err, tt.werr)
}
}
}

func TestStoreRangeHistoryLimit(t *testing.T) {
s := newStore(tmpPath)
defer cleanup(s, tmpPath)

s.Put([]byte("foo"), []byte("bar"))
s.DeleteRange([]byte("foo"), nil)
s.Put([]byte("foo"), []byte("bar"))
keys := [][]byte{
newTestKeyBytes(revision{1, 0}, false),
newTestKeyBytes(revision{2, 0}, true),
newTestKeyBytes(revision{3, 0}, false),
}
kvs := []storagepb.KeyValue{
{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1},
{Key: []byte("foo")},
{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 3, ModRevision: 3, Version: 1},
}

tests := []struct {
limit int64
wkeys [][]byte
wkvs []storagepb.KeyValue
}{
// no limit
{-1, keys, kvs},
// no limit
{0, keys, kvs},
{1, keys[:1], kvs[:1]},
{2, keys[:2], kvs[:2]},
{3, keys, kvs},
{100, keys, kvs},
}
for i, tt := range tests {
keys, kvs, _, err := s.RangeHistory([]byte("foo"), nil, tt.limit, 1)
if err != nil {
t.Fatalf("#%d: range error (%v)", i, err)
}
if !reflect.DeepEqual(keys, tt.wkeys) {
t.Errorf("#%d: acts = %+v, want %+v", i, keys, tt.wkeys)
}
if !reflect.DeepEqual(kvs, tt.wkvs) {
t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, tt.wkvs)
}
}
}

func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
s0 := newStore(tmpPath)
defer os.Remove(tmpPath)
Expand Down

0 comments on commit ecc3e15

Please sign in to comment.