Add readiness health check for Kafka Streams
lhauspie authored and mmusgrov committed Dec 12, 2019
1 parent 92f8e0d commit 0df4733
Showing 10 changed files with 328 additions and 19 deletions.
101 changes: 101 additions & 0 deletions docs/src/main/asciidoc/kafka-streams.adoc
@@ -1112,6 +1112,107 @@ CMD ["./application", "-Dquarkus.http.host=0.0.0.0", "-Xmx32m"]
Now start Docker Compose as described above
(don't forget to rebuild the container images).

== Kafka Streams Health Checks

If you are using the `quarkus-smallrye-health` extension, `quarkus-kafka-streams` will automatically add:

* a readiness health check to validate that all topics declared in the `quarkus.kafka-streams.topics` property are created,
* a liveness health check based on the Kafka Streams state.

When you access the `/health` endpoint of your application, you will get information about the state of the Kafka Streams instance and about the available and/or missing topics.

This is an example of when the status is `DOWN`:
[source, subs=attributes+]
----
curl -i http://aggregator:8080/health
HTTP/1.1 503 Service Unavailable
content-type: application/json; charset=UTF-8
content-length: 454
{
    "status": "DOWN",
    "checks": [
        {
            "name": "Kafka Streams state health check", <1>
            "status": "DOWN",
            "data": {
                "state": "CREATED"
            }
        },
        {
            "name": "Kafka Streams topics health check", <2>
            "status": "DOWN",
            "data": {
                "available_topics": "weather-stations,temperature-values",
                "missing_topics": "hygrometry-values"
            }
        }
    ]
}
----
<1> Liveness health check. Also available at `/health/live` endpoint.
<2> Readiness health check. Also available at `/health/ready` endpoint.

As shown above, the overall status is `DOWN` as soon as one of the topics listed in `quarkus.kafka-streams.topics` is missing or the Kafka Streams `state` is not `RUNNING`.

If no topics are available, the `available_topics` key will not be present in the `data` field of the `Kafka Streams topics health check`.
Likewise, if no topics are missing, the `missing_topics` key will not be present.

You can disable the health checks of the `quarkus-kafka-streams` extension by setting the `quarkus.kafka-streams.health.enabled` property to `false` in your `application.properties`.
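
For example, in `application.properties`:

[source,properties]
----
quarkus.kafka-streams.health.enabled=false
----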

You can base Kubernetes liveness and readiness probes on the respective endpoints `/health/live` and `/health/ready`.
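
A minimal probe configuration wired to these endpoints could look like the following sketch (the port and delay values are assumptions, adjust them to your deployment):

[source,yaml]
----
# Hypothetical Kubernetes container spec fragment: point the probes
# at the SmallRye Health endpoints exposed by the application.
livenessProbe:
  httpGet:
    path: /health/live
    port: 8080
readinessProbe:
  httpGet:
    path: /health/ready
    port: 8080
  initialDelaySeconds: 10
----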

=== Liveness health check

Here is an example of the liveness check:
[source, subs=attributes+]
----
curl -i http://aggregator:8080/health/live

HTTP/1.1 503 Service Unavailable
content-type: application/json; charset=UTF-8
content-length: 225

{
    "status": "DOWN",
    "checks": [
        {
            "name": "Kafka Streams state health check",
            "status": "DOWN",
            "data": {
                "state": "CREATED"
            }
        }
    ]
}
----
The `state` value comes from the `KafkaStreams.State` enum.
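
Roughly, the liveness rule can be pictured with this standalone sketch (not the real Kafka class; the enum values below match Kafka Streams at the time of this commit, so check them against your Kafka version):

```java
// Standalone sketch of org.apache.kafka.streams.KafkaStreams.State:
// the liveness check reports UP only when isRunning() is true,
// which holds in the RUNNING and REBALANCING states.
public class StreamsStateSketch {
    enum State {
        CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, ERROR;

        boolean isRunning() {
            return this == RUNNING || this == REBALANCING;
        }
    }

    public static void main(String[] args) {
        for (State s : State.values()) {
            System.out.println(s.name() + " -> " + (s.isRunning() ? "UP" : "DOWN"));
        }
    }
}
```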

=== Readiness health check

Here is an example of the readiness check:
[source, subs=attributes+]
----
curl -i http://aggregator:8080/health/ready

HTTP/1.1 503 Service Unavailable
content-type: application/json; charset=UTF-8
content-length: 265

{
    "status": "DOWN",
    "checks": [
        {
            "name": "Kafka Streams topics health check",
            "status": "DOWN",
            "data": {
                "missing_topics": "weather-stations,temperature-values"
            }
        }
    ]
}
----

== Going Further

This guide has shown how you can build stream processing applications using Quarkus and the Kafka Streams APIs,
4 changes: 4 additions & 0 deletions extensions/kafka-streams/deployment/pom.xml
@@ -30,6 +30,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-health-spi</artifactId>
</dependency>

<!-- test dependencies -->
<dependency>
@@ -0,0 +1,15 @@
package io.quarkus.kafka.streams.deployment;

import io.quarkus.runtime.annotations.ConfigItem;
import io.quarkus.runtime.annotations.ConfigPhase;
import io.quarkus.runtime.annotations.ConfigRoot;

@ConfigRoot(name = "kafka-streams", phase = ConfigPhase.BUILD_TIME)
public class KafkaStreamsBuildTimeConfig {

    /**
     * Whether a health check is published when the smallrye-health extension is present (defaults to true).
     */
    @ConfigItem(name = "health.enabled", defaultValue = "true")
    public boolean healthEnabled;
}
@@ -33,6 +33,7 @@
import io.quarkus.kafka.streams.runtime.KafkaStreamsRuntimeConfig;
import io.quarkus.kafka.streams.runtime.KafkaStreamsTopologyManager;
import io.quarkus.runtime.LaunchMode;
import io.quarkus.smallrye.health.deployment.spi.HealthBuildItem;

class KafkaStreamsProcessor {

@@ -196,4 +197,16 @@ void configureAndLoadRocksDb(KafkaStreamsRecorder recorder, KafkaStreamsRuntimeC
AdditionalBeanBuildItem registerBean() {
return AdditionalBeanBuildItem.unremovableOf(KafkaStreamsTopologyManager.class);
}

@BuildStep
void addHealthChecks(KafkaStreamsBuildTimeConfig buildTimeConfig, BuildProducer<HealthBuildItem> healthChecks) {
healthChecks.produce(
new HealthBuildItem(
"io.quarkus.kafka.streams.runtime.health.KafkaStreamsTopicsHealthCheck",
buildTimeConfig.healthEnabled, "kafka-streams"));
healthChecks.produce(
new HealthBuildItem(
"io.quarkus.kafka.streams.runtime.health.KafkaStreamsStateHealthCheck",
buildTimeConfig.healthEnabled, "kafka-streams"));
}
}
5 changes: 5 additions & 0 deletions extensions/kafka-streams/runtime/pom.xml
@@ -34,6 +34,11 @@
<groupId>org.graalvm.nativeimage</groupId>
<artifactId>svm</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-health</artifactId>
<optional>true</optional>
</dependency>
</dependencies>

<build>
@@ -2,6 +2,7 @@

import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -50,6 +51,7 @@ public class KafkaStreamsTopologyManager {
private KafkaStreamsRuntimeConfig runtimeConfig;
private Instance<Topology> topology;
private Properties properties;
private Map<String, Object> adminClientConfig;

KafkaStreamsTopologyManager() {
executor = null;
@@ -105,10 +107,11 @@ void onStart(@Observes StartupEvent ev) {
Properties streamsProperties = getStreamsProperties(properties, bootstrapServersConfig, runtimeConfig);

streams = new KafkaStreams(topology.get(), streamsProperties);
adminClientConfig = getAdminClientConfig(bootstrapServersConfig);

executor.execute(() -> {
try {
-                waitForTopicsToBeCreated(runtimeConfig.getTrimmedTopics(), bootstrapServersConfig);
+                waitForTopicsToBeCreated(runtimeConfig.getTrimmedTopics());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
@@ -131,21 +134,9 @@ public KafkaStreams getStreams() {
return streams;
}

-    private void waitForTopicsToBeCreated(Collection<String> topicsToAwait, String bootstrapServersConfig)
+    private void waitForTopicsToBeCreated(Collection<String> topicsToAwait)
             throws InterruptedException {
-        final Map<String, Object> config = new HashMap<>();
-        config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConfig);
-        // include other AdminClientConfig(s) that have been configured
-        for (final String knownAdminClientConfig : AdminClientConfig.configNames()) {
-            // give preference to admin.<propname> first
-            if (properties.containsKey(StreamsConfig.ADMIN_CLIENT_PREFIX + knownAdminClientConfig)) {
-                config.put(knownAdminClientConfig, properties.get(StreamsConfig.ADMIN_CLIENT_PREFIX + knownAdminClientConfig));
-            } else if (properties.containsKey(knownAdminClientConfig)) {
-                config.put(knownAdminClientConfig, properties.get(knownAdminClientConfig));
-            }
-        }
-
-        try (AdminClient adminClient = AdminClient.create(config)) {
+        try (AdminClient adminClient = AdminClient.create(adminClientConfig)) {
while (true) {
try {
ListTopicsResult topics = adminClient.listTopics();
@@ -155,7 +146,7 @@ private void waitForTopicsToBeCreated(Collection<String> topicsToAwait, String b
LOGGER.debug("All expected topics created");
return;
} else {
-                        HashSet<String> missing = new HashSet<>(topicsToAwait);
+                        Set<String> missing = new HashSet<>(topicsToAwait);
missing.removeAll(topicNames);
LOGGER.debug("Waiting for topic(s) to be created: " + missing);
}
@@ -168,6 +159,42 @@ private void waitForTopicsToBeCreated(Collection<String> topicsToAwait, String b
}
}

public Set<String> getMissingTopics(Collection<String> topicsToCheck)
throws InterruptedException {
HashSet<String> missing = new HashSet<>(topicsToCheck);

try (AdminClient adminClient = AdminClient.create(adminClientConfig)) {
ListTopicsResult topics = adminClient.listTopics();
Set<String> topicNames = topics.names().get(10, TimeUnit.SECONDS);

if (topicNames.containsAll(topicsToCheck)) {
return Collections.emptySet();
} else {
missing.removeAll(topicNames);
}
} catch (ExecutionException | TimeoutException e) {
LOGGER.error("Failed to get topic names from broker", e);
}

return missing;
}

private Map<String, Object> getAdminClientConfig(String bootstrapServersConfig) {
Map<String, Object> adminClientConfig = new HashMap<>();
adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConfig);
// include other AdminClientConfig(s) that have been configured
for (final String knownAdminClientConfig : AdminClientConfig.configNames()) {
// give preference to admin.<propname> first
if (properties.containsKey(StreamsConfig.ADMIN_CLIENT_PREFIX + knownAdminClientConfig)) {
adminClientConfig.put(knownAdminClientConfig,
properties.get(StreamsConfig.ADMIN_CLIENT_PREFIX + knownAdminClientConfig));
} else if (properties.containsKey(knownAdminClientConfig)) {
adminClientConfig.put(knownAdminClientConfig, properties.get(knownAdminClientConfig));
}
}
return adminClientConfig;
}
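
The precedence rule above ("admin."-prefixed properties win over bare property names) can be sketched standalone like this; `adminConfig` and its arguments are hypothetical stand-ins for the Quarkus-managed `properties` and `AdminClientConfig.configNames()`:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

// Standalone sketch of the precedence rule in getAdminClientConfig():
// for each known admin client property name, an "admin."-prefixed entry
// wins over the bare entry; keys outside knownNames are ignored.
public class AdminPrefixSketch {
    static Map<String, Object> adminConfig(Properties props, Set<String> knownNames) {
        Map<String, Object> config = new HashMap<>();
        for (String name : knownNames) {
            if (props.containsKey("admin." + name)) {
                config.put(name, props.get("admin." + name)); // prefixed wins
            } else if (props.containsKey(name)) {
                config.put(name, props.get(name)); // fall back to the bare name
            }
        }
        return config;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("request.timeout.ms", "1000");
        props.put("admin.request.timeout.ms", "5000");
        props.put("retries", "3");
        System.out.println(adminConfig(props, Set.of("request.timeout.ms", "retries")));
    }
}
```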

public void setRuntimeConfig(KafkaStreamsRuntimeConfig runtimeConfig) {
this.runtimeConfig = runtimeConfig;
}
@@ -0,0 +1,33 @@
package io.quarkus.kafka.streams.runtime.health;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

import org.apache.kafka.streams.KafkaStreams;
import org.eclipse.microprofile.health.HealthCheck;
import org.eclipse.microprofile.health.HealthCheckResponse;
import org.eclipse.microprofile.health.HealthCheckResponseBuilder;
import org.eclipse.microprofile.health.Liveness;

import io.quarkus.kafka.streams.runtime.KafkaStreamsTopologyManager;

@Liveness
@ApplicationScoped
public class KafkaStreamsStateHealthCheck implements HealthCheck {

    @Inject
    private KafkaStreamsTopologyManager manager;

    @Override
    public HealthCheckResponse call() {
        HealthCheckResponseBuilder responseBuilder = HealthCheckResponse.named("Kafka Streams state health check");
        try {
            KafkaStreams.State state = manager.getStreams().state();
            responseBuilder.state(state.isRunning())
                    .withData("state", state.name());
        } catch (Exception e) {
            responseBuilder.down().withData("technical_error", e.getMessage());
        }
        return responseBuilder.build();
    }
}
@@ -0,0 +1,67 @@
package io.quarkus.kafka.streams.runtime.health;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import javax.annotation.PostConstruct;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.health.HealthCheck;
import org.eclipse.microprofile.health.HealthCheckResponse;
import org.eclipse.microprofile.health.HealthCheckResponseBuilder;
import org.eclipse.microprofile.health.Readiness;
import org.jboss.logging.Logger;

import io.quarkus.kafka.streams.runtime.KafkaStreamsTopologyManager;

@Readiness
@ApplicationScoped
public class KafkaStreamsTopicsHealthCheck implements HealthCheck {

    private static final Logger LOGGER = Logger.getLogger(KafkaStreamsTopicsHealthCheck.class.getName());

    @ConfigProperty(name = "quarkus.kafka-streams.topics")
    public List<String> topics;

    @Inject
    private KafkaStreamsTopologyManager manager;

    private List<String> trimmedTopics;

    @PostConstruct
    public void init() {
        trimmedTopics = topics.stream().map(String::trim).collect(Collectors.toList());
    }

    @Override
    public HealthCheckResponse call() {
        HealthCheckResponseBuilder builder = HealthCheckResponse.named("Kafka Streams topics health check").up();

        try {
            Set<String> missingTopics = manager.getMissingTopics(trimmedTopics);
            List<String> availableTopics = new ArrayList<>(trimmedTopics);
            availableTopics.removeAll(missingTopics);

            if (!availableTopics.isEmpty()) {
                builder.withData("available_topics", String.join(",", availableTopics));
            }
            if (!missingTopics.isEmpty()) {
                builder.down().withData("missing_topics", String.join(",", missingTopics));
            }
        } catch (InterruptedException e) {
            LOGGER.error("error when retrieving missing topics", e);
            builder.down().withData("technical_error", e.getMessage());
        }

        return builder.build();
    }
}
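
The available/missing split this check performs is plain set arithmetic; here is a standalone sketch with no Kafka or MicroProfile dependency (`checkData` is a hypothetical helper that mirrors the `data` section of the readiness response):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Standalone sketch of the readiness check's data section:
// declared topics are split into "available" (reported by the broker)
// and "missing" (everything else); empty keys are omitted, and the
// check is DOWN exactly when at least one declared topic is missing.
public class TopicsCheckSketch {
    static Map<String, String> checkData(List<String> declared, Set<String> brokerTopics) {
        List<String> available = new ArrayList<>();
        List<String> missing = new ArrayList<>();
        for (String topic : declared) {
            (brokerTopics.contains(topic) ? available : missing).add(topic);
        }
        Map<String, String> data = new LinkedHashMap<>();
        data.put("status", missing.isEmpty() ? "UP" : "DOWN");
        if (!available.isEmpty()) {
            data.put("available_topics", String.join(",", available));
        }
        if (!missing.isEmpty()) {
            data.put("missing_topics", String.join(",", missing));
        }
        return data;
    }

    public static void main(String[] args) {
        System.out.println(checkData(
                List.of("weather-stations", "temperature-values", "hygrometry-values"),
                Set.of("weather-stations", "temperature-values")));
    }
}
```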
4 changes: 4 additions & 0 deletions integration-tests/kafka/pom.xml
@@ -75,6 +75,10 @@
<artifactId>kafka_2.12</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-health</artifactId>
</dependency>
</dependencies>

<build>
