-
Notifications
You must be signed in to change notification settings - Fork 2.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add readiness health check for Kafka Streams #5125
Add readiness health check for Kafka Streams #5125
Conversation
hmm... I don't really understand why the configuration cannot be injected in the @Readiness
@ApplicationScoped
public class KafkaStreamsHealthCheck implements HealthCheck {
@Inject
private KafkaStreamsRuntimeConfig config;
...
} Instead of that I'm obliged to duplicate the Caused by: javax.enterprise.inject.UnsatisfiedResolutionException: Unsatisfied dependency for type io.quarkus.kafka.streams.runtime.KafkaStreamsRuntimeConfig and qualifiers [@Default]
- java member: io.quarkus.kafka.streams.runtime.health.KafkaStreamsHealthCheck#config
- declared on CLASS bean [types=[io.quarkus.kafka.streams.runtime.health.KafkaStreamsHealthCheck, org.eclipse.microprofile.health.HealthCheck, java.lang.Object], qualifiers=[@Readiness, @Any], target=io.quarkus.kafka.streams.runtime.health.KafkaStreamsHealthCheck]
at io.quarkus.arc.processor.Beans.resolveInjectionPoint(Beans.java:472)
at io.quarkus.arc.processor.BeanInfo.init(BeanInfo.java:404)
at io.quarkus.arc.processor.BeanDeployment.init(BeanDeployment.java:212)
... 14 more It's not even possible to set the @Recorder
public class KafkaStreamsRecorder {
...
public void configureRuntimeProperties(KafkaStreamsRuntimeConfig runtimeConfig) {
Arc.container().instance(KafkaStreamsTopologyManager.class).get().setRuntimeConfig(runtimeConfig);
Arc.container().instance(KafkaStreamsHealthCheck.class).get().setRuntimeConfig(runtimeConfig);
}
...
} Because no Could you explain me why ? Many thanks in advance. |
KafkaStreamsHelthCheck is a regular CDI bean so the configuration property should be injected via @ConfigProperty item by item like in application code. KafkaStreamsRuntimeConfig is not a CDI bean so it cannot be injected. You can use it inside your deployment module on a build step that is runtime. |
Oki, thanks, I understand now. So if the Is it not yet instantiated when I do this ? Arc.container().instance(KafkaStreamsHealthCheck.class).get().setRuntimeConfig(runtimeConfig); I'm a bit confused. |
I think the bean has been removed since Arc detect that it is unused see https://quarkus.io/guides/cdi-reference#remove_unused_beans. HTH |
Oki, thanks @machi1990. I'll check that tonight. |
I added this line in the
I tried to add the I also tried adding this register method in @BuildStep
AdditionalBeanBuildItem registerHealthCheckBean() {
return AdditionalBeanBuildItem.unremovableOf(KafkaStreamsHealthCheck.class);
} I finaly tried to register the HealthCheck class by following your example but still not retrievable.
Maybe I'm doing something wrong. |
@lhauspie there is an error on the CDI guide
This should be |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall looks good.
Some minor change requested.
I didn't check the kafka-stream part in details, someone else needs to have a look at it.
...eams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsBuildTimeConfig.java
Outdated
Show resolved
Hide resolved
...eams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsTopologyManager.java
Show resolved
Hide resolved
...s/runtime/src/main/java/io/quarkus/kafka/streams/runtime/health/KafkaStreamsHealthCheck.java
Outdated
Show resolved
Hide resolved
...s/runtime/src/main/java/io/quarkus/kafka/streams/runtime/health/KafkaStreamsHealthCheck.java
Outdated
Show resolved
Hide resolved
...s/runtime/src/main/java/io/quarkus/kafka/streams/runtime/health/KafkaStreamsHealthCheck.java
Outdated
Show resolved
Hide resolved
...s/runtime/src/main/java/io/quarkus/kafka/streams/runtime/health/KafkaStreamsHealthCheck.java
Outdated
Show resolved
Hide resolved
integration-tests/kafka/src/main/resources/application.properties
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall looks goods to me.
Again, a second review with someone more knowledgeable around Kafka Streams needs to be done :)
integration-tests/kafka/src/test/java/io/quarkus/it/kafka/streams/KafkaStreamsTest.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very nice, great work! A few comments inline.
...eams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsBuildTimeConfig.java
Outdated
Show resolved
Hide resolved
As we can have several So the Liveness that has to check the state of KafkaStreams will be named |
So I'm getting second thoughts on the readiness probe based on topic existence. Is this actually what one would want? IIUC, this has at least two implications:
I like exposing this information, but I feel it's actually not part of the applications readiness state. After all, it is functional, there's just no input data for it to process. |
Please, don't provides Liveness for this kind of stuff but only readiness check, I can explain you why in details but basically liveness is to define if your application is up so it should not check any of it's dependencies (like Kafak related stuff). Regarding namming, please keep the |
The liveness probe we're discussing here is about the Kafka Streams app's own state, not a dependency (like the broker not running). |
I always feels that UP/DOWN is too limited for the state of an application, sometimes it is UP with warnings and these warnings should be addressed by the monitoring/observability tools and not something that will restart the apps automatically. Health Check is not specifically designed to have automatic restart/failover (like in Kubernetes) but can be use for this. And it's more complex when we mix functionalities in the same service ... We discussed this with Logan, and we ends up saying that this health check is usefull for kafka-stream application (so not when you mix kafka-stream and REST endpoint in the same apps), other kind of application should disable it. |
The question is: "is restarting the process will make any difference ?" |
Sure; let's just hope sole presence of such health check doesn't trick people into using it as a readiness probe then. As said, this doesn't really represent the application's readiness.
This combination is very common though, when taking interactive queries into account, where a KStreams pipeline is used to run some streaming query logic and the current state (of aggregations etc.) is exposed via Kafka Streams Interactive Queries and REST.
It depends on the specific issue. Sometimes yes, sometimes no. In any case the restart loop would tell an operator that some action is needed. In case of a bad message it could e.g. mean to manually overwrite the app's offset state so to skip this one, or perhaps send another message for the same key to a topic sourcing a |
I reviewed a last time my PR that sounds OK. Could you review please? |
...eployment/src/main/java/io/quarkus/kafka/streams/deployment/KafkaStreamsBuildTimeConfig.java
Show resolved
Hide resolved
...eams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/KafkaStreamsProcessor.java
Outdated
Show resolved
Hide resolved
...time/src/main/java/io/quarkus/kafka/streams/runtime/health/KafkaStreamsStateHealthCheck.java
Outdated
Show resolved
Hide resolved
integration-tests/kafka/src/test/java/io/quarkus/it/kafka/streams/KafkaStreamsTest.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe all my comments have been addressed, so LGTM. I am in no means an expert on Kafka, so let's wait @gunnarmorling @loicmathieu final call on this PR.
I just noticed a conflict, I'll let @lhauspie handle that.
Thanks @lhauspie
👍 The conflict is merged. |
...eams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/KafkaStreamsProcessor.java
Outdated
Show resolved
Hide resolved
One last question above, otherwise it LGTM. Great work, @lhauspie! A very useful addition to the Kafka Streams extension, looking forward to seeing it released. |
...eams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/KafkaStreamsProcessor.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. I'll leave it to you @lhauspie, whether you'd reflect in the docs how to query liveness and readiness checks independently. E.g. just by adding the curl invocations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for the late review.
I added a bunch of comments in the doc, a few suggestions and questions.
Could you have a look?
I think we are really not far from being in a mergeable state.
Thanks!
...eployment/src/main/java/io/quarkus/kafka/streams/deployment/KafkaStreamsBuildTimeConfig.java
Outdated
Show resolved
Hide resolved
...eams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsTopologyManager.java
Outdated
Show resolved
Hide resolved
...ime/src/main/java/io/quarkus/kafka/streams/runtime/health/KafkaStreamsTopicsHealthCheck.java
Outdated
Show resolved
Hide resolved
@lhauspie any chance you could have a look at my review? |
Yes of course, sorry for the delay, I was busy at Disneyland this weekend ;) Let me have a look. |
I just force-pushed a rebase. Let's wait for CI to go green. |
Merged, thanks! |
fixes #4793