Skip to content

Commit

Permalink
Merge pull request #33 from lovoo/view-iterator
Browse files Browse the repository at this point in the history
view iterator
  • Loading branch information
SamiHiltunen authored Sep 8, 2017
2 parents 6849029 + c4ef0fc commit 54bca80
Show file tree
Hide file tree
Showing 10 changed files with 177 additions and 21 deletions.
15 changes: 13 additions & 2 deletions mock/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,11 @@ func (_mr *_MockStorageRecorder) Has(arg0 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Has", arg0)
}

func (_m *MockStorage) Iterator() storage.Iterator {
func (_m *MockStorage) Iterator() (storage.Iterator, error) {
ret := _m.ctrl.Call(_m, "Iterator")
ret0, _ := ret[0].(storage.Iterator)
return ret0
ret1, _ := ret[1].(error)
return ret0, ret1
}

func (_mr *_MockStorageRecorder) Iterator() *gomock.Call {
Expand All @@ -112,6 +113,16 @@ func (_mr *_MockStorageRecorder) Open() *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Open")
}

func (_m *MockStorage) Recovered() bool {
ret := _m.ctrl.Call(_m, "Recovered")
ret0, _ := ret[0].(bool)
return ret0
}

func (_mr *_MockStorageRecorder) Recovered() *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Recovered")
}

