Skip to content

Commit

Permalink
#176 bugfix processor shutdown when producer gets closed
Browse files Browse the repository at this point in the history
  • Loading branch information
frairon committed Aug 12, 2019
1 parent c12b6f3 commit 97f7724
Showing 1 changed file with 18 additions and 25 deletions.
43 changes: 18 additions & 25 deletions kafka/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package kafka
import (
"fmt"
"sync"
"time"

"github.com/Shopify/sarama"
)
Expand Down Expand Up @@ -77,29 +76,23 @@ func (p *producer) Emit(topic string, key string, value []byte) *Promise {

// resolve or reject a promise in the message's metadata on Success or Error
func (p *producer) run() {
p.wg.Add(2)
go func() {
defer p.wg.Done()
for {
err, ok := <-p.producer.Errors()

// channel closed, the producer is stopping
if !ok {
return
}
err.Msg.Metadata.(*Promise).Finish(err.Err)
==== BASE ====
defer close(p.done)
for {
select {
case <-p.stop:
return

case err := <-p.producer.Errors():
promise := err.Msg.Metadata.(*Promise)
promise.Finish(err.Err)

case msg := <-p.producer.Successes():
promise := msg.Metadata.(*Promise)
promise.Finish(nil)
==== BASE ====
}
}()

go func() {
defer p.wg.Done()
for {
msg, ok := <-p.producer.Successes()
// channel closed, the producer is stopping
if !ok {
return
}
msg.Metadata.(*Promise).Finish(nil)
}
}()
==== BASE ====
}
==== BASE ====
}

0 comments on commit 97f7724

Please sign in to comment.