diff --git a/emitter.go b/emitter.go index 72beb0a9..f0214aae 100644 --- a/emitter.go +++ b/emitter.go @@ -66,18 +66,23 @@ func (e *Emitter) Emit(key string, msg interface{}) (*kafka.Promise, error) { // EmitSync sends a message to passed topic and key func (e *Emitter) EmitSync(key string, msg interface{}) error { - promise, err := e.Emit(key, msg) + var ( + err error + promise *kafka.Promise + ) + promise, err = e.Emit(key, msg) if err != nil { return err } done := make(chan struct{}) - promise.Then(func(err error) { + promise.Then(func(asyncErr error) { + err = asyncErr close(done) }) <-done - return nil + return err } // Finish waits until the emitter is finished producing all pending messages