Add readiness health check for Kafka Streams #5125

Merged
merged 1 commit into from
Dec 7, 2019

Conversation

lhauspie
Contributor

@lhauspie lhauspie commented Nov 2, 2019

fixes #4793

@lhauspie
Contributor Author

lhauspie commented Nov 2, 2019

Hmm, I don't really understand why the configuration cannot be injected into the KafkaStreamsHealthCheck class:

@Readiness
@ApplicationScoped
public class KafkaStreamsHealthCheck implements HealthCheck {
    @Inject
    private KafkaStreamsRuntimeConfig config;
...
}

Instead, I have to duplicate the @ConfigProperty declarations to inject the application.properties values into the health check, because of this error:

Caused by: javax.enterprise.inject.UnsatisfiedResolutionException: Unsatisfied dependency for type io.quarkus.kafka.streams.runtime.KafkaStreamsRuntimeConfig and qualifiers [@Default]
        - java member: io.quarkus.kafka.streams.runtime.health.KafkaStreamsHealthCheck#config
        - declared on CLASS bean [types=[io.quarkus.kafka.streams.runtime.health.KafkaStreamsHealthCheck, org.eclipse.microprofile.health.HealthCheck, java.lang.Object], qualifiers=[@Readiness, @Any], target=io.quarkus.kafka.streams.runtime.health.KafkaStreamsHealthCheck]
        at io.quarkus.arc.processor.Beans.resolveInjectionPoint(Beans.java:472)
        at io.quarkus.arc.processor.BeanInfo.init(BeanInfo.java:404)
        at io.quarkus.arc.processor.BeanDeployment.init(BeanDeployment.java:212)
        ... 14 more

It's not even possible to set the runtimeConfig on the KafkaStreamsHealthCheck bean from the recorder:

@Recorder
public class KafkaStreamsRecorder {
...
    public void configureRuntimeProperties(KafkaStreamsRuntimeConfig runtimeConfig) {
        Arc.container().instance(KafkaStreamsTopologyManager.class).get().setRuntimeConfig(runtimeConfig);
        Arc.container().instance(KafkaStreamsHealthCheck.class).get().setRuntimeConfig(runtimeConfig);
    }
...
}

This fails because no instance is found in Arc. Unfortunately, I hit the same problem when I tried .getBean(KafkaStreamsHealthCheck.class) instead of .instance(KafkaStreamsHealthCheck.class).

Could you explain why?

Many thanks in advance.

cc @gunnarmorling @loicmathieu @jmartisk

@loicmathieu
Contributor

KafkaStreamsHealthCheck is a regular CDI bean, so the configuration properties should be injected via @ConfigProperty, item by item, as in application code.

KafkaStreamsRuntimeConfig is not a CDI bean, so it cannot be injected. You can use it in your deployment module, in a build step that is recorded for runtime.

@lhauspie
Contributor Author

lhauspie commented Nov 2, 2019

Oki, thanks, I understand now.

So if KafkaStreamsHealthCheck is a regular CDI bean, why can't I retrieve it from the Arc container?

Is it not yet instantiated when I do this?

Arc.container().instance(KafkaStreamsHealthCheck.class).get().setRuntimeConfig(runtimeConfig);

I'm a bit confused.

@machi1990
Member

So if KafkaStreamsHealthCheck is a regular CDI bean, why can't I retrieve it from the Arc container?

Is it not yet instantiated when I do this?

Arc.container().instance(KafkaStreamsHealthCheck.class).get().setRuntimeConfig(runtimeConfig);

I think the bean has been removed because Arc detects that it is unused; see https://quarkus.io/guides/cdi-reference#remove_unused_beans.
You can either annotate it with @Unremovable or produce an AdditionalBeanBuildItem build item,
as shown here: https://github.com/quarkusio/quarkus/blob/master/extensions/flyway/deployment/src/main/java/io/quarkus/flyway/FlywayProcessor.java#L66

HTH

@lhauspie
Contributor Author

lhauspie commented Nov 2, 2019

Oki, thanks @machi1990.

I'll check that tonight.

@lhauspie
Contributor Author

lhauspie commented Nov 3, 2019

I added this line in the application.properties of the kafka-streams-quickstart project but no additional log about removed beans.

quarkus.log.category."io.quarkus.arc.producer".level=DEBUG

I tried adding the @Unremovable annotation to the KafkaStreamsHealthCheck class, but it is still not retrievable via Arc.container().instance(KafkaStreamsHealthCheck.class).

I also tried adding this register method to KafkaStreamsProcessor, but it is still not retrievable.

    @BuildStep
    AdditionalBeanBuildItem registerHealthCheckBean() {
        return AdditionalBeanBuildItem.unremovableOf(KafkaStreamsHealthCheck.class);
    }

Finally, I tried to register the HealthCheck class by following your example, but it is still not retrievable.

        AdditionalBeanBuildItem unremovableProducer = AdditionalBeanBuildItem.unremovableOf(KafkaStreamsHealthCheck.class);
        additionalBeanProducer.produce(unremovableProducer);

Maybe I'm doing something wrong.

@loicmathieu
Contributor

@lhauspie there is an error in the CDI guide.

I added this line in the application.properties of the kafka-streams-quickstart project but no additional log about removed beans.

This should be quarkus.log.category."io.quarkus.arc.processor".level=DEBUG, and it only works in dev mode.

Contributor

@loicmathieu loicmathieu left a comment

Overall looks good.
Some minor changes requested.

I didn't check the Kafka Streams part in detail; someone else needs to have a look at it.

Contributor

@loicmathieu loicmathieu left a comment

Overall looks good to me.
Again, a second review by someone more knowledgeable about Kafka Streams needs to be done :)

Contributor

@gunnarmorling gunnarmorling left a comment

Very nice, great work! A few comments inline.

@lhauspie
Contributor Author

lhauspie commented Nov 8, 2019

As we can have several @Liveness and @Readiness classes, I decided to rename the KafkaStreamsHealthCheck to KafkaStreamsTopicsCheck because its purpose is to check the availability of topics.

So the Liveness that has to check the state of KafkaStreams will be named KafkaStreamsStateCheck.

@gunnarmorling
Contributor

So I'm having second thoughts about the readiness probe based on topic existence. Is this actually what one would want? IIUC, this has at least two implications:

  • No HTTP requests will be routed to the application (affecting all the nodes as they'll all wait for the input topics); this might not be desirable when having web parts and KStreams in one app
  • It may prevent rolling upgrades as no node is ready to serve requests

I like exposing this information, but I feel it's actually not part of the application's readiness state. After all, it is functional, there's just no input data for it to process.
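For readers following the discussion, the kind of topic-based readiness decision being debated can be sketched as below. This is only a minimal, framework-free model: the class `TopicsReadiness` and its method names are hypothetical and are not taken from the PR, and the real check would obtain the existing topics from the broker rather than from a collection passed in.

```java
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of a topic-based readiness decision; not the PR's actual code.
final class TopicsReadiness {

    /** Returns the required topics that are absent from the existing ones, sorted by name. */
    static Set<String> missingTopics(Collection<String> required, Collection<String> existing) {
        Set<String> missing = new TreeSet<>(required);
        missing.removeAll(existing);
        return missing;
    }

    /** Assumed policy: the application is reported ready only when no required topic is missing. */
    static boolean isReady(Collection<String> required, Collection<String> existing) {
        return missingTopics(required, existing).isEmpty();
    }
}
```

Under this policy a single missing input topic keeps every node in the not-ready state, which is exactly the behaviour the two bullet points above are concerned about.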

@loicmathieu
Contributor

So the Liveness that has to check the state of KafkaStreams will be named KafkaStreamsStateCheck.

Please don't provide a liveness check for this kind of thing, only a readiness check. I can explain why in detail, but basically liveness defines whether your application is up, so it should not check any of its dependencies (like Kafka-related stuff).

Regarding naming, please keep the HealthCheck suffix: KafkaStreamsTopicsCheck => KafkaStreamsTopicsHealthCheck.

@gunnarmorling
Contributor

it should not check any of its dependencies (like Kafka-related stuff)

The liveness probe we're discussing here is about the Kafka Streams app's own state, not a dependency (like the broker not running).
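A liveness decision based on the application's own state, rather than on a dependency, might look like the sketch below. It is an illustration under stated assumptions: `StreamsState` is a local stand-in that mirrors the state names of Kafka Streams' `KafkaStreams.State` enum so the example compiles on its own, and the policy in `isLive` is one possible choice, not necessarily what the PR implements.

```java
// Local stand-in mirroring the state names of Kafka Streams' KafkaStreams.State (assumption).
enum StreamsState { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, ERROR }

// Hypothetical helper; the class name and the policy are illustrative only.
final class StreamsLiveness {

    /** Assumed policy: the app is not live once the streams instance has failed or stopped. */
    static boolean isLive(StreamsState state) {
        return state != StreamsState.ERROR && state != StreamsState.NOT_RUNNING;
    }
}
```

Note that nothing here asks whether the broker is reachable; only the state of the streams instance inside the process is consulted.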

@loicmathieu
Contributor

I like exposing this information, but I feel it's actually not part of the application's readiness state. After all, it is functional, there's just no input data for it to process.

I always feel that UP/DOWN is too limited for the state of an application: sometimes it is UP with warnings, and these warnings should be addressed by monitoring/observability tools rather than by something that restarts the app automatically.

Health Check is not specifically designed to have automatic restart/failover (like in Kubernetes) but can be used for this. And it's more complex when we mix functionalities in the same service ...

We discussed this with Logan, and we ended up concluding that this health check is useful for Kafka Streams applications (so not when you mix Kafka Streams and REST endpoints in the same app); other kinds of applications should disable it.

@loicmathieu
Contributor

The liveness probe we're discussing here is about the Kafka Streams app's own state, not a dependency (like the broker not running).

The question is: "will restarting the process make any difference?"

@gunnarmorling
Contributor

Health Check is not specifically designed to have automatic restart/failover (like in Kubernetes) but can be used for this.

Sure; let's just hope the sole presence of such a health check doesn't trick people into using it as a readiness probe then. As said, this doesn't really represent the application's readiness.

We discussed this with Logan, and we ended up concluding that this health check is useful for Kafka Streams applications (so not when you mix Kafka Streams and REST endpoints in the same app),

This combination is very common though, when taking interactive queries into account, where a KStreams pipeline is used to run some streaming query logic and the current state (of aggregations etc.) is exposed via Kafka Streams Interactive Queries and REST.

The question is: "will restarting the process make any difference?"

It depends on the specific issue. Sometimes yes, sometimes no. In any case, the restart loop would tell an operator that some action is needed. In case of a bad message it could, e.g., mean manually overwriting the app's offset state so as to skip this one, or perhaps sending another message for the same key to a topic sourcing a KTable, etc.

@lhauspie
Contributor Author

lhauspie commented Nov 9, 2019

I reviewed my PR one last time and it looks OK.

Could you review it, please?

Member

@machi1990 machi1990 left a comment

I believe all my comments have been addressed, so LGTM. I am by no means an expert on Kafka, so let's wait for @gunnarmorling and @loicmathieu to make the final call on this PR.

I just noticed a conflict, I'll let @lhauspie handle that.

Thanks @lhauspie

@lhauspie
Contributor Author

lhauspie commented Nov 18, 2019

I just noticed a conflict, I'll let @lhauspie handle that.

👍

The conflict is resolved.

@gunnarmorling
Contributor

One last question above, otherwise it LGTM. Great work, @lhauspie! A very useful addition to the Kafka Streams extension, looking forward to seeing it released.

Contributor

@gunnarmorling gunnarmorling left a comment

LGTM. I'll leave it to you @lhauspie, whether you'd reflect in the docs how to query liveness and readiness checks independently. E.g. just by adding the curl invocations.
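As an illustration of querying the two probes independently, here is a minimal Java sketch using the JDK's built-in HTTP client in place of curl. The `/health/live` and `/health/ready` paths are the ones defined by MicroProfile Health; the base URL (for example `http://localhost:8080`) and the `HealthProbes` class itself are assumptions made for this example.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper for querying one health probe at a time.
final class HealthProbes {

    /** Builds a GET request for a single probe; probe is "live" or "ready". */
    static HttpRequest probeRequest(String baseUrl, String probe) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/health/" + probe)).GET().build();
    }

    /** Sends the request and returns the HTTP status code, ignoring the JSON body. */
    static int probeStatus(HttpClient client, String baseUrl, String probe)
            throws IOException, InterruptedException {
        return client.send(probeRequest(baseUrl, probe), HttpResponse.BodyHandlers.discarding())
                .statusCode();
    }
}
```

With a running application, `HealthProbes.probeStatus(HttpClient.newHttpClient(), "http://localhost:8080", "ready")` should return 200 when all readiness checks are UP and 503 when at least one is DOWN.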

Member

@gsmet gsmet left a comment

Sorry for the late review.

I added a bunch of comments in the doc, a few suggestions and questions.

Could you have a look?

I think we are really not far from being in a mergeable state.

Thanks!

@gsmet
Member

gsmet commented Dec 2, 2019

@lhauspie any chance you could have a look at my review?

@lhauspie
Contributor Author

lhauspie commented Dec 2, 2019

@lhauspie any chance you could have a look at my review?

Yes of course, sorry for the delay, I was busy at Disneyland this weekend ;)

Let me have a look.

@gsmet gsmet changed the title from "add readiness health check" to "Add readiness health check fr" Dec 6, 2019
@gsmet gsmet changed the title from "Add readiness health check fr" to "Add readiness health check for Kafka Streams" Dec 6, 2019
@gsmet gsmet added this to the 1.1.0 milestone Dec 6, 2019
@gsmet
Member

gsmet commented Dec 6, 2019

I just force-pushed a rebase. Let's wait for CI to go green.

@gsmet gsmet added the triage/waiting-for-ci Ready to merge when CI successfully finishes label Dec 6, 2019
@gsmet gsmet merged commit dc9b096 into quarkusio:master Dec 7, 2019
@gsmet
Member

gsmet commented Dec 7, 2019

Merged, thanks!

Labels
triage/waiting-for-ci Ready to merge when CI successfully finishes
Development

Successfully merging this pull request may close these issues.

Support Health Check for Kafka Streams extension
5 participants