Dropping message while rebalancing #207
Hi WideLee, thanks for the report. The example you started runs an Emitter that emits messages periodically until stopped, which makes it hard to verify that there is no message loss. Thanks a lot!
This is the emitter code; it emits 1000 messages, and the last message is:
```go
func runEmitter() {
	emitter, err := goka.NewEmitter(brokers, topic, new(codec.String))
	if err != nil {
		panic(err)
	}
	defer emitter.Finish()
	for i := 0; i < 1000; i++ {
		key := fmt.Sprintf("user-%d", i%10)
		value := fmt.Sprintf("%v-%s", i, time.Now())
		emitter.EmitSync(key, value)
		fmt.Printf("emit: %v %v\n", key, value)
	}
}
```

Here is the consumer; I only added graceful shutdown and pinned the sarama Kafka version in the example:

```go
func process(ctx goka.Context, msg interface{}) {
	var u *user
	if val := ctx.Value(); val != nil {
		u = val.(*user)
	} else {
		u = new(user)
	}
	u.Clicks++
	ctx.SetValue(u)
	fmt.Printf("[proc] key: %s clicks: %d, msg: %v\n", ctx.Key(), u.Clicks, msg)
}
```
```go
func runProcessor() {
	ctx, cancel := context.WithCancel(context.Background())
	g := goka.DefineGroup(group,
		goka.Input(topic, new(codec.String), process),
		goka.Persist(new(userCodec)),
	)

	config := kafka.NewConfig()
	config.Version = sarama.V0_11_0_0

	p, err := goka.NewProcessor(brokers, g,
		goka.WithConsumerBuilder(kafka.ConsumerBuilderWithConfig(config)),
		goka.WithProducerBuilder(kafka.ProducerBuilderWithConfig(config)))
	if err != nil {
		panic(err)
	}

	done := make(chan bool)
	go func() {
		defer close(done)
		if err = p.Run(ctx); err != nil {
			log.Fatalf("error running processor: %v", err)
		}
	}()

	wait := make(chan os.Signal, 1)
	signal.Notify(wait, syscall.SIGINT, syscall.SIGTERM)
	<-wait   // wait for SIGINT/SIGTERM
	cancel() // gracefully stop processor
	<-done
}
```

Normally, if I run only one consumer, I get the output below, where each user clicks 100 times:
But if I run two consumers, after the second one starts and rebalancing happens, the click counts look weird on the first consumer. Here is the output:
The click count of user-6 is normal, but user-2, user-3 and user-7 are lower than the expected 14. Finally, after the emitter finished all 1000 messages, the total count for user-2, user-3 and user-7 is 99, while the other users are at 100. I am not sure whether this is normal behavior. BTW, the Kafka version is 0.11.0.
Did you start the two consumers on the same machine locally? If so, there can be problems with storage, since they're using the same directory.
Nope, the two consumers are on different hosts: one is on a Mac host and the other in my Parallels Desktop virtual machine (CentOS), so even the storage is separate. If I run them on the same machine it panics with an error, but that is not the case here.
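As an aside, when two processor instances do have to share one machine, goka can point each instance at its own local-storage directory via `WithStorageBuilder`. A minimal sketch, assuming an arbitrary example path (the path and variable names are illustrative, not from this thread):

```go
// Hypothetical sketch: give each processor instance its own storage
// directory so two instances can coexist on one machine without
// clashing over the same local state files.
p, err := goka.NewProcessor(brokers, g,
	goka.WithStorageBuilder(storage.DefaultBuilder("/tmp/goka-instance-1")),
)
```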
I have noticed that when the message dropping occurs,
@frairon I found the reason why the click count is less than expected. After rebalancing, the message dispatcher is restarted and the partition message channel receives messages as follows:
And the message loop is in … However, the message channel of groupConsumer and simpleConsumer is the same. Is it possible to solve this problem so that every message is processed? Thanks very much 😄
The issue seems to be fixed; thanks @WideLee for the PR.
I have tested the 2-clicks example by running one producer to emit users' clicks and two consumers to count them. The messages emitted by the producer look like the following:

However, after the second consumer started and Kafka rebalanced, the click count on the first consumer is smaller than expected; the missing count equals the number of "dropping message from topic = user-click while loading" log messages.

For example, user-9's click count should be 17 according to the 169th message emitted by the producer, but it is 16 on the processor.
I wonder how to gracefully upgrade consumers in a production environment without dropping any messages. It seems that during an upgrade each consumer has to be restarted, Kafka rebalances every time, and each rebalance drops messages while loading.

Thanks very much ^_^