func (_m *MockStorage) Set(_param0 string, _param1 interface{}) error {
ret := _m.ctrl.Call(_m, "Set", _param0, _param1)
ret0, _ := ret[0].(error)
Expand Down
8 changes: 8 additions & 0 deletions storage/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package storage
import (
"fmt"

"github.com/syndtr/goleveldb/leveldb"
ldbiter "github.com/syndtr/goleveldb/leveldb/iterator"
)

Expand All @@ -11,8 +12,10 @@ import (
type iterator struct {
iter ldbiter.Iterator
codec Codec
snap *leveldb.Snapshot
}

// Next advances the iterator to the next key.
func (i *iterator) Next() bool {
next := i.iter.Next()
if string(i.iter.Key()) == offsetKey {
Expand All @@ -22,10 +25,12 @@ func (i *iterator) Next() bool {
return next
}

// Key returns the current key.
func (i *iterator) Key() []byte {
return i.iter.Key()
}

// Value returns the current value decoded by the codec of the storage.
func (i *iterator) Value() (interface{}, error) {
data := i.iter.Value()
if data == nil {
Expand All @@ -40,6 +45,9 @@ func (i *iterator) Value() (interface{}, error) {
return val, nil
}

// Releases releases the iterator and the associated snapshot. The iterator is
// not usable anymore after calling Release.
func (i *iterator) Release() {
i.iter.Release()
i.snap.Release()
}
6 changes: 4 additions & 2 deletions storage/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ func TestIterator(t *testing.T) {

ensure.Nil(t, st.SetOffset(777))

iter := st.Iterator()
iter, err := st.Iterator()
ensure.Nil(t, err)
defer iter.Release()
count := 0

Expand Down Expand Up @@ -79,7 +80,8 @@ func TestIterator_DecodingError(t *testing.T) {

ensure.Nil(t, st.Set("key-1", "val-1"))

iter := st.Iterator()
iter, err := st.Iterator()
ensure.Nil(t, err)
defer iter.Release()

count := 0
Expand Down
20 changes: 13 additions & 7 deletions storage/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,26 +41,28 @@ func (i *memiter) Release() {
i.current = len(i.keys)
}

func (m *memory) Iterator() Iterator {
func (m *memory) Iterator() (Iterator, error) {
keys := make([]string, 0, len(m.storage))
for k := range m.storage {
keys = append(keys, k)
}

return &memiter{-1, keys, m.storage}
return &memiter{-1, keys, m.storage}, nil
}

type memory struct {
storage map[string]interface{}
offset *int64
c Codec
storage map[string]interface{}
offset *int64
c Codec
recovered bool
}

// NewMemory returns a new in-memory storage.
func NewMemory(c Codec) Storage {
return &memory{
storage: make(map[string]interface{}),
c: c,
storage: make(map[string]interface{}),
c: c,
recovered: false,
}
}

Expand Down Expand Up @@ -100,6 +102,10 @@ func (m *memory) MarkRecovered() error {
return nil
}

func (m *memory) Recovered() bool {
return m.recovered
}

func (m *memory) SetOffset(offset int64) error {
m.offset = &offset
return nil
Expand Down
41 changes: 41 additions & 0 deletions storage/multi_iterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package storage

type multiIterator struct {
current int
iters []Iterator
}

// NewMultiIterator returns an iterator that iterates over the given iterators.
func NewMultiIterator(iters []Iterator) Iterator {
if len(iters) == 0 {
return &NullIter{}
}

return &multiIterator{current: 0, iters: iters}
}

func (m *multiIterator) Next() bool {
next := m.iters[m.current].Next()
if !next && len(m.iters)-1 > m.current {
m.current++
next = m.iters[m.current].Next()
}

return next
}

func (m *multiIterator) Key() []byte {
return m.iters[m.current].Key()
}

func (m *multiIterator) Value() (interface{}, error) {
return m.iters[m.current].Value()
}

func (m *multiIterator) Release() {
for i := range m.iters {
m.iters[i].Release()
}
m.current = 0
m.iters = []Iterator{&NullIter{}}
}
45 changes: 45 additions & 0 deletions storage/multi_iterator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package storage

import (
"fmt"
"testing"

"github.com/facebookgo/ensure"
"github.com/lovoo/goka/codec"
)

func TestMultiIterator(t *testing.T) {
numStorages := 3
numValues := 3

storages := make([]Storage, numStorages)
expected := map[string]string{}

for i := 0; i < numStorages; i++ {
storages[i] = NewMemory(&codec.String{})
for j := 0; j < numValues; j++ {
key := fmt.Sprintf("storage-%d", i)
val := fmt.Sprintf("value-%d", j)
expected[key] = val
storages[i].Set(key, val)
}
}

iters := make([]Iterator, len(storages))
for i := range storages {
iter, err := storages[i].Iterator()
ensure.Nil(t, err)
iters[i] = iter
}

iter := NewMultiIterator(iters)
count := 0
for iter.Next() {
val, err := iter.Value()
ensure.Nil(t, err)
ensure.DeepEqual(t, expected[string(iter.Key())], val.(string))
count++
}

ensure.DeepEqual(t, count, len(expected))
}
15 changes: 12 additions & 3 deletions storage/null.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package storage

// Null storage discards everything that it is given. This can be useful for
// debugging.
type Null struct{}
type Null struct {
recovered bool
}

// NewNull returns a new Null storage.
func NewNull() Storage {
Expand All @@ -14,6 +16,11 @@ func (n *Null) MarkRecovered() error {
return nil
}

// Recovered returns whether the storage has recovered.
func (n *Null) Recovered() bool {
return n.recovered
}

// Has returns false as in key not found.
func (n *Null) Has(key string) (bool, error) {
return false, nil
Expand Down Expand Up @@ -50,8 +57,8 @@ func (n *Null) SetOffset(val int64) error {
}

// Iterator returns an Iterator that is immediately exhausted.
func (n *Null) Iterator() Iterator {
return new(NullIter)
func (n *Null) Iterator() (Iterator, error) {
return new(NullIter), nil
}

// Open does nothing and doesn't error.
Expand All @@ -67,6 +74,7 @@ func (n *Null) Close() error {
// Sync does nothing and doesn't error
func (n *Null) Sync() {}

// NullIter is an iterator which is immediately exhausted.
type NullIter struct{}

// Next returns always false.
Expand All @@ -84,4 +92,5 @@ func (ni *NullIter) Value() (interface{}, error) {
return nil, nil
}

// Release does nothing.
func (ni *NullIter) Release() {}
18 changes: 15 additions & 3 deletions storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ type Storage interface {
Delete(string) error
SetOffset(value int64) error
GetOffset(defValue int64) (int64, error)
Iterator() Iterator
Iterator() (Iterator, error)
MarkRecovered() error
Recovered() bool
Open() error
Close() error
Sync()
Expand Down Expand Up @@ -84,11 +85,18 @@ func New(db *leveldb.DB, c Codec) (Storage, error) {
}, nil
}

func (s *storage) Iterator() Iterator {
// Iterator returns an iterator that traverses over a snapshot of the storage.
func (s *storage) Iterator() (Iterator, error) {
snap, err := s.db.GetSnapshot()
if err != nil {
return nil, err
}

return &iterator{
iter: s.store.NewIterator(nil, nil),
codec: s.codec,
}
snap: snap,
}, nil
}

func (s *storage) Has(key string) (bool, error) {
Expand Down Expand Up @@ -176,6 +184,10 @@ func (s *storage) MarkRecovered() error {
return s.tx.Commit()
}

func (s *storage) Recovered() bool {
return s.store == s.db
}

func (s *storage) Open() error {
return nil
}
Expand Down
9 changes: 6 additions & 3 deletions storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,13 @@ func TestMemIter(t *testing.T) {
}

// released iterator should be immediately exhausted
iter := storage.Iterator()
iter, err := storage.Iterator()
ensure.Nil(t, err)
iter.Release()
ensure.False(t, iter.Next(), "released iterator had a next")

iter = storage.Iterator()
iter, err = storage.Iterator()
ensure.Nil(t, err)
for iter.Next() {
raw, err := iter.Value()
ensure.Nil(t, err)
Expand Down Expand Up @@ -155,7 +157,8 @@ func TestSetGet(t *testing.T) {
// test iteration
ensure.Nil(t, storage.SetEncoded("encoded", []byte("encoded-value")))
ensure.Nil(t, storage.Set("decoded", "decoded-value"))
iter := storage.Iterator()
iter, err := storage.Iterator()
ensure.Nil(t, err)
defer iter.Release()
messages := map[string]interface{}{}
for iter.Next() {
Expand Down
21 changes: 20 additions & 1 deletion view.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ type View struct {

// NewView creates a new View object from a group.
func NewView(brokers []string, topic Table, codec Codec, options ...ViewOption) (*View, error) {

options = append(
// default options comes first
[]ViewOption{
Expand Down Expand Up @@ -247,6 +246,26 @@ func (v *View) Has(key string) (bool, error) {
return s.Has(key)
}

// Iterator returns an iterator that iterates over the state of the View.
func (v *View) Iterator() (storage.Iterator, error) {
iters := make([]storage.Iterator, 0, len(v.partitions))
for i := range v.partitions {
iter, err := v.partitions[i].st.Iterator()
if err != nil {
// release already opened iterators
for i := range iters {
iters[i].Release()
}

return nil, fmt.Errorf("error opening partition iterator: %v", err)
}

iters = append(iters, iter)
}

return storage.NewMultiIterator(iters), nil
}

func (v *View) run() {
defer close(v.done)
v.opts.log.Printf("View: started")
Expand Down

0 comments on commit 54bca80

Please sign in to comment.