Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add restartable view support #105

Merged
merged 2 commits into from
Mar 14, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions once.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package goka

import "sync"

type once struct {
once sync.Once
err error
}

// Do runs only once and always return the same error.
func (o *once) Do(f func() error) error {
o.once.Do(func() { o.err = f() })
return o.err
}
19 changes: 19 additions & 0 deletions once_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package goka

import (
"errors"
"testing"

"github.com/facebookgo/ensure"
)

func TestOnce_Do(t *testing.T) {
var o once

err := o.Do(func() error { return errors.New("some error") })
ensure.NotNil(t, err)

err2 := o.Do(func() error { return nil })
ensure.NotNil(t, err2)
ensure.DeepEqual(t, err, err2)
}
10 changes: 10 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ type voptions struct {
updateCallback UpdateCallback
partitionChannelSize int
hasher func() hash.Hash32
restartable bool

builders struct {
storage storage.Builder
Expand Down Expand Up @@ -275,6 +276,15 @@ func WithViewClientID(clientID string) ViewOption {
}
}

// WithViewRestartable defines the view can be restarted, even when Start()
// returns errors. If the view is restartable, the client must call Stop() to
// release all resources.
func WithViewRestartable() ViewOption {
return func(o *voptions) {
o.restartable = true
}
}

func (opt *voptions) applyOptions(topic Table, opts ...ViewOption) error {
opt.clientID = defaultClientID
opt.log = logger.Default()
Expand Down
56 changes: 35 additions & 21 deletions partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,22 @@ const (
stalledTimeout = 2 * time.Minute
)

// partition represents one partition of a group table and handles the updates to
// this table via UpdateCallback and ProcessCallback.
//
// partition can be started in two modes:
// - catchup-mode: used by views, starts with startCatchup(), only UpdateCallback called
// - processing-mode: used by processors, starts with start(),
// recovers table with UpdateCallback
// processes input streams with ProcessCallback
//
// The partition should never be called with a closed storage proxy.
// - Before starting the partition in either way, the client must open the storage proxy.
// - A partition may be restarted even if it returned errors. Before restarting
// it, the client must call reinit().
// - To release all resources, after stopping the partition, the client must
// close the storage proxy.
//
type partition struct {
log logger.Logger
topic string
Expand Down Expand Up @@ -63,10 +79,9 @@ func newPartition(log logger.Logger, topic string, cb processCallback, st *stora
dying: make(chan bool),
done: make(chan bool),

st: st,
recoveredOnce: sync.Once{},
proxy: proxy,
process: cb,
st: st,
proxy: proxy,
process: cb,

stats: newPartitionStats(),
lastStats: newPartitionStats(),
Expand All @@ -75,23 +90,27 @@ func newPartition(log logger.Logger, topic string, cb processCallback, st *stora
}
}

// reinit reinitialzes the partition to connect to Kafka and start its goroutine
func (p *partition) reinit(proxy kafkaProxy) {
if proxy != nil {
p.proxy = proxy
}
p.ch = make(chan kafka.Event, len(p.ch))
p.dying = make(chan bool)
p.done = make(chan bool)
atomic.StoreInt64(&p.stopFlag, 0)
}

// start loads the table partition up to HWM and then consumes streams
func (p *partition) start() error {
defer close(p.done)
defer p.proxy.Stop()
p.stats.Table.StartTime = time.Now()

if !p.st.Stateless() {
err := p.st.Open()
if err != nil {
return err
}
defer p.st.Close()

if err := p.recover(); err != nil {
return err
}
} else {
if p.st.Stateless() {
p.markRecovered(false)
} else if err := p.recover(); err != nil {
return err
}

// if stopped, just return
Expand All @@ -101,17 +120,12 @@ func (p *partition) start() error {
return p.run()
}

// startCatchup continually loads the table partition
func (p *partition) startCatchup() error {
defer close(p.done)
defer p.proxy.Stop()
p.stats.Table.StartTime = time.Now()

err := p.st.Open()
if err != nil {
return err
}
defer p.st.Close()

return p.catchup()
}

Expand Down
22 changes: 0 additions & 22 deletions partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,9 @@ func TestPartition_startStateful(t *testing.T) {
p := newPartition(logger.Default(), topic, nil, newStorageProxy(st, 0, nil), proxy, defaultPartitionChannelSize)

gomock.InOrder(
st.EXPECT().Open().Return(nil),
st.EXPECT().GetOffset(int64(-2)).Return(offset, nil),
proxy.EXPECT().Add(topic, int64(offset)),
proxy.EXPECT().Remove(topic),
st.EXPECT().Close().Return(nil),
proxy.EXPECT().Stop(),
)
go func() {
Expand Down Expand Up @@ -254,13 +252,11 @@ func TestPartition_runStateful(t *testing.T) {
p := newPartition(logger.Default(), topic, consume, newStorageProxy(st, 0, nil), proxy, 0)

gomock.InOrder(
st.EXPECT().Open().Return(nil),
st.EXPECT().GetOffset(int64(-2)).Return(int64(offset), nil),
proxy.EXPECT().Add(topic, offset),
st.EXPECT().MarkRecovered(),
proxy.EXPECT().Remove(topic),
proxy.EXPECT().AddGroup(),
st.EXPECT().Close().Return(nil),
proxy.EXPECT().Stop(),
)

Expand Down Expand Up @@ -337,13 +333,11 @@ func TestPartition_runStatefulWithError(t *testing.T) {
p := newPartition(logger.Default(), topic, consume, newStorageProxy(st, 0, nil), proxy, defaultPartitionChannelSize)

gomock.InOrder(
st.EXPECT().Open().Return(nil),
st.EXPECT().GetOffset(int64(-2)).Return(int64(offset), nil),
proxy.EXPECT().Add(topic, offset),
st.EXPECT().MarkRecovered(),
proxy.EXPECT().Remove(topic),
proxy.EXPECT().AddGroup(),
st.EXPECT().Close().Return(nil),
proxy.EXPECT().Stop(),
)

Expand Down Expand Up @@ -415,15 +409,13 @@ func TestPartition_loadStateful(t *testing.T) {
p := newPartition(logger.Default(), topic, consume, newStorageProxy(st, 0, DefaultUpdate), proxy, defaultPartitionChannelSize)

gomock.InOrder(
st.EXPECT().Open().Return(nil),
st.EXPECT().GetOffset(int64(-2)).Return(int64(offset), nil),
proxy.EXPECT().Add(topic, offset),
st.EXPECT().Set(key, value),
st.EXPECT().SetOffset(int64(offset)).Return(nil),
st.EXPECT().MarkRecovered(),
proxy.EXPECT().Remove(topic),
proxy.EXPECT().AddGroup(),
st.EXPECT().Close().Return(nil),
proxy.EXPECT().Stop(),
)

Expand Down Expand Up @@ -497,11 +489,9 @@ func TestPartition_loadStatefulWithError(t *testing.T) {
p := newPartition(logger.Default(), topic, nil, newStorageProxy(st, 0, update), proxy, 0)

gomock.InOrder(
st.EXPECT().Open().Return(nil),
st.EXPECT().GetOffset(int64(-2)).Return(int64(offset), nil),
proxy.EXPECT().Add(topic, offset),
proxy.EXPECT().Remove(topic),
st.EXPECT().Close().Return(nil),
proxy.EXPECT().Stop(),
)

Expand Down Expand Up @@ -531,13 +521,11 @@ func TestPartition_loadStatefulWithError(t *testing.T) {
p = newPartition(logger.Default(), topic, nil, newStorageProxy(st, 0, DefaultUpdate), proxy, 0)

gomock.InOrder(
st.EXPECT().Open().Return(nil),
st.EXPECT().GetOffset(int64(-2)).Return(int64(offset), nil),
proxy.EXPECT().Add(topic, offset),
st.EXPECT().Set(key, value),
st.EXPECT().SetOffset(int64(offset)).Return(errors.New("some error")),
proxy.EXPECT().Remove(topic),
st.EXPECT().Close().Return(nil),
proxy.EXPECT().Stop(),
)

Expand Down Expand Up @@ -567,9 +555,7 @@ func TestPartition_loadStatefulWithError(t *testing.T) {
p = newPartition(logger.Default(), topic, nil, newStorageProxy(st, 0, nil), proxy, 0)

gomock.InOrder(
st.EXPECT().Open().Return(nil),
st.EXPECT().GetOffset(int64(-2)).Return(int64(0), errors.New("some error")),
st.EXPECT().Close().Return(nil),
proxy.EXPECT().Stop(),
)

Expand Down Expand Up @@ -607,10 +593,8 @@ func TestPartition_loadStatefulWithErrorAddRemovePartition(t *testing.T) {
p := newPartition(logger.Default(), topic, nil, newStorageProxy(st, 0, DefaultUpdate), proxy, 0)

gomock.InOrder(
st.EXPECT().Open().Return(nil),
st.EXPECT().GetOffset(int64(-2)).Return(int64(offset), nil),
proxy.EXPECT().Add(topic, offset).Return(errors.New("some error adding partition")),
st.EXPECT().Close().Return(nil),
proxy.EXPECT().Stop(),
)

Expand All @@ -632,11 +616,9 @@ func TestPartition_loadStatefulWithErrorAddRemovePartition(t *testing.T) {
p = newPartition(logger.Default(), topic, nil, newStorageProxy(st, 0, update), proxy, 0)

gomock.InOrder(
st.EXPECT().Open().Return(nil),
st.EXPECT().GetOffset(int64(-2)).Return(int64(offset), nil),
proxy.EXPECT().Add(topic, offset).Return(nil),
proxy.EXPECT().Remove(topic).Return(errors.New("error while removing partition")),
st.EXPECT().Close().Return(nil),
proxy.EXPECT().Stop(),
)

Expand Down Expand Up @@ -690,7 +672,6 @@ func TestPartition_catchupStateful(t *testing.T) {
p := newPartition(logger.Default(), topic, nil, newStorageProxy(st, 0, update), proxy, 0)

gomock.InOrder(
st.EXPECT().Open().Return(nil),
st.EXPECT().GetOffset(int64(-2)).Return(int64(offset), nil),
proxy.EXPECT().Add(topic, offset),
st.EXPECT().Set(key, value),
Expand All @@ -703,7 +684,6 @@ func TestPartition_catchupStateful(t *testing.T) {
st.EXPECT().Set(key, value),
st.EXPECT().SetOffset(offset+2).Return(nil),
proxy.EXPECT().Remove(topic),
st.EXPECT().Close().Return(nil),
proxy.EXPECT().Stop(),
)

Expand Down Expand Up @@ -809,7 +789,6 @@ func TestPartition_catchupStatefulWithError(t *testing.T) {
p := newPartition(logger.Default(), topic, nil, newStorageProxy(st, 0, update), proxy, 0)

gomock.InOrder(
st.EXPECT().Open().Return(nil),
st.EXPECT().GetOffset(int64(-2)).Return(int64(offset), nil),
proxy.EXPECT().Add(topic, offset),
st.EXPECT().Set(key, value),
Expand All @@ -820,7 +799,6 @@ func TestPartition_catchupStatefulWithError(t *testing.T) {
st.EXPECT().MarkRecovered(),
proxy.EXPECT().Add(topic, offset+2),
proxy.EXPECT().Remove(topic),
st.EXPECT().Close().Return(nil),
proxy.EXPECT().Stop(),
)

Expand Down
23 changes: 16 additions & 7 deletions processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,7 @@ func (g *Processor) Start() error {
// start all views
for t, v := range g.views {
go func(t string, v *View) {
err := v.Start()
if err != nil {
if err := v.Start(); err != nil {
g.fail(fmt.Errorf("error in view %s: %v", t, err))
}
}(t, v)
Expand Down Expand Up @@ -530,8 +529,10 @@ func (g *Processor) createPartitionViews(id int32) error {
}
}()

err := par.startCatchup()
if err != nil {
if err := par.st.Open(); err != nil {
g.fail(fmt.Errorf("error opening storage %s/%d: %v", par.topic, pid, err))
}
if err := par.startCatchup(); err != nil {
g.fail(fmt.Errorf("error in partition view %s/%d: %v", par.topic, pid, err))
}
g.opts.log.Printf("partition view %s/%d: exit", par.topic, pid)
Expand Down Expand Up @@ -580,8 +581,10 @@ func (g *Processor) createPartition(id int32) error {
par.topic, id, err, string(debug.Stack())))
}
}()
err := par.start()
if err != nil {
if err := par.st.Open(); err != nil {
g.fail(fmt.Errorf("error opening storage partition %d: %v", id, err))
}
if err := par.start(); err != nil {
g.fail(fmt.Errorf("error in partition %d: %v", id, err))
}
g.opts.log.Printf("partition %s/%d: exit", par.topic, id)
Expand Down Expand Up @@ -617,6 +620,9 @@ func (g *Processor) removePartition(partition int32) {

// remove partition processor
g.partitions[partition].stop()
if err := g.partitions[partition].st.Close(); err != nil {
g.opts.log.Printf("error closing storage partition %d: %v", partition, err)
}
delete(g.partitions, partition)

// remove partition views
Expand All @@ -625,8 +631,11 @@ func (g *Processor) removePartition(partition int32) {
return
}

for _, p := range pv {
for topic, p := range pv {
p.stop()
if err := p.st.Close(); err != nil {
g.opts.log.Printf("error closing storage %s/%d: %v", topic, partition, err)
}
}
delete(g.partitionViews, partition)
}
Expand Down
23 changes: 20 additions & 3 deletions proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,16 +97,33 @@ type storageProxy struct {
partition int32
stateless bool
update UpdateCallback

openedOnce once
closedOnce once
}

func (s *storageProxy) Open() error {
if s == nil {
return nil
}
return s.openedOnce.Do(s.Storage.Open)
}

func (s *storageProxy) Close() error {
if s == nil {
return nil
}
return s.closedOnce.Do(s.Storage.Close)
}

func (s storageProxy) Update(k string, v []byte) error {
func (s *storageProxy) Update(k string, v []byte) error {
return s.update(s.Storage, s.partition, k, v)
}

func (s storageProxy) Stateless() bool {
func (s *storageProxy) Stateless() bool {
return s.stateless
}

func (s storageProxy) MarkRecovered() error {
func (s *storageProxy) MarkRecovered() error {
return s.Storage.MarkRecovered()
}
Loading