#176 bugfix processor shutdown when producer gets closed
frairon committed Mar 8, 2019
1 parent 3b534da commit def1f83
Showing 2 changed files with 105 additions and 16 deletions.
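Summary of the change, as read from the diff below: the old Close() closed the stop channel, waited on done and then called producer.Close(), so the single run() loop could stop before sarama's Successes/Errors channels were fully drained, leaving promises unfinished. The new version calls AsyncClose() and uses a sync.WaitGroup: run() starts two goroutines that drain the two channels until they are closed, and Close() returns only once both have exited. Below is a minimal, self-contained sketch of that drain pattern using plain channels; the drainer type and its fields are made up for illustration and are not goka code.

package main

import (
	"fmt"
	"sync"
)

// drainer mimics the producer's new shutdown pattern: two goroutines drain
// a success and an error channel until both are closed, tracked by a WaitGroup.
type drainer struct {
	successes chan string
	errors    chan error
	wg        sync.WaitGroup
}

func newDrainer() *drainer {
	d := &drainer{
		successes: make(chan string, 16),
		errors:    make(chan error, 16),
	}
	d.wg.Add(2)
	go func() {
		defer d.wg.Done()
		for s := range d.successes { // loop ends only when the channel is closed
			fmt.Println("success:", s)
		}
	}()
	go func() {
		defer d.wg.Done()
		for err := range d.errors {
			fmt.Println("error:", err)
		}
	}()
	return d
}

// Close mirrors producer.Close after this commit: close the feeding channels
// (the AsyncClose analogue here) and wait until every buffered message was handled.
func (d *drainer) Close() {
	close(d.successes)
	close(d.errors)
	d.wg.Wait()
}

func main() {
	d := newDrainer()
	d.successes <- "message 1"
	d.errors <- fmt.Errorf("message 2 failed")
	d.Close() // returns only after both messages above were reported
}

Compared to the old stop/done pair, waiting on the drained channels guarantees that every buffered success or error is observed before shutdown completes.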
53 changes: 37 additions & 16 deletions kafka/producer.go
@@ -2,6 +2,7 @@ package kafka

import (
	"fmt"
+	"sync"

	"github.com/Shopify/sarama"
)
@@ -17,6 +18,7 @@ type producer struct {
	producer sarama.AsyncProducer
	stop     chan bool
	done     chan bool
+	wg       sync.WaitGroup
}

// NewProducer creates new kafka producer for passed brokers.
@@ -32,17 +34,26 @@ func NewProducer(brokers []string, config *sarama.Config) (Producer, error) {
		done: make(chan bool),
	}

-	go p.run()
+	p.run()

	return &p, nil
}

+// Close stops the producer and waits for the Success/Error channels to drain.
+// Emitting to a closing/closed producer results in write-to-closed-channel panic
func (p *producer) Close() error {
-	close(p.stop)
-	<-p.done
-	return p.producer.Close()
+	// do an async close to get the rest of the success/error messages to avoid
+	// leaving unfinished promises.
+	p.producer.AsyncClose()
+
+	// wait for the channels to drain
+	p.wg.Wait()
+
+	return nil
}

+// Emit emits a key-value pair to topic and returns a Promise that
+// can be checked for errors asynchronously
func (p *producer) Emit(topic string, key string, value []byte) *Promise {
	promise := NewPromise()
	p.producer.Input() <- &sarama.ProducerMessage{
@@ -56,19 +67,29 @@ func (p *producer) Emit(topic string, key string, value []byte) *Promise {

// resolve or reject a promise in the message's metadata on Success or Error
func (p *producer) run() {
-	defer close(p.done)
-	for {
-		select {
-		case <-p.stop:
-			return
-
-		case err := <-p.producer.Errors():
-			promise := err.Msg.Metadata.(*Promise)
-			promise.Finish(err.Err)
-
-		case msg := <-p.producer.Successes():
-			promise := msg.Metadata.(*Promise)
-			promise.Finish(nil)
-		}
-	}
+	p.wg.Add(2)
+	go func() {
+		defer p.wg.Done()
+		for {
+			err, ok := <-p.producer.Errors()
+
+			// channel closed, the producer is stopping
+			if !ok {
+				return
+			}
+			err.Msg.Metadata.(*Promise).Finish(err.Err)
+		}
+	}()
+
+	go func() {
+		defer p.wg.Done()
+		for {
+			msg, ok := <-p.producer.Successes()
+			// channel closed, the producer is stopping
+			if !ok {
+				return
+			}
+			msg.Metadata.(*Promise).Finish(nil)
+		}
+	}()
}
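A usage sketch of the producer API as it appears in this diff (NewProducer, Emit, Promise.Then, Close), assuming the import path github.com/lovoo/goka/kafka referenced in the test's run instructions; the broker address and topic are placeholders.

package main

import (
	"log"

	"github.com/lovoo/goka/kafka"
)

func main() {
	cfg := kafka.NewConfig().Config // same config helper the test below uses
	p, err := kafka.NewProducer([]string{"localhost:9092"}, &cfg)
	if err != nil {
		log.Fatalf("error creating producer: %v", err)
	}

	// Emit returns a Promise; Then registers a callback that runs once the
	// message is confirmed or fails.
	promise := p.Emit("example-topic", "some-key", []byte("some-value"))
	promise.Then(func(err error) {
		if err != nil {
			log.Printf("emit failed: %v", err)
		}
	})

	// After this commit, Close drains the Success/Error channels before
	// returning, so the callback above should have fired by then.
	if err := p.Close(); err != nil {
		log.Printf("error closing producer: %v", err)
	}
}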
68 changes: 68 additions & 0 deletions kafka/producer_test.go
@@ -0,0 +1,68 @@
// +build kafka

package kafka

import (
	"log"
	"sync/atomic"
	"testing"
	"time"
)

// This tests how a producer behaves when being shut down, to make sure
// no promises stay unfinished.
// To run the test, get a local kafka-container running (e.g. go to the
// examples directory and do `make restart`), then run the tests with
// `go test -v github.com/lovoo/goka/kafka/ -tags=kafka`
func TestProducerError(t *testing.T) {
	cfg := NewConfig().Config
	p, err := NewProducer([]string{"localhost:9092"}, &cfg)

	if err != nil {
		t.Fatalf("error creating producer: %v", err)
	}

	var (
		promises     []*Promise
		donePromises int64
		emitted      int64
		done         = make(chan bool)
	)

	go func() {
		defer func() {
			recover()
		}()
		defer close(done)
		for {
			promise := p.Emit("test", "test", []byte{})
			promise.Then(func(err error) {
				atomic.AddInt64(&donePromises, 1)
				if err != nil {
					log.Printf("error producing message: %v", err)
				}
			})
			promises = append(promises, promise)
			emitted++
			time.Sleep(20 * time.Millisecond)
		}
	}()

	// let it run for 1 second
	time.Sleep(1000 * time.Millisecond)

	// close the producer
	err = p.Close()
	if err != nil {
		log.Printf("Error closing producer: %v", err)
	}
	// wait for the emit goroutine to be done
	<-done

	if len(promises) != int(emitted) {
		t.Errorf("Promises/Emits do not match: promises: %d, emitted: %d", len(promises), emitted)
	}
	if len(promises) != int(donePromises) {
		t.Errorf("Promises/Done promises do not match: promises: %d, done: %d", len(promises), donePromises)
	}
}
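The test above counts finished promises to verify that none are dropped during shutdown. Goka's Promise type is not part of this diff; the sketch below is a hypothetical reconstruction of the Then/Finish behavior the diff relies on (Finish resolves once and fires registered callbacks), included only to make the drain/promise interaction concrete. It is not the library's implementation.

package main

import (
	"fmt"
	"sync"
)

// promiseSketch is a stand-in for goka's Promise: Then registers callbacks,
// Finish resolves the promise exactly once and invokes them.
type promiseSketch struct {
	mu        sync.Mutex
	finished  bool
	err       error
	callbacks []func(err error)
}

// Then registers a callback; if the promise is already finished, the callback
// runs immediately with the stored error.
func (p *promiseSketch) Then(cb func(err error)) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.finished {
		cb(p.err)
		return
	}
	p.callbacks = append(p.callbacks, cb)
}

// Finish resolves the promise and fires all registered callbacks. In the diff,
// the drain goroutines in producer.run call Finish for every success or error;
// if Close returned before the channels were drained, these callbacks would
// never run.
func (p *promiseSketch) Finish(err error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.finished {
		return
	}
	p.finished = true
	p.err = err
	for _, cb := range p.callbacks {
		cb(err)
	}
}

func main() {
	p := &promiseSketch{}
	p.Then(func(err error) { fmt.Println("finished with:", err) })
	p.Finish(nil) // prints: finished with: <nil>
}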
