Skip to content

Commit

Permalink
workaround for #176. Timing out when the partition is stuck
Browse files Browse the repository at this point in the history
  • Loading branch information
frairon authored and Benjamin Riedel committed Aug 8, 2019
1 parent 9a69821 commit 68ebc47
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 1 deletion.
121 changes: 121 additions & 0 deletions integrationtest/processor_stuck.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package main

import (
"context"
"fmt"
"log"
"sync/atomic"
"time"

"github.com/lovoo/goka"
"github.com/lovoo/goka/codec"
"github.com/lovoo/goka/kafka"
)

func main() {

proc, err := goka.NewProcessor([]string{"localhost:9092"},
goka.DefineGroup("processor-stuck-test",
goka.Input("input", new(codec.Int64), func(ctx goka.Context, msg interface{}) {
ctx.SetValue(msg)
ctx.Emit("output", ctx.Key(), msg)
ctx.Emit("output", ctx.Key(), msg)
ctx.Emit("output", ctx.Key(), msg)
ctx.Emit("output", ctx.Key(), msg)
ctx.Emit("output", ctx.Key(), msg)
ctx.Emit("output", ctx.Key(), msg)
ctx.Emit("output", ctx.Key(), msg)
ctx.Emit("output", ctx.Key(), msg)
ctx.Emit("output", ctx.Key(), msg)
ctx.Emit("output", ctx.Key(), msg)
ctx.Emit("output", ctx.Key(), msg)
ctx.Emit("output", ctx.Key(), msg)
ctx.Emit("output", ctx.Key(), msg)
ctx.Emit("output", ctx.Key(), msg)
ctx.Emit("output", ctx.Key(), msg)
ctx.Emit("output", ctx.Key(), msg)
ctx.Emit("output", ctx.Key(), msg)
ctx.Emit("output", ctx.Key(), msg)
ctx.Emit("output", ctx.Key(), msg)
ctx.Emit("output", ctx.Key(), msg)
}),
goka.Output("output", new(codec.Int64)),
goka.Persist(new(codec.Int64)),
))

if err != nil {
log.Fatalf("Cannot start processor: %v", err)
}

ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
go func() {
defer close(done)

log.Printf("Running processor")
procRunErr := proc.Run(ctx)
log.Printf("Processor finished with %v", procRunErr)
}()

log.Printf("wait 5 seconds before starting to emit")
time.Sleep(5 * time.Second)

for i := 0; i < 50; i++ {
go func() {

cfg := kafka.NewConfig()
cfg.Producer.Retry.Max = 0
cfg.Producer.Retry.Backoff = 1 * time.Millisecond
emitter, err := goka.NewEmitter([]string{"localhost:9092"}, "input", new(codec.Int64),
goka.WithEmitterProducerBuilder(
kafka.ProducerBuilderWithConfig(cfg),
),
)
if err != nil {
log.Fatalf("Error creating emitter: %v", err)
}

time.Sleep(2 * time.Second)
defer func() {
log.Printf("finishing")
emitter.Finish()
log.Printf("done")
}()

defer recover()
var done int64
var emitted int64
for i := 0; ; i++ {
if atomic.LoadInt64(&done) > 0 {
break
}

// when the context is done, stop emitting
go func() {
<-ctx.Done()
atomic.AddInt64(&done, 1)
}()
emitted++
if emitted%1000 == 0 {
log.Printf("emitted %d", emitted)
}
prom, err := emitter.Emit(fmt.Sprintf("%d", i), int64(i))
if err != nil {
break
}
prom.Then(func(err error) {
if err != nil {
atomic.AddInt64(&done, 1)
}
})
time.Sleep(10 * time.Millisecond)
}
}()
}

log.Printf("waiting for the processor to shutdown")
<-done
log.Printf("processor is dead. Nice!")

cancel()
}
18 changes: 17 additions & 1 deletion partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package goka
import (
"context"
"fmt"
"log"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -139,7 +140,20 @@ func newMessage(ev *kafka.Message) *message {
func (p *partition) run(ctx context.Context) error {
var wg sync.WaitGroup
p.proxy.AddGroup()
defer wg.Wait()

defer func() {
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()

select {
case <-done:
case <-time.NewTimer(10 * time.Second).C:
log.Printf("partition shutdown timed out. Will stop waiting.")
}
}()

for {
select {
Expand Down Expand Up @@ -185,10 +199,12 @@ func (p *partition) run(ctx context.Context) error {
select {
case p.responseStats <- p.lastStats:
case <-ctx.Done():
p.log.Printf("Partitioning exiting, context is cancelled")
return nil
}

case <-ctx.Done():
p.log.Printf("Partitioning exiting, context is cancelled (outer)")
return nil
}

Expand Down

0 comments on commit 68ebc47

Please sign in to comment.