Skip to content

Commit

Permalink
Add support for Kafka's ConsumerRebalanceListener
Browse files Browse the repository at this point in the history
  • Loading branch information
pcasaes committed Jun 13, 2020
1 parent 1348aa5 commit fe091ae
Show file tree
Hide file tree
Showing 13 changed files with 460 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,8 @@ Type: _string_ | false |

Type: _int_ | false | `1`

| *consumer-rebalance-listener.name* | The name set in `javax.inject.Named` of a bean that implements `io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener`. If set the listener will be applied to the consumer.

Type: _string_ | false |

|===
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package inbound;

import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import javax.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

@ApplicationScoped
public class KafkaRebalancedConsumer {

@Incoming("rebalanced-example")
public CompletionStage<Void> consume(IncomingKafkaRecord<Integer, String> message) {
// we don't need to commit offsets so no need to ack the message
return CompletableFuture.completedFuture(null);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package inbound;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.mutiny.kafka.client.consumer.KafkaConsumer;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Named;
import java.util.Set;
import java.util.logging.Logger;
import java.util.stream.Collectors;

@ApplicationScoped
@Named("rebalanced-example.rebalancer")
public class KafkaRebalancedConsumerRebalanceListener implements KafkaConsumerRebalanceListener {

private static final Logger LOGGER = Logger.getLogger(KafkaRebalancedConsumerRebalanceListener.class.getName());

/**
* When receiving a list of partitions will search for the earliest offset within 10 minutes
* and seek the consumer to it. These operations are asynchronous so the inbound connector's
* consumer WILL be paused until they are compete.
*
* @param consumer underlying consumer
* @param topicPartitions set of assigned topic partitions
* @return An observable
*/
@Override
public Uni<Void> onPartitionsAssigned(KafkaConsumer<?, ?> consumer, Set<TopicPartition> topicPartitions) {
// we must pause the consumer otherwise the inbound connector will continue to receive messages
// while we seek to the correct offset.
consumer.pause();

long now = System.currentTimeMillis();
long shouldStartAt = now - 600_000L; //10 minute ago

return Uni
.combine()
.all()
.unis(topicPartitions
.stream()
.map(topicPartition -> {
LOGGER.info("Assigned " + topicPartition);
return consumer.offsetsForTimes(topicPartition, shouldStartAt)
.onItem()
.invoke(o -> LOGGER.info("Seeking to " + o))
.onItem()
.produceUni(o -> consumer
.seek(topicPartition, o == null ? 0L : o.getOffset())
.onItem()
.invoke(v -> LOGGER.info("Seeked to " + o))
);
})
.collect(Collectors.toList()))
.combinedWith(a -> null)
.onItemOrFailure()
.apply((a, t) -> {
// once the seek is complete let's resume the consumer
consumer.resume();
return null;
});
}

@Override
public Uni<Void> onPartitionsRevoked(KafkaConsumer<?, ?> consumer, Set<TopicPartition> topicPartitions) {
return Uni
.createFrom()
.nullItem();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
[#kafka-consumer-rebalance-listener]
=== Consumer Rebalance Listener

An implementation of the consumer rebalance listener can be provided to provide fine grain controls of the assigned
offset. Common uses are storing offsets in a separate store to enable deliver exactly-once semantics, and starting from
a specific time window.

==== Example

In this example we will set-up a consumer that always starts on messages from at most 10 minutes ago. First we need to provide
a bean managed implementation of `io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener` annotated with
`javax.inject.Named`. We then must configure our inbound connector to use this named bean.

[source, java]
----
include::example$inbound/KafkaRebalancedConsumerRebalanceListener.java[]
----

[source, java]
----
include::example$inbound/KafkaRebalancedConsumer.java[]
----

Configure the inbound connector to use the provided listener:

* `mp.messaging.incoming.rebalanced-example.consumer-rebalance-listener.name=rebalanced-example.rebalancer`

2 changes: 2 additions & 0 deletions documentation/src/main/doc/modules/kafka/pages/inbound.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,5 @@ It may also contain the `dead-letter-cause` with the message from the cause, if
include::connectors:partial$META-INF/connector/smallrye-kafka-incoming.adoc[]

You can also pass any property supported by the https://vertx.io/docs/vertx-kafka-client/java/[Vert.x Kafka client] as attribute.

include::consumer-rebalance-listener.adoc[]
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,17 @@
@ConnectorAttribute(name = "partition", type = "int", direction = Direction.OUTGOING, description = "The target partition id. -1 to let the client determine the partition", defaultValue = "-1")
@ConnectorAttribute(name = "waitForWriteCompletion", type = "boolean", direction = Direction.OUTGOING, description = "Whether the client waits for Kafka to acknowledge the written record before acknowledging the message", defaultValue = "true")
@ConnectorAttribute(name = "max-inflight-messages", type = "int", direction = Direction.OUTGOING, description = "The maximum number of messages to be written to Kafka concurrently - The default value is the value from the `max.in.flight.requests.per.connection` Kafka property. It configures the maximum number of unacknowledged requests the client before blocking. Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries.", defaultValue = "5")
@ConnectorAttribute(name = "consumer-rebalance-listener.name", type = "string", direction = Direction.INCOMING, description = "The name set in `javax.inject.Named` of a bean that implements `io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener`. If set the listener will be applied to the consumer.")
public class KafkaConnector implements IncomingConnectorFactory, OutgoingConnectorFactory {

public static final String CONNECTOR_NAME = "smallrye-kafka";

@Inject
ExecutionHolder executionHolder;

@Inject
Instance<KafkaConsumerRebalanceListener> consumerRebalanceListeners;

private final List<KafkaSource<?, ?>> sources = new CopyOnWriteArrayList<>();
private final List<KafkaSink> sinks = new CopyOnWriteArrayList<>();

Expand Down Expand Up @@ -106,7 +110,7 @@ public PublisherBuilder<? extends Message<?>> getPublisherBuilder(Config config)
}

if (partitions == 1) {
KafkaSource<Object, Object> source = new KafkaSource<>(vertx, ic);
KafkaSource<Object, Object> source = new KafkaSource<>(vertx, ic, consumerRebalanceListeners);
sources.add(source);

boolean broadcast = ic.getBroadcast();
Expand All @@ -120,7 +124,7 @@ public PublisherBuilder<? extends Message<?>> getPublisherBuilder(Config config)
// create an instance of source per partitions.
List<Publisher<IncomingKafkaRecord<Object, Object>>> streams = new ArrayList<>();
for (int i = 0; i < partitions; i++) {
KafkaSource<Object, Object> source = new KafkaSource<>(vertx, ic);
KafkaSource<Object, Object> source = new KafkaSource<>(vertx, ic, consumerRebalanceListeners);
sources.add(source);
streams.add(source.getStream());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package io.smallrye.reactive.messaging.kafka;

import java.util.Set;

import io.smallrye.mutiny.Uni;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.mutiny.kafka.client.consumer.KafkaConsumer;

/**
*
* When implemented by a managed bean annotated with {@link javax.inject.Named} and
* configured against a consumer, ex:
* mp.messaging.incoming.example.consumer-rebalance-listener.name=ExampleConsumerRebalanceListener
*
* Will be applied as a consumer rebalance listener to the consumer.
*
* For more details
*
* @see org.apache.kafka.clients.consumer.ConsumerRebalanceListener
*/
public interface KafkaConsumerRebalanceListener {

/**
* Called when the consumer is assigned topic partitions
*
* @param consumer underlying consumer
* @param topicPartitions set of assigned topic partitions
* @return An observable
*/
Uni<Void> onPartitionsAssigned(KafkaConsumer<?, ?> consumer, Set<TopicPartition> topicPartitions);

/**
* Called when the consumer is revoked topic partitions
*
* @param consumer underlying consumer
* @param topicPartitions set of revoked topic partitions
* @return An observable
*/
Uni<Void> onPartitionsRevoked(KafkaConsumer<?, ?> consumer, Set<TopicPartition> topicPartitions);
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,12 @@ public interface KafkaLogging extends BasicLogger {
@LogMessage(level = Logger.Level.DEBUG)
@Message(id = 18218, value = "An exception has been caught while closing the Kafka consumer")
void exceptionOnClose(@Cause Throwable t);

@LogMessage(level = Logger.Level.ERROR)
@Message(id = 18219, value = "Unable to execute consumer rebalance listener for group '%s'")
void unableToExecuteConsumerReblanceListener(String consumerGroup, @Cause Throwable t);

@LogMessage(level = Logger.Level.DEBUG)
@Message(id = 18220, value = "Executed consumer rebalance listener for group '%s'")
void executedConsumerRebalanceListener(String consumerGroup);
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,22 @@
import static io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging.log;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import javax.enterprise.inject.Instance;
import javax.enterprise.inject.literal.NamedLiteral;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.reactivestreams.Publisher;

import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener;
import io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue;
import io.smallrye.reactive.messaging.kafka.fault.KafkaFailStop;
import io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler;
Expand All @@ -26,7 +33,9 @@ public class KafkaSource<K, V> {
private final KafkaConsumer<K, V> consumer;
private final KafkaFailureHandler failureHandler;

public KafkaSource(Vertx vertx, KafkaConnectorIncomingConfiguration config) {
public KafkaSource(Vertx vertx,
KafkaConnectorIncomingConfiguration config,
Instance<KafkaConsumerRebalanceListener> consumerRebalanceListeners) {

Map<String, String> kafkaConfiguration = new HashMap<>();

Expand Down Expand Up @@ -59,14 +68,47 @@ public KafkaSource(Vertx vertx, KafkaConnectorIncomingConfiguration config) {
kafkaConfiguration.remove("broadcast");
kafkaConfiguration.remove("partitions");

this.consumer = KafkaConsumer.create(vertx, kafkaConfiguration);
final KafkaConsumer<K, V> kafkaConsumer = KafkaConsumer.create(vertx, kafkaConfiguration);
final List<Publisher<KafkaConsumerRecord<K, V>>> multis = new ArrayList<>();
config
.getConsumerRebalanceListenerName()
.map(name -> {
log.info("Loading KafkaConsumerRebalanceListener " + name);
return NamedLiteral.of(name);
})
.map(consumerRebalanceListeners::select)
.map(Instance::get)
.ifPresent(listener -> multis.add(Multi
.createFrom()
.publisher(subscriber -> {

kafkaConsumer.partitionsAssignedHandler(set -> listener.onPartitionsAssigned(kafkaConsumer, set)
.onFailure().invoke(t -> log.unableToExecuteConsumerReblanceListener(group, t))
.subscribe()
.with(a -> log.executedConsumerRebalanceListener(group)));

kafkaConsumer.partitionsRevokedHandler(set -> listener.onPartitionsRevoked(kafkaConsumer, set)
.onFailure().invoke(t -> log.unableToExecuteConsumerReblanceListener(group, t))
.subscribe()
.with(a -> log.executedConsumerRebalanceListener(group)));
})));
this.consumer = kafkaConsumer;

String topic = config.getTopic().orElseGet(config::getChannel);

failureHandler = createFailureHandler(config, vertx, kafkaConfiguration);

Multi<KafkaConsumerRecord<K, V>> multi = consumer.toMulti()
Multi<KafkaConsumerRecord<K, V>> consumerMulti = consumer.toMulti()
.onFailure().invoke(t -> log.unableToReadRecord(topic, t));

multis.add(consumerMulti);

Multi<KafkaConsumerRecord<K, V>> multi = multis.size() == 1 ? consumerMulti
: Multi
.createBy()
.merging()
.streams(multis);

boolean retry = config.getRetry();
if (retry) {
int max = config.getRetryAttempts();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package io.smallrye.reactive.messaging.kafka;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

@ApplicationScoped
public class ConsumptionBeanWithoutAck {

private final List<Integer> list = Collections.synchronizedList(new ArrayList<>());
private final List<KafkaRecord<String, Integer>> kafka = Collections.synchronizedList(new ArrayList<>());

@Incoming("data")
@Outgoing("sink")
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public Message<Integer> process(KafkaRecord<String, Integer> input) {
kafka.add(input);
return Message.of(input.getPayload() + 1);
}

@Incoming("sink")
public void sink(int val) {
list.add(val);
}

public List<Integer> getResults() {
return list;
}

public List<KafkaRecord<String, Integer>> getKafkaMessages() {
return kafka;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.smallrye.reactive.messaging.kafka;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Named;

import io.smallrye.mutiny.Uni;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.mutiny.kafka.client.consumer.KafkaConsumer;

@ApplicationScoped
@Named("ConsumptionConsumerRebalanceListener")
public class ConsumptionConsumerRebalanceListener implements KafkaConsumerRebalanceListener {

private final Map<Integer, TopicPartition> assigned = new ConcurrentHashMap<>();

@Override
public Uni<Void> onPartitionsAssigned(KafkaConsumer<?, ?> consumer, Set<TopicPartition> set) {
set.forEach(topicPartition -> this.assigned.put(topicPartition.getPartition(), topicPartition));
return Uni
.createFrom()
.nullItem();
}

@Override
public Uni<Void> onPartitionsRevoked(KafkaConsumer<?, ?> consumer, Set<TopicPartition> set) {
return Uni
.createFrom()
.nullItem();
}

public Map<Integer, TopicPartition> getAssigned() {
return assigned;
}
}
Loading

0 comments on commit fe091ae

Please sign in to comment.