Skip to content

Commit

Permalink
Merge pull request #147 from lovoo/cleanup-processor-rebalance
Browse files Browse the repository at this point in the history
cleanup processor rebalance
  • Loading branch information
db7 authored Oct 25, 2018
2 parents 81cba56 + 1f5e000 commit 1da6277
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 26 deletions.
3 changes: 2 additions & 1 deletion partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,8 @@ func (p *partition) load(ctx context.Context, catchup bool) (rerr error) {
case *kafka.Message:
lastMessage = time.Now()
if ev.Topic != p.topic {
return fmt.Errorf("load: wrong topic = %s", ev.Topic)
p.log.Printf("dropping message from topic = %s while loading", ev.Topic)
continue
}
if err := p.storeEvent(ev); err != nil {
return fmt.Errorf("load: error updating storage: %v", err)
Expand Down
97 changes: 72 additions & 25 deletions processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ type Processor struct {

consumer kafka.Consumer
producer kafka.Producer
asCh chan kafka.Assignment

errg *multierr.ErrGroup
errors *multierr.Errors
cancel func()
}
Expand Down Expand Up @@ -113,6 +113,8 @@ func NewProcessor(brokers []string, gg *GroupGraph, options ...ProcessorOption)
views: views,

graph: gg,

asCh: make(chan kafka.Assignment, 1),
}

return processor, nil
Expand Down Expand Up @@ -273,7 +275,7 @@ func (g *Processor) Run(ctx context.Context) (rerr error) {

// create errorgroup
ctx, g.cancel = context.WithCancel(ctx)
g.errg, ctx = multierr.NewErrGroup(ctx)
errg, ctx := multierr.NewErrGroup(ctx)
defer g.cancel()

// collect all errors before leaving
Expand Down Expand Up @@ -314,7 +316,7 @@ func (g *Processor) Run(ctx context.Context) (rerr error) {
// start all views
for t, v := range g.views {
t, v := t, v
g.errg.Go(func() error {
errg.Go(func() error {
if err := v.Run(ctx); err != nil {
return fmt.Errorf("error starting lookup table %s: %v", t, err)
}
Expand All @@ -333,15 +335,18 @@ func (g *Processor) Run(ctx context.Context) (rerr error) {
}
if err := g.consumer.Subscribe(topics); err != nil {
g.cancel()
_ = g.errors.Merge(g.errg.Wait())
_ = g.errors.Merge(errg.Wait())
return fmt.Errorf("error subscribing topics: %v", err)
}

// start processor dispatcher
g.errg.Go(func() error { return g.run(ctx) })
errg.Go(func() error {
g.asCh <- kafka.Assignment{}
return g.waitAssignment(ctx)
})

// wait for goroutines to return
_ = g.errors.Merge(g.errg.Wait())
_ = g.errors.Merge(errg.Wait())

// remove all partitions first
g.opts.log.Printf("Processor: removing partitions")
Expand Down Expand Up @@ -380,18 +385,58 @@ func (g *Processor) pushToPartitionView(ctx context.Context, topic string, part
return nil
}

func (g *Processor) run(ctx context.Context) error {
g.opts.log.Printf("Processor: started")
defer g.opts.log.Printf("Processor: stopped")
func (g *Processor) waitAssignment(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil
case a := <-g.asCh:
if err := g.runAssignment(ctx, a); err != nil {
return err
}
}
}
}

func (g *Processor) runAssignment(ctx context.Context, a kafka.Assignment) error {
errs := new(multierr.Errors)
ctx, cancel := context.WithCancel(ctx)
errg, ctx := multierr.NewErrGroup(ctx)
defer cancel()

// create partitions based on assignmend
if err := g.rebalance(errg, ctx, a); err.HasErrors() {
return errs.Collect(err).NilOrError()
}

// start dispatcher
errg.Go(func() error {
err := g.dispatcher(ctx)
// cancel context even if dispatcher returned nil -- can only be a rebalance
cancel()
return err
})

// wait until dispatcher or partitions have returned
_ = errs.Merge(errg.Wait())

// all partitions should have returned at this point, so clean up
_ = errs.Merge(g.removePartitions())

return errs.NilOrError()
}

func (g *Processor) dispatcher(ctx context.Context) error {
g.opts.log.Printf("Processor: dispatcher started")
defer g.opts.log.Printf("Processor: dispatcher stopped")

for {
select {
case ev := <-g.consumer.Events():
switch ev := ev.(type) {
case *kafka.Assignment:
if err := g.rebalance(ctx, *ev); err != nil {
return fmt.Errorf("error on rebalance: %v", err)
}
g.asCh <- *ev
return nil

case *kafka.Message:
var err error
Expand Down Expand Up @@ -487,7 +532,7 @@ func (g *Processor) newStorage(topic string, id int32, update UpdateCallback) (*
}, nil
}

func (g *Processor) createPartitionViews(ctx context.Context, id int32) error {
func (g *Processor) createPartitionViews(errg *multierr.ErrGroup, ctx context.Context, id int32) error {
g.m.Lock()
defer g.m.Unlock()

Expand All @@ -511,7 +556,7 @@ func (g *Processor) createPartitionViews(ctx context.Context, id int32) error {
)
g.partitionViews[id][t.Topic()] = p

g.errg.Go(func() (err error) {
errg.Go(func() (err error) {
defer func() {
if rerr := recover(); rerr != nil {
g.opts.log.Printf("partition view %s/%d: panic", p.topic, id)
Expand All @@ -533,7 +578,7 @@ func (g *Processor) createPartitionViews(ctx context.Context, id int32) error {
return nil
}

func (g *Processor) createPartition(ctx context.Context, id int32) error {
func (g *Processor) createPartition(errg *multierr.ErrGroup, ctx context.Context, id int32) error {
if _, has := g.partitions[id]; has {
return nil
}
Expand Down Expand Up @@ -565,7 +610,7 @@ func (g *Processor) createPartition(ctx context.Context, id int32) error {
g.opts.partitionChannelSize,
)
par := g.partitions[id]
g.errg.Go(func() (err error) {
errg.Go(func() (err error) {
defer func() {
if rerr := recover(); rerr != nil {
g.opts.log.Printf("partition %s/%d: panic", par.topic, id)
Expand All @@ -586,27 +631,29 @@ func (g *Processor) createPartition(ctx context.Context, id int32) error {
return nil
}

func (g *Processor) rebalance(ctx context.Context, partitions kafka.Assignment) error {
func (g *Processor) rebalance(errg *multierr.ErrGroup, ctx context.Context, partitions kafka.Assignment) *multierr.Errors {
errs := new(multierr.Errors)
g.opts.log.Printf("Processor: rebalancing: %+v", partitions)

for id := range partitions {
// create partition views
if err := g.createPartitionViews(ctx, id); err != nil {
return errs.Collect(err).NilOrError()
if err := g.createPartitionViews(errg, ctx, id); err != nil {
errs.Collect(err)
}
// create partition processor
if err := g.createPartition(ctx, id); err != nil {
return errs.Collect(err).NilOrError()
if err := g.createPartition(errg, ctx, id); err != nil {
errs.Collect(err)
}
}
return errs
}

func (g *Processor) removePartitions() *multierr.Errors {
errs := new(multierr.Errors)
for partition := range g.partitions {
if _, has := partitions[partition]; !has {
_ = errs.Merge(g.removePartition(partition))
}
_ = errs.Merge(g.removePartition(partition))
}
return errs.NilOrError()
return errs
}

func (g *Processor) removePartition(partition int32) *multierr.Errors {
Expand Down

0 comments on commit 1da6277

Please sign in to comment.