From c4ef0fc3a91852f7e26975d688b6a9d8efc92886 Mon Sep 17 00:00:00 2001 From: Sami Hiltunen Date: Wed, 6 Sep 2017 10:32:19 +0200 Subject: [PATCH] view iterator * Adds iterators for Views. --- mock/storage.go | 15 ++++++++++-- storage/iterator.go | 8 ++++++ storage/iterator_test.go | 6 +++-- storage/memory.go | 20 +++++++++------ storage/multi_iterator.go | 41 +++++++++++++++++++++++++++++++ storage/multi_iterator_test.go | 45 ++++++++++++++++++++++++++++++++++ storage/null.go | 15 +++++++++--- storage/storage.go | 18 +++++++++++--- storage/storage_test.go | 9 ++++--- view.go | 21 +++++++++++++++- 10 files changed, 177 insertions(+), 21 deletions(-) create mode 100644 storage/multi_iterator.go create mode 100644 storage/multi_iterator_test.go diff --git a/mock/storage.go b/mock/storage.go index ff949923..48a36a46 100644 --- a/mock/storage.go +++ b/mock/storage.go @@ -82,10 +82,11 @@ func (_mr *_MockStorageRecorder) Has(arg0 interface{}) *gomock.Call { return _mr.mock.ctrl.RecordCall(_mr.mock, "Has", arg0) } -func (_m *MockStorage) Iterator() storage.Iterator { +func (_m *MockStorage) Iterator() (storage.Iterator, error) { ret := _m.ctrl.Call(_m, "Iterator") ret0, _ := ret[0].(storage.Iterator) - return ret0 + ret1, _ := ret[1].(error) + return ret0, ret1 } func (_mr *_MockStorageRecorder) Iterator() *gomock.Call { @@ -112,6 +113,16 @@ func (_mr *_MockStorageRecorder) Open() *gomock.Call { return _mr.mock.ctrl.RecordCall(_mr.mock, "Open") } +func (_m *MockStorage) Recovered() bool { + ret := _m.ctrl.Call(_m, "Recovered") + ret0, _ := ret[0].(bool) + return ret0 +} + +func (_mr *_MockStorageRecorder) Recovered() *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "Recovered") +} + func (_m *MockStorage) Set(_param0 string, _param1 interface{}) error { ret := _m.ctrl.Call(_m, "Set", _param0, _param1) ret0, _ := ret[0].(error) diff --git a/storage/iterator.go b/storage/iterator.go index c049af3d..2c4e55b2 100644 --- a/storage/iterator.go +++ b/storage/iterator.go @@ -3,6 +3,7 @@ package storage import ( "fmt" + "github.com/syndtr/goleveldb/leveldb" ldbiter "github.com/syndtr/goleveldb/leveldb/iterator" ) @@ -11,8 +12,10 @@ import ( type iterator struct { iter ldbiter.Iterator codec Codec + snap *leveldb.Snapshot } +// Next advances the iterator to the next key. func (i *iterator) Next() bool { next := i.iter.Next() if string(i.iter.Key()) == offsetKey { @@ -22,10 +25,12 @@ func (i *iterator) Next() bool { return next } +// Key returns the current key. func (i *iterator) Key() []byte { return i.iter.Key() } +// Value returns the current value decoded by the codec of the storage. func (i *iterator) Value() (interface{}, error) { data := i.iter.Value() if data == nil { @@ -40,6 +45,9 @@ func (i *iterator) Value() (interface{}, error) { return val, nil } +// Releases releases the iterator and the associated snapshot. The iterator is +// not usable anymore after calling Release. func (i *iterator) Release() { i.iter.Release() + i.snap.Release() } diff --git a/storage/iterator_test.go b/storage/iterator_test.go index 584cc919..d5d17efe 100644 --- a/storage/iterator_test.go +++ b/storage/iterator_test.go @@ -33,7 +33,8 @@ func TestIterator(t *testing.T) { ensure.Nil(t, st.SetOffset(777)) - iter := st.Iterator() + iter, err := st.Iterator() + ensure.Nil(t, err) defer iter.Release() count := 0 @@ -79,7 +80,8 @@ func TestIterator_DecodingError(t *testing.T) { ensure.Nil(t, st.Set("key-1", "val-1")) - iter := st.Iterator() + iter, err := st.Iterator() + ensure.Nil(t, err) defer iter.Release() count := 0 diff --git a/storage/memory.go b/storage/memory.go index 928a1fa6..68383d86 100644 --- a/storage/memory.go +++ b/storage/memory.go @@ -41,26 +41,28 @@ func (i *memiter) Release() { i.current = len(i.keys) } -func (m *memory) Iterator() Iterator { +func (m *memory) Iterator() (Iterator, error) { keys := make([]string, 0, len(m.storage)) for k := range m.storage { keys = append(keys, k) } - return &memiter{-1, keys, m.storage} + return &memiter{-1, keys, m.storage}, nil } type memory struct { - storage map[string]interface{} - offset *int64 - c Codec + storage map[string]interface{} + offset *int64 + c Codec + recovered bool } // NewMemory returns a new in-memory storage. func NewMemory(c Codec) Storage { return &memory{ - storage: make(map[string]interface{}), - c: c, + storage: make(map[string]interface{}), + c: c, + recovered: false, } } @@ -100,6 +102,10 @@ func (m *memory) MarkRecovered() error { return nil } +func (m *memory) Recovered() bool { + return m.recovered +} + func (m *memory) SetOffset(offset int64) error { m.offset = &offset return nil diff --git a/storage/multi_iterator.go b/storage/multi_iterator.go new file mode 100644 index 00000000..988bfb10 --- /dev/null +++ b/storage/multi_iterator.go @@ -0,0 +1,41 @@ +package storage + +type multiIterator struct { + current int + iters []Iterator +} + +// NewMultiIterator returns an iterator that iterates over the given iterators. +func NewMultiIterator(iters []Iterator) Iterator { + if len(iters) == 0 { + return &NullIter{} + } + + return &multiIterator{current: 0, iters: iters} +} + +func (m *multiIterator) Next() bool { + next := m.iters[m.current].Next() + if !next && len(m.iters)-1 > m.current { + m.current++ + next = m.iters[m.current].Next() + } + + return next +} + +func (m *multiIterator) Key() []byte { + return m.iters[m.current].Key() +} + +func (m *multiIterator) Value() (interface{}, error) { + return m.iters[m.current].Value() +} + +func (m *multiIterator) Release() { + for i := range m.iters { + m.iters[i].Release() + } + m.current = 0 + m.iters = []Iterator{&NullIter{}} +} diff --git a/storage/multi_iterator_test.go b/storage/multi_iterator_test.go new file mode 100644 index 00000000..5fe11b96 --- /dev/null +++ b/storage/multi_iterator_test.go @@ -0,0 +1,45 @@ +package storage + +import ( + "fmt" + "testing" + + "github.com/facebookgo/ensure" + "github.com/lovoo/goka/codec" +) + +func TestMultiIterator(t *testing.T) { + numStorages := 3 + numValues := 3 + + storages := make([]Storage, numStorages) + expected := map[string]string{} + + for i := 0; i < numStorages; i++ { + storages[i] = NewMemory(&codec.String{}) + for j := 0; j < numValues; j++ { + key := fmt.Sprintf("storage-%d", i) + val := fmt.Sprintf("value-%d", j) + expected[key] = val + storages[i].Set(key, val) + } + } + + iters := make([]Iterator, len(storages)) + for i := range storages { + iter, err := storages[i].Iterator() + ensure.Nil(t, err) + iters[i] = iter + } + + iter := NewMultiIterator(iters) + count := 0 + for iter.Next() { + val, err := iter.Value() + ensure.Nil(t, err) + ensure.DeepEqual(t, expected[string(iter.Key())], val.(string)) + count++ + } + + ensure.DeepEqual(t, count, len(expected)) +} diff --git a/storage/null.go b/storage/null.go index c4e54ad1..7413a926 100644 --- a/storage/null.go +++ b/storage/null.go @@ -2,7 +2,9 @@ package storage // Null storage discards everything that it is given. This can be useful for // debugging. -type Null struct{} +type Null struct { + recovered bool +} // NewNull returns a new Null storage. func NewNull() Storage { @@ -14,6 +16,11 @@ func (n *Null) MarkRecovered() error { return nil } +// Recovered returns whether the storage has recovered. +func (n *Null) Recovered() bool { + return n.recovered +} + // Has returns false as in key not found. func (n *Null) Has(key string) (bool, error) { return false, nil @@ -50,8 +57,8 @@ func (n *Null) SetOffset(val int64) error { } // Iterator returns an Iterator that is immediately exhausted. -func (n *Null) Iterator() Iterator { - return new(NullIter) +func (n *Null) Iterator() (Iterator, error) { + return new(NullIter), nil } // Open does nothing and doesn't error. @@ -67,6 +74,7 @@ func (n *Null) Close() error { // Sync does nothing and doesn't error func (n *Null) Sync() {} +// NullIter is an iterator which is immediately exhausted. type NullIter struct{} // Next returns always false. @@ -84,4 +92,5 @@ func (ni *NullIter) Value() (interface{}, error) { return nil, nil } +// Release does nothing. func (ni *NullIter) Release() {} diff --git a/storage/storage.go b/storage/storage.go index e2949369..3526e8d8 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -39,8 +39,9 @@ type Storage interface { Delete(string) error SetOffset(value int64) error GetOffset(defValue int64) (int64, error) - Iterator() Iterator + Iterator() (Iterator, error) MarkRecovered() error + Recovered() bool Open() error Close() error Sync() @@ -84,11 +85,18 @@ func New(db *leveldb.DB, c Codec) (Storage, error) { }, nil } -func (s *storage) Iterator() Iterator { +// Iterator returns an iterator that traverses over a snapshot of the storage. +func (s *storage) Iterator() (Iterator, error) { + snap, err := s.db.GetSnapshot() + if err != nil { + return nil, err + } + return &iterator{ iter: s.store.NewIterator(nil, nil), codec: s.codec, - } + snap: snap, + }, nil } func (s *storage) Has(key string) (bool, error) { @@ -176,6 +184,10 @@ func (s *storage) MarkRecovered() error { return s.tx.Commit() } +func (s *storage) Recovered() bool { + return s.store == s.db +} + func (s *storage) Open() error { return nil } diff --git a/storage/storage_test.go b/storage/storage_test.go index 14434226..c404814d 100644 --- a/storage/storage_test.go +++ b/storage/storage_test.go @@ -50,11 +50,13 @@ func TestMemIter(t *testing.T) { } // released iterator should be immediately exhausted - iter := storage.Iterator() + iter, err := storage.Iterator() + ensure.Nil(t, err) iter.Release() ensure.False(t, iter.Next(), "released iterator had a next") - iter = storage.Iterator() + iter, err = storage.Iterator() + ensure.Nil(t, err) for iter.Next() { raw, err := iter.Value() ensure.Nil(t, err) @@ -155,7 +157,8 @@ func TestSetGet(t *testing.T) { // test iteration ensure.Nil(t, storage.SetEncoded("encoded", []byte("encoded-value"))) ensure.Nil(t, storage.Set("decoded", "decoded-value")) - iter := storage.Iterator() + iter, err := storage.Iterator() + ensure.Nil(t, err) defer iter.Release() messages := map[string]interface{}{} for iter.Next() { diff --git a/view.go b/view.go index 42c0650f..b173c573 100644 --- a/view.go +++ b/view.go @@ -31,7 +31,6 @@ type View struct { // NewView creates a new View object from a group. func NewView(brokers []string, topic Table, codec Codec, options ...ViewOption) (*View, error) { - options = append( // default options comes first []ViewOption{ @@ -247,6 +246,26 @@ func (v *View) Has(key string) (bool, error) { return s.Has(key) } +// Iterator returns an iterator that iterates over the state of the View. +func (v *View) Iterator() (storage.Iterator, error) { + iters := make([]storage.Iterator, 0, len(v.partitions)) + for i := range v.partitions { + iter, err := v.partitions[i].st.Iterator() + if err != nil { + // release already opened iterators + for i := range iters { + iters[i].Release() + } + + return nil, fmt.Errorf("error opening partition iterator: %v", err) + } + + iters = append(iters, iter) + } + + return storage.NewMultiIterator(iters), nil +} + func (v *View) run() { defer close(v.done) v.opts.log.Printf("View: started")