Skip to content

Commit

Permalink
Merge pull request #105 from lovoo/feature/restartable-views
Browse files Browse the repository at this point in the history
add restartable view support
  • Loading branch information
db7 authored Mar 14, 2018
2 parents 369663c + ecdeedf commit e99e632
Show file tree
Hide file tree
Showing 9 changed files with 202 additions and 84 deletions.
14 changes: 14 additions & 0 deletions once.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package goka

import "sync"

type once struct {
once sync.Once
err error
}

// Do runs only once and always return the same error.
func (o *once) Do(f func() error) error {
o.once.Do(func() { o.err = f() })
return o.err
}
19 changes: 19 additions & 0 deletions once_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package goka

import (
"errors"
"testing"

"github.com/facebookgo/ensure"
)

func TestOnce_Do(t *testing.T) {
var o once

err := o.Do(func() error { return errors.New("some error") })
ensure.NotNil(t, err)

err2 := o.Do(func() error { return nil })
ensure.NotNil(t, err2)
ensure.DeepEqual(t, err, err2)
}
10 changes: 10 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ type voptions struct {
updateCallback UpdateCallback
partitionChannelSize int
hasher func() hash.Hash32
restartable bool

builders struct {
storage storage.Builder
Expand Down Expand Up @@ -275,6 +276,15 @@ func WithViewClientID(clientID string) ViewOption {
}
}

// WithViewRestartable defines the view can be restarted, even when Start()
// returns errors. If the view is restartable, the client must call Stop() to
// release all resources.
func WithViewRestartable() ViewOption {
return func(o *voptions) {
o.restartable = true
}
}

func (opt *voptions) applyOptions(topic Table, opts ...ViewOption) error {
opt.clientID = defaultClientID
opt.log = logger.Default()
Expand Down
56 changes: 35 additions & 21 deletions partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,22 @@ const (
stalledTimeout = 2 * time.Minute
)

// partition represents one partition of a group table and handles the updates to
// this table via UpdateCallback and ProcessCallback.
//
// partition can be started in two modes:
// - catchup-mode: used by views, starts with startCatchup(), only UpdateCallback called
// - processing-mode: used by processors, starts with start(),
// recovers table with UpdateCallback
// processes input streams with ProcessCallback
//
// The partition should never be called with a closed storage proxy.
// - Before starting the partition in either way, the client must open the storage proxy.
// - A partition may be restarted even if it returned errors. Before restarting
// it, the client must call reinit().
// - To release all resources, after stopping the partition, the client must
// close the storage proxy.
//
type partition struct {
log logger.Logger
topic string
Expand Down Expand Up @@ -63,10 +79,9 @@ func newPartition(log logger.Logger, topic string, cb processCallback, st *stora
dying: make(chan bool),
done: make(chan bool),

st: st,
recoveredOnce: sync.Once{},
proxy: proxy,
process: cb,
st: st,
proxy: proxy,
process: cb,

stats: newPartitionStats(),
lastStats: newPartitionStats(),
Expand All @@ -75,23 +90,27 @@ func newPartition(log logger.Logger, topic string, cb processCallback, st *stora
}
}

// reinit reinitialzes the partition to connect to Kafka and start its goroutine
func (p *partition) reinit(proxy kafkaProxy) {
if proxy != nil {
p.proxy = proxy
}
p.ch = make(chan kafka.Event, len(p.ch))
p.dying = make(chan bool)
p.done = make(chan bool)
atomic.StoreInt64(&p.stopFlag, 0)
}

// start loads the table partition up to HWM and then consumes streams
func (p *partition) start() error {
defer close(p.done)
defer p.proxy.Stop()
p.stats.Table.StartTime = time.Now()

if !p.st.Stateless() {
err := p.st.Open()
if err != nil {
return err
}
defer p.st.Close()

if err := p.recover(); err != nil {
return err
}
} else {
if p.st.Stateless() {
p.markRecovered(false)
} else if err := p.recover(); err != nil {
return err
}

// if stopped, just return
Expand All @@ -101,17 +120,12 @@ func (p *partition) start() error {
return p.run()
}

// startCatchup continually loads the table partition
func (p *partition) startCatchup() error {
defer close(p.done)
defer p.proxy.Stop()
p.stats.Table.StartTime = time.Now()

err := p.st.Open()
if err != nil {
return err
}
defer p.st.Close()

return p.catchup()
}

Expand Down
22 changes: 0 additions & 22 deletions partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,9 @@ func TestPartition_startStateful(t *testing.T) {
p := newPartition(logger.Default(), topic, nil, newStorageProxy(st, 0, nil), proxy, defaultPartitionChannelSize)

gomock.InOrder(
st.EXPECT().Open().Return(nil),
st.EXPECT().GetOffset(int64(-2)).Return(offset, nil),
proxy.EXPECT().Add(topic, int64(offset)),
proxy.EXPECT().Remove(topic),
st.EXPECT().Close().Return(nil),
proxy.EXPECT().Stop(),
)
go func() {
Expand Down Expand Up @@ -254,13 +252,11 @@ func TestPartition_runStateful(t *testing.T) {
p := newPartition(logger.Default(), topic, consume, newStorageProxy(st, 0, nil), proxy, 0)

gomock.InOrder(
st.EXPECT().Open().Return(nil),
st.EXPECT().GetOffset(int64(-2)).Return(int64(offset), nil),
proxy.EXPECT().Add(topic, offset),
st.EXPECT().MarkRecovered(),
proxy.EXPECT().Remove(topic),
proxy.EXPECT().AddGroup(),
st.EXPECT().Close().Return(nil),
proxy.EXPECT().Stop(),
)

Expand Down Expand Up @@ -337,13 +333,11 @@ func TestPartition_runStatefulWithError(t *testing.T) {
p := newPartition(logger.Default(), topic, consume, newStorageProxy(st, 0, nil), proxy, defaultPartitionChannelSize)

gomock.InOrder(
st.EXPECT().Open().Return(nil),
st.EXPECT().GetOffset(int64(-2)).Return(int64(offset), nil),
proxy.EXPECT().Add(topic, offset),
st.EXPECT().MarkRecovered(),
proxy.EXPECT().Remove(topic),
proxy.EXPECT().AddGroup(),
st.EXPECT().Close().Return(nil),
proxy.EXPECT().Stop(),
)

Expand Down Expand Up @@ -415,15 +409,13 @@ func TestPartition_loadStateful(t *testing.T) {
p := newPartition(logger.Default(), topic, consume, newStorageProxy(st, 0, DefaultUpdate), proxy, defaultPartitionChannelSize)

gomock.InOrder(
st.EXPECT().Open().Return(nil),
st.EXPECT().GetOffset(int64(-2)).Return(int64(offset), nil),
proxy.EXPECT().Add(topic, offset),
st.EXPECT().Set(key, value),
st.EXPECT().SetOffset(int64(offset)).Return(nil),
st.EXPECT().MarkRecovered(),
proxy.EXPECT().Remove(topic),
proxy.EXPECT().AddGroup(),
st.EXPECT().Close().Return(nil),
proxy.EXPECT().Stop(),
)

Expand Down Expand Up @@ -497,11 +489,9 @@ func TestPartition_loadStatefulWithError(t *testing.T) {
p := newPartition(logger.Default(), topic, nil, newStorageProxy(st, 0, update), proxy, 0)

gomock.InOrder(
st.EXPECT().Open().Return(nil),
st.EXPECT().GetOffset(int64(-2)).Return(int64(offset), nil),
proxy.EXPECT().Add(topic, offset),
proxy.EXPECT().Remove(topic),
st.EXPECT().Close().Return(nil),
proxy.EXPECT().Stop(),
)

Expand Down Expand Up @@ -531,13 +521,11 @@ func TestPartition_loadStatefulWithError(t *testing.T) {
p = newPartition(logger.Default(), topic, nil, newStorageProxy(st, 0, DefaultUpdate), proxy, 0)

gomock.InOrder(
st.EXPECT().Open().Return(nil),
st.EXPECT().GetOffset(int64(-2)).Return(int64(offset), nil),
proxy.EXPECT().Add(topic, offset),
st.EXPECT().Set(key, value),
st.EXPECT().SetOffset(int64(offset)).Return(errors.New("some error")),
proxy.EXPECT().Remove(topic),
st.EXPECT().Close().Return(nil),
proxy.EXPECT().Stop(),
)

Expand Down Expand Up @@ -567,9 +555,7 @@ func TestPartition_loadStatefulWithError(t *testing.T) {
p = newPartition(logger.Default(), topic, nil, newStorageProxy(st, 0, nil), proxy, 0)

gomock.InOrder(
st.EXPECT().Open().Return(nil),
st.EXPECT().GetOffset(int64(-2)).Return(int64(0), errors.New("some error")),
st.EXPECT().Close().Return(nil),
proxy.EXPECT().Stop(),
)

Expand Down Expand Up @@ -607,10 +593,8 @@ func TestPartition_loadStatefulWithErrorAddRemovePartition(t *testing.T) {
p := newPartition(logger.Default(), topic, nil, newStorageProxy(st, 0, DefaultUpdate), proxy, 0)

gomock.InOrder(
st.EXPECT().Open().Return(nil),
st.EXPECT().GetOffset(int64(-2)).Return(int64(offset), nil),
proxy.EXPECT().Add(topic, offset).Return(errors.New("some error adding partition")),
st.EXPECT().Close().Return(nil),
proxy.EXPECT().Stop(),
)

Expand All @@ -632,11 +616,9 @@ func TestPartition_loadStatefulWithErrorAddRemovePartition(t *testing.T) {
p = newPartition(logger.Default(), topic, nil, newStorageProxy(st, 0, update), proxy, 0)

gomock.InOrder(
st.EXPECT().Open().Return(nil),
st.EXPECT().GetOffset(int64(-2)).Return(int64(offset), nil),
proxy.EXPECT().Add(topic, offset).Return(nil),
proxy.EXPECT().Remove(topic).Return(errors.New("error while removing partition")),
st.EXPECT().Close().Return(nil),
proxy.EXPECT().Stop(),
)

Expand Down Expand Up @@ -690,7 +672,6 @@ func TestPartition_catchupStateful(t *testing.T) {
p := newPartition(logger.Default(), topic, nil, newStorageProxy(st, 0, update), proxy, 0)

gomock.InOrder(
st.EXPECT().Open().Return(nil),
st.EXPECT().GetOffset(int64(-2)).Return(int64(offset), nil),
proxy.EXPECT().Add(topic, offset),
st.EXPECT().Set(key, value),
Expand All @@ -703,7 +684,6 @@ func TestPartition_catchupStateful(t *testing.T) {
st.EXPECT().Set(key, value),
st.EXPECT().SetOffset(offset+2).Return(nil),
proxy.EXPECT().Remove(topic),
st.EXPECT().Close().Return(nil),
proxy.EXPECT().Stop(),
)

Expand Down Expand Up @@ -809,7 +789,6 @@ func TestPartition_catchupStatefulWithError(t *testing.T) {
p := newPartition(logger.Default(), topic, nil, newStorageProxy(st, 0, update), proxy, 0)

gomock.InOrder(
st.EXPECT().Open().Return(nil),
st.EXPECT().GetOffset(int64(-2)).Return(int64(offset), nil),
proxy.EXPECT().Add(topic, offset),
st.EXPECT().Set(key, value),
Expand All @@ -820,7 +799,6 @@ func TestPartition_catchupStatefulWithError(t *testing.T) {
st.EXPECT().MarkRecovered(),
proxy.EXPECT().Add(topic, offset+2),
proxy.EXPECT().Remove(topic),
st.EXPECT().Close().Return(nil),
proxy.EXPECT().Stop(),
)

Expand Down
23 changes: 16 additions & 7 deletions processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,7 @@ func (g *Processor) Start() error {
// start all views
for t, v := range g.views {
go func(t string, v *View) {
err := v.Start()
if err != nil {
if err := v.Start(); err != nil {
g.fail(fmt.Errorf("error in view %s: %v", t, err))
}
}(t, v)
Expand Down Expand Up @@ -530,8 +529,10 @@ func (g *Processor) createPartitionViews(id int32) error {
}
}()

err := par.startCatchup()
if err != nil {
if err := par.st.Open(); err != nil {
g.fail(fmt.Errorf("error opening storage %s/%d: %v", par.topic, pid, err))
}
if err := par.startCatchup(); err != nil {
g.fail(fmt.Errorf("error in partition view %s/%d: %v", par.topic, pid, err))
}
g.opts.log.Printf("partition view %s/%d: exit", par.topic, pid)
Expand Down Expand Up @@ -580,8 +581,10 @@ func (g *Processor) createPartition(id int32) error {
par.topic, id, err, string(debug.Stack())))
}
}()
err := par.start()
if err != nil {
if err := par.st.Open(); err != nil {
g.fail(fmt.Errorf("error opening storage partition %d: %v", id, err))
}
if err := par.start(); err != nil {
g.fail(fmt.Errorf("error in partition %d: %v", id, err))
}
g.opts.log.Printf("partition %s/%d: exit", par.topic, id)
Expand Down Expand Up @@ -617,6 +620,9 @@ func (g *Processor) removePartition(partition int32) {

// remove partition processor
g.partitions[partition].stop()
if err := g.partitions[partition].st.Close(); err != nil {
g.opts.log.Printf("error closing storage partition %d: %v", partition, err)
}
delete(g.partitions, partition)

// remove partition views
Expand All @@ -625,8 +631,11 @@ func (g *Processor) removePartition(partition int32) {
return
}

for _, p := range pv {
for topic, p := range pv {
p.stop()
if err := p.st.Close(); err != nil {
g.opts.log.Printf("error closing storage %s/%d: %v", topic, partition, err)
}
}
delete(g.partitionViews, partition)
}
Expand Down
23 changes: 20 additions & 3 deletions proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,16 +97,33 @@ type storageProxy struct {
partition int32
stateless bool
update UpdateCallback

openedOnce once
closedOnce once
}

func (s *storageProxy) Open() error {
if s == nil {
return nil
}
return s.openedOnce.Do(s.Storage.Open)
}

func (s *storageProxy) Close() error {
if s == nil {
return nil
}
return s.closedOnce.Do(s.Storage.Close)
}

func (s storageProxy) Update(k string, v []byte) error {
func (s *storageProxy) Update(k string, v []byte) error {
return s.update(s.Storage, s.partition, k, v)
}

func (s storageProxy) Stateless() bool {
func (s *storageProxy) Stateless() bool {
return s.stateless
}

func (s storageProxy) MarkRecovered() error {
func (s *storageProxy) MarkRecovered() error {
return s.Storage.MarkRecovered()
}
Loading

0 comments on commit e99e632

Please sign in to comment.