-
Notifications
You must be signed in to change notification settings - Fork 182
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add support for Kafka's ConsumerRebalanceListener
- Loading branch information
Showing
14 changed files
with
539 additions
and
14 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
21 changes: 21 additions & 0 deletions
21
documentation/src/main/doc/modules/kafka/examples/inbound/KafkaRebalancedConsumer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
package inbound; | ||
|
||
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord; | ||
import org.eclipse.microprofile.reactive.messaging.Acknowledgment; | ||
import org.eclipse.microprofile.reactive.messaging.Incoming; | ||
|
||
import javax.enterprise.context.ApplicationScoped; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.CompletionStage; | ||
|
||
@ApplicationScoped | ||
public class KafkaRebalancedConsumer { | ||
|
||
@Incoming("rebalanced-example") | ||
@Acknowledgment(Acknowledgment.Strategy.NONE) | ||
public CompletionStage<Void> consume(IncomingKafkaRecord<Integer, String> message) { | ||
// We don't need to ACK messages because in this example we set offset during consumer re-balance | ||
return CompletableFuture.completedFuture(null); | ||
} | ||
|
||
} |
60 changes: 60 additions & 0 deletions
60
...src/main/doc/modules/kafka/examples/inbound/KafkaRebalancedConsumerRebalanceListener.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
package inbound; | ||
|
||
import io.smallrye.mutiny.Uni; | ||
import io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener; | ||
import io.vertx.kafka.client.common.TopicPartition; | ||
import io.vertx.mutiny.kafka.client.consumer.KafkaConsumer; | ||
|
||
import javax.enterprise.context.ApplicationScoped; | ||
import javax.inject.Named; | ||
import java.util.Set; | ||
import java.util.logging.Logger; | ||
import java.util.stream.Collectors; | ||
|
||
@ApplicationScoped | ||
@Named("rebalanced-example.rebalancer") | ||
public class KafkaRebalancedConsumerRebalanceListener implements KafkaConsumerRebalanceListener { | ||
|
||
private static final Logger LOGGER = Logger.getLogger(KafkaRebalancedConsumerRebalanceListener.class.getName()); | ||
|
||
/** | ||
* When receiving a list of partitions will search for the earliest offset within 10 minutes | ||
* and seek the consumer to it. | ||
* | ||
* @param consumer underlying consumer | ||
* @param topicPartitions set of assigned topic partitions | ||
* @return A {@link Uni} indicating operations complete or failure | ||
*/ | ||
@Override | ||
public Uni<Void> onPartitionsAssigned(KafkaConsumer<?, ?> consumer, Set<TopicPartition> topicPartitions) { | ||
long now = System.currentTimeMillis(); | ||
long shouldStartAt = now - 600_000L; //10 minute ago | ||
|
||
return Uni | ||
.combine() | ||
.all() | ||
.unis(topicPartitions | ||
.stream() | ||
.map(topicPartition -> { | ||
LOGGER.info("Assigned " + topicPartition); | ||
return consumer.offsetsForTimes(topicPartition, shouldStartAt) | ||
.onItem() | ||
.invoke(o -> LOGGER.info("Seeking to " + o)) | ||
.onItem() | ||
.produceUni(o -> consumer | ||
.seek(topicPartition, o == null ? 0L : o.getOffset()) | ||
.onItem() | ||
.invoke(v -> LOGGER.info("Seeked to " + o)) | ||
); | ||
}) | ||
.collect(Collectors.toList())) | ||
.combinedWith(a -> null); | ||
} | ||
|
||
@Override | ||
public Uni<Void> onPartitionsRevoked(KafkaConsumer<?, ?> consumer, Set<TopicPartition> topicPartitions) { | ||
return Uni | ||
.createFrom() | ||
.nullItem(); | ||
} | ||
} |
32 changes: 32 additions & 0 deletions
32
documentation/src/main/doc/modules/kafka/pages/consumer-rebalance-listener.adoc
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
[#kafka-consumer-rebalance-listener] | ||
=== Consumer Rebalance Listener | ||
|
||
An implementation of the consumer re-balance listener can be provided which affords us fine grain controls of the assigned | ||
offset. Common uses are storing offsets in a separate store to enable deliver exactly-once semantics, and starting from | ||
a specific time window. | ||
|
||
==== Example | ||
|
||
In this example we will set-up a consumer that always starts on messages from at most 10 minutes ago. First we need to provide | ||
a bean managed implementation of `io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener` annotated with | ||
`javax.inject.Named`. We then must configure our inbound connector to use this named bean. | ||
|
||
[source, java] | ||
---- | ||
include::example$inbound/KafkaRebalancedConsumerRebalanceListener.java[] | ||
---- | ||
|
||
[source, java] | ||
---- | ||
include::example$inbound/KafkaRebalancedConsumer.java[] | ||
---- | ||
|
||
To configure the inbound connector to use the provided listener we either set the consumer rebalance listener's name: | ||
|
||
* `mp.messaging.incoming.rebalanced-example.consumer-rebalance-listener.name=rebalanced-example.rebalancer` | ||
|
||
Or have the listener's name be the same as the group id: | ||
|
||
* `mp.messaging.incoming.rebalanced-example.group.id=rebalanced-example.rebalancer` | ||
|
||
Setting the consumer re-balance listener's name takes precedence over using the group id. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
58 changes: 58 additions & 0 deletions
58
...ka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaConsumerRebalanceListener.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
package io.smallrye.reactive.messaging.kafka; | ||
|
||
import java.util.Set; | ||
|
||
import io.smallrye.mutiny.Uni; | ||
import io.vertx.kafka.client.common.TopicPartition; | ||
import io.vertx.mutiny.kafka.client.consumer.KafkaConsumer; | ||
|
||
/** | ||
* | ||
* When implemented by a managed bean annotated with {@link javax.inject.Named} and | ||
* configured against an inbound connector will be applied as a consumer re-balance listener | ||
* to that inbound connector's consumer. | ||
* | ||
* | ||
* To configure set the name in the inbound connector's consumer re-balance listener, ex: | ||
* mp.messaging.incoming.example.consumer-rebalance-listener.name=ExampleConsumerRebalanceListener | ||
* @Named("ExampleConsumerRebalanceListener") | ||
* | ||
* Or set the name to be the same as the group id, ex: | ||
* mp.messaging.incoming.example.group.id=my-group | ||
* @Named("my-group") | ||
* | ||
* Setting the consumer re-balance listener name takes precedence over using the group id. | ||
* | ||
* For more details: | ||
* | ||
* @see org.apache.kafka.clients.consumer.ConsumerRebalanceListener | ||
*/ | ||
public interface KafkaConsumerRebalanceListener { | ||
|
||
/** | ||
* Called when the consumer is assigned topic partitions | ||
* This method might be called for each consumer available to the connector | ||
* | ||
* The consumer will be paused until the returned {@link Uni} | ||
* indicates success. On failure will resume after a consumer re-balance is | ||
* initiated by Kafka. | ||
* | ||
* @see KafkaConsumer#pause() | ||
* @see KafkaConsumer#resume() | ||
* | ||
* @param consumer underlying consumer | ||
* @param topicPartitions set of assigned topic partitions | ||
* @return A {@link Uni} indicating operations complete or failure | ||
*/ | ||
Uni<Void> onPartitionsAssigned(KafkaConsumer<?, ?> consumer, Set<TopicPartition> topicPartitions); | ||
|
||
/** | ||
* Called when the consumer is revoked topic partitions | ||
* This method might be called for each consumer available to the connector | ||
* | ||
* @param consumer underlying consumer | ||
* @param topicPartitions set of revoked topic partitions | ||
* @return A {@link Uni} indicating operations complete or failure | ||
*/ | ||
Uni<Void> onPartitionsRevoked(KafkaConsumer<?, ?> consumer, Set<TopicPartition> topicPartitions); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
29 changes: 29 additions & 0 deletions
29
...g-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ConsumptionBeanWithoutAck.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
package io.smallrye.reactive.messaging.kafka; | ||
|
||
import java.util.ArrayList; | ||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.CompletionStage; | ||
|
||
import javax.enterprise.context.ApplicationScoped; | ||
|
||
import org.eclipse.microprofile.reactive.messaging.Acknowledgment; | ||
import org.eclipse.microprofile.reactive.messaging.Incoming; | ||
|
||
@ApplicationScoped | ||
public class ConsumptionBeanWithoutAck { | ||
|
||
private final List<Integer> list = Collections.synchronizedList(new ArrayList<>()); | ||
|
||
@Incoming("data") | ||
@Acknowledgment(Acknowledgment.Strategy.NONE) | ||
public CompletionStage<Void> sink(KafkaRecord<String, Integer> input) { | ||
list.add(input.getPayload() + 1); | ||
return CompletableFuture.completedFuture(null); | ||
} | ||
|
||
public List<Integer> getResults() { | ||
return list; | ||
} | ||
} |
Oops, something went wrong.