High load when broker becomes unavailable #194
I also noticed that sometimes the high load happens after a metadata retrieval failure, as it polls constantly (without getting any records).
Does this apply to 4.0 too?
Yes, for both. The code didn't change much around that area.
@cescoffier can you help create a small reproducer? I tried:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import io.vertx.core.AbstractVerticle;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;

public class ProducerVerticle extends AbstractVerticle {

  @Override
  public void start() throws Exception {
    Map<String, String> config = new HashMap<>();
    config.put("bootstrap.servers", "localhost:9092");
    config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    KafkaProducer<String, String> producer = KafkaProducer.create(vertx, config);
    AtomicInteger counter = new AtomicInteger();
    // Send one record per second; failures are only logged.
    vertx.setPeriodic(1000, l -> {
      producer.write(KafkaProducerRecord.create("mytopic", String.valueOf(counter.getAndIncrement())))
        .onFailure(Throwable::printStackTrace);
    });
  }
}
```

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import io.vertx.core.AbstractVerticle;
import io.vertx.kafka.client.consumer.KafkaReadStream;

public class ConsumerVerticle extends AbstractVerticle {

  @Override
  public void start() throws Exception {
    Map<String, Object> config = new HashMap<>();
    config.put("bootstrap.servers", "localhost:9092");
    config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    config.put("group.id", "my_group");
    config.put("auto.offset.reset", "earliest");
    config.put("enable.auto.commit", "false");

    KafkaReadStream<String, String> consumer = KafkaReadStream.create(vertx, config);
    consumer.handler(record -> System.out.println("record = " + record));
    consumer.subscribe(Collections.singleton("mytopic")).onFailure(Throwable::printStackTrace);
  }
}
```

But when I stop the broker, my system load remains low. Thanks
We have this one: quarkusio/quarkus#14366. It happens about one minute after the shutdown of the broker.
@cescoffier thanks, I'll try again tomorrow. I had only stopped the broker; perhaps that's why I was not able to reproduce. Yeah, I suspected this is due to the error handling in that method that keeps rescheduling. But if you can't reproduce, you can't be sure it's fixed 😉
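If the error path really does keep rescheduling the poll task immediately, one classic mitigation is an exponential backoff between attempts. The sketch below is hypothetical and not part of the client; `backoffDelay`, the class name, and the defaults are all illustrative:

```java
import java.time.Duration;

public class PollBackoff {

    // Hypothetical mitigation: instead of resubmitting the poll task
    // immediately after a failure, wait an exponentially growing delay,
    // capped at a maximum. Names and defaults are illustrative only.
    static Duration backoffDelay(int failureCount, Duration base, Duration max) {
        if (failureCount <= 0) {
            return Duration.ZERO; // no failures yet: poll right away
        }
        long shift = Math.min(failureCount - 1, 20); // cap the exponent to avoid overflow
        Duration delay = base.multipliedBy(1L << shift);
        return delay.compareTo(max) > 0 ? max : delay;
    }

    public static void main(String[] args) {
        System.out.println(backoffDelay(1, Duration.ofMillis(100), Duration.ofSeconds(10)));  // PT0.1S
        System.out.println(backoffDelay(4, Duration.ofMillis(100), Duration.ofSeconds(10)));  // PT0.8S
        System.out.println(backoffDelay(30, Duration.ofMillis(100), Duration.ofSeconds(10))); // PT10S
    }
}
```

With a 100 ms base, a transient hiccup retries quickly while a dead broker settles at one poll attempt every 10 seconds instead of a tight loop.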
@cescoffier I tried again without success. No CPU spike with either 3.9.4 or 4.0.0. I added some logging with the debugger, the
@cescoffier actually this 1-second timeout is the poll timeout defined on
There is no protection in our code to make sure the value is strictly positive. Perhaps we could do that.
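Such a protection could be a small guard that rejects non-positive timeouts. This is a minimal sketch, not the client's actual code; the helper name and the 1-second default are assumptions:

```java
import java.time.Duration;

public class PollTimeoutGuard {

    // Assumed default; the real client's default poll timeout may differ.
    static final Duration DEFAULT_POLL_TIMEOUT = Duration.ofSeconds(1);

    // Hypothetical guard: fall back to the default when the configured
    // timeout is missing, zero, or negative, so a poll can never busy-spin
    // because of a non-positive timeout value.
    static Duration safePollTimeout(Duration configured) {
        if (configured == null || configured.isZero() || configured.isNegative()) {
            return DEFAULT_POLL_TIMEOUT;
        }
        return configured;
    }

    public static void main(String[] args) {
        System.out.println(safePollTimeout(Duration.ZERO));          // falls back to the default
        System.out.println(safePollTimeout(Duration.ofSeconds(10))); // kept as configured
    }
}
```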
It's a tricky one. It took me five attempts to get one "high-load" run (~50%).
This is passed to the exception handler of the KafkaReadStream. Let me try to set the pollTimeout to a higher value to see if that improves it.
No, same thing with a pollTimeout set to 10 seconds. What I'm seeing is located in https://github.com/vert-x3/vertx-kafka-client/blob/master/src/main/java/io/vertx/kafka/client/consumer/impl/KafkaReadStreamImpl.java#L155-L173
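The suspected pattern can be modeled in a much-simplified form: a fetch task that, finding no records, ends and immediately resubmits itself. This sketch is illustrative only and is not the actual KafkaReadStreamImpl code; the loop is bounded here so the demo terminates:

```java
import java.util.function.IntSupplier;

public class RescheduleLoopSketch {

    // Simplified model of a fetch task that, when no records are buffered,
    // finishes and immediately resubmits itself. With a healthy broker the
    // first poll returns records; with the broker down every poll returns
    // nothing, so the task spins (bounded by maxIterations for the demo).
    static int runUntilRecords(IntSupplier poll, int maxIterations) {
        int submissions = 0;
        while (submissions < maxIterations) {
            submissions++;
            if (poll.getAsInt() > 0) {
                break; // records delivered: stop resubmitting for this demand
            }
            // nothing fetched: loop around, i.e. "submit the task again" at once
        }
        return submissions;
    }

    public static void main(String[] args) {
        System.out.println(runUntilRecords(() -> 5, 1000)); // healthy broker: 1 submission
        System.out.println(runUntilRecords(() -> 0, 1000)); // broker down: spins to the cap
    }
}
```

This would explain why the CPU spike shows up only once the broker is actually gone: every poll drains instantly with zero records, so the resubmission turns into a tight loop.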
@cescoffier I tried with the
I see which method you're talking about, but:
- I can't manage to reproduce this
- This does not match my observation: when
- Looking at the code again, the only way I can see the task ending and immediately submitting another is when
The exception handler gets a timeout exception about the metadata retrieval. BTW, you need to try the development branch on the quickstart repository. |
@cescoffier today I was able to reproduce on the development branch of the quickstart repo. I will come back to you soon. |
I can't reproduce it anymore when I force the
With version
That's interesting, I actually didn't try with 2.6 as at the moment I'm stuck with 2.5. I will try with 2.6 and see if we can manage to update. I completely agree about the "broken state", very odd behavior. |
@tsegismont I confirm. I just updated locally to 2.6 and the issue is gone! Closing this one now; let's try to bump the Kafka version in Quarkus.
The consumer runs into a poll loop when the broker becomes unavailable.
That introduces a high load on the system, and may even kill the application when the CPU usage exceeds the quota.
Version
3.9.5
Context
Just a simple consumer (as a stream); close the broker while polling records.
Steps to reproduce