Skip to content

Commit

Permalink
Add support for Kafka's ConsumerRebalanceListener
Browse files Browse the repository at this point in the history
  • Loading branch information
pcasaes committed Jun 14, 2020
1 parent 1348aa5 commit 927f7bc
Show file tree
Hide file tree
Showing 14 changed files with 538 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,8 @@ Type: _string_ | false |

Type: _int_ | false | `1`

| *consumer-rebalance-listener.name* | The name set in `javax.inject.Named` of a bean that implements `io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener`. If set the listener will be applied to the consumer.

Type: _string_ | false |

|===
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package inbound;

import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import javax.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

@ApplicationScoped
public class KafkaRebalancedConsumer {

@Incoming("rebalanced-example")
@Acknowledgment(Acknowledgment.Strategy.NONE)
public CompletionStage<Void> consume(IncomingKafkaRecord<Integer, String> message) {
// We don't need to ACK messages because in this example we set offset during consumer re-balance
return CompletableFuture.completedFuture(null);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package inbound;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.mutiny.kafka.client.consumer.KafkaConsumer;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Named;
import java.util.Set;
import java.util.logging.Logger;
import java.util.stream.Collectors;

@ApplicationScoped
@Named("rebalanced-example.rebalancer")
public class KafkaRebalancedConsumerRebalanceListener implements KafkaConsumerRebalanceListener {

private static final Logger LOGGER = Logger.getLogger(KafkaRebalancedConsumerRebalanceListener.class.getName());

/**
* When receiving a list of partitions will search for the earliest offset within 10 minutes
* and seek the consumer to it.
*
* @param consumer underlying consumer
* @param topicPartitions set of assigned topic partitions
* @return A {@link Uni} indicating operations complete or failure
*/
@Override
public Uni<Void> onPartitionsAssigned(KafkaConsumer<?, ?> consumer, Set<TopicPartition> topicPartitions) {
long now = System.currentTimeMillis();
long shouldStartAt = now - 600_000L; //10 minute ago

return Uni
.combine()
.all()
.unis(topicPartitions
.stream()
.map(topicPartition -> {
LOGGER.info("Assigned " + topicPartition);
return consumer.offsetsForTimes(topicPartition, shouldStartAt)
.onItem()
.invoke(o -> LOGGER.info("Seeking to " + o))
.onItem()
.produceUni(o -> consumer
.seek(topicPartition, o == null ? 0L : o.getOffset())
.onItem()
.invoke(v -> LOGGER.info("Seeked to " + o))
);
})
.collect(Collectors.toList()))
.combinedWith(a -> null);
}

@Override
public Uni<Void> onPartitionsRevoked(KafkaConsumer<?, ?> consumer, Set<TopicPartition> topicPartitions) {
return Uni
.createFrom()
.nullItem();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
[#kafka-consumer-rebalance-listener]
=== Consumer Rebalance Listener

An implementation of the consumer re-balance listener can be provided which affords us fine grain controls of the assigned
offset. Common uses are storing offsets in a separate store to enable deliver exactly-once semantics, and starting from
a specific time window.

==== Example

In this example we will set-up a consumer that always starts on messages from at most 10 minutes ago. First we need to provide
a bean managed implementation of `io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener` annotated with
`javax.inject.Named`. We then must configure our inbound connector to use this named bean.

[source, java]
----
include::example$inbound/KafkaRebalancedConsumerRebalanceListener.java[]
----

[source, java]
----
include::example$inbound/KafkaRebalancedConsumer.java[]
----

To configure the inbound connector to use the provided listener we either set the consumer rebalance listener's name:

* `mp.messaging.incoming.rebalanced-example.consumer-rebalance-listener.name=rebalanced-example.rebalancer`

Or have the listener's name be the same as the group id:

* `mp.messaging.incoming.rebalanced-example.group.id=rebalanced-example.rebalancer`

Setting the consumer re-balance listener's name takes precedence over using the group id.
2 changes: 2 additions & 0 deletions documentation/src/main/doc/modules/kafka/pages/inbound.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,5 @@ It may also contain the `dead-letter-cause` with the message from the cause, if
include::connectors:partial$META-INF/connector/smallrye-kafka-incoming.adoc[]

You can also pass any property supported by the https://vertx.io/docs/vertx-kafka-client/java/[Vert.x Kafka client] as attribute.

include::consumer-rebalance-listener.adoc[]
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,17 @@
@ConnectorAttribute(name = "partition", type = "int", direction = Direction.OUTGOING, description = "The target partition id. -1 to let the client determine the partition", defaultValue = "-1")
@ConnectorAttribute(name = "waitForWriteCompletion", type = "boolean", direction = Direction.OUTGOING, description = "Whether the client waits for Kafka to acknowledge the written record before acknowledging the message", defaultValue = "true")
@ConnectorAttribute(name = "max-inflight-messages", type = "int", direction = Direction.OUTGOING, description = "The maximum number of messages to be written to Kafka concurrently - The default value is the value from the `max.in.flight.requests.per.connection` Kafka property. It configures the maximum number of unacknowledged requests the client before blocking. Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries.", defaultValue = "5")
@ConnectorAttribute(name = "consumer-rebalance-listener.name", type = "string", direction = Direction.INCOMING, description = "The name set in `javax.inject.Named` of a bean that implements `io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener`. If set the listener will be applied to the consumer.")
public class KafkaConnector implements IncomingConnectorFactory, OutgoingConnectorFactory {

public static final String CONNECTOR_NAME = "smallrye-kafka";

@Inject
ExecutionHolder executionHolder;

@Inject
Instance<KafkaConsumerRebalanceListener> consumerRebalanceListeners;

private final List<KafkaSource<?, ?>> sources = new CopyOnWriteArrayList<>();
private final List<KafkaSink> sinks = new CopyOnWriteArrayList<>();

Expand Down Expand Up @@ -106,7 +110,7 @@ public PublisherBuilder<? extends Message<?>> getPublisherBuilder(Config config)
}

if (partitions == 1) {
KafkaSource<Object, Object> source = new KafkaSource<>(vertx, ic);
KafkaSource<Object, Object> source = new KafkaSource<>(vertx, ic, consumerRebalanceListeners);
sources.add(source);

boolean broadcast = ic.getBroadcast();
Expand All @@ -120,7 +124,7 @@ public PublisherBuilder<? extends Message<?>> getPublisherBuilder(Config config)
// create an instance of source per partitions.
List<Publisher<IncomingKafkaRecord<Object, Object>>> streams = new ArrayList<>();
for (int i = 0; i < partitions; i++) {
KafkaSource<Object, Object> source = new KafkaSource<>(vertx, ic);
KafkaSource<Object, Object> source = new KafkaSource<>(vertx, ic, consumerRebalanceListeners);
sources.add(source);
streams.add(source.getStream());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package io.smallrye.reactive.messaging.kafka;

import java.util.Set;

import io.smallrye.mutiny.Uni;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.mutiny.kafka.client.consumer.KafkaConsumer;

/**
*
* When implemented by a managed bean annotated with {@link javax.inject.Named} and
* configured against an inbound connector will be applied as a consumer re-balance listener
* to that inbound connector's consumer.
*
*
* To configure set the name in the inbound connector's consumer re-balance listener, ex:
* mp.messaging.incoming.example.consumer-rebalance-listener.name=ExampleConsumerRebalanceListener
* @Named("ExampleConsumerRebalanceListener")
*
* Or set the name to be the same as the group id, ex:
* mp.messaging.incoming.example.group.id=my-group
* @Named("my-group")
*
* Setting the consumer re-balance listener name takes precedence over using the group id.
*
* For more details:
*
* @see org.apache.kafka.clients.consumer.ConsumerRebalanceListener
*/
public interface KafkaConsumerRebalanceListener {

/**
* Called when the consumer is assigned topic partitions
* This method might be called for each consumer available to the connector
*
* The consumer will be paused until the returned {@link Uni}
* indicates success. On failure will resume after a consumer re-balance is
* initiated by Kafka.
*
* @see KafkaConsumer#pause()
* @see KafkaConsumer#resume()
*
* @param consumer underlying consumer
* @param topicPartitions set of assigned topic partitions
* @return A {@link Uni} indicating operations complete or failure
*/
Uni<Void> onPartitionsAssigned(KafkaConsumer<?, ?> consumer, Set<TopicPartition> topicPartitions);

/**
* Called when the consumer is revoked topic partitions
* This method might be called for each consumer available to the connector
*
* @param consumer underlying consumer
* @param topicPartitions set of revoked topic partitions
* @return A {@link Uni} indicating operations complete or failure
*/
Uni<Void> onPartitionsRevoked(KafkaConsumer<?, ?> consumer, Set<TopicPartition> topicPartitions);
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,29 @@ public interface KafkaLogging extends BasicLogger {
@LogMessage(level = Logger.Level.DEBUG)
@Message(id = 18218, value = "An exception has been caught while closing the Kafka consumer")
void exceptionOnClose(@Cause Throwable t);

@LogMessage(level = Logger.Level.INFO)
@Message(id = 18219, value = "Loading KafkaConsumerRebalanceListener from configured name '%s'")
void loadingConsumerRebalanceListenerFromConfiguredName(String configuredName);

@LogMessage(level = Logger.Level.INFO)
@Message(id = 18220, value = "Loading KafkaConsumerRebalanceListener from group id '%s'")
void loadingConsumerRebalanceListenerFromGroupId(String consumerGroup);

@LogMessage(level = Logger.Level.ERROR)
@Message(id = 18221, value = "Unable to execute consumer assigned re-balance listener for group '%s'. The consumer has been paused and won't be resumed until a next rebalance attempt.")
void unableToExecuteConsumerAssignedRebalanceListener(String consumerGroup, @Cause Throwable t);

@LogMessage(level = Logger.Level.ERROR)
@Message(id = 18222, value = "Unable to execute consumer revoked re-balance listener for group '%s'")
void unableToExecuteConsumerRevokedRebalanceListener(String consumerGroup, @Cause Throwable t);

@LogMessage(level = Logger.Level.DEBUG)
@Message(id = 18223, value = "Executed consumer re-balance listener for group '%s'")
void executedConsumerRebalanceListener(String consumerGroup);

@LogMessage(level = Logger.Level.WARN)
@Message(id = 18224, value = "Re-enabling consumer for group '%s'. This consumer was paused because of a re-balance failure.")
void reEnablingConsumerforGroup(String consumerGroup);

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,19 @@
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

import javax.enterprise.inject.Instance;
import javax.enterprise.inject.literal.NamedLiteral;

import org.apache.kafka.clients.consumer.ConsumerConfig;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener;
import io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue;
import io.smallrye.reactive.messaging.kafka.fault.KafkaFailStop;
import io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler;
Expand All @@ -26,7 +32,9 @@ public class KafkaSource<K, V> {
private final KafkaConsumer<K, V> consumer;
private final KafkaFailureHandler failureHandler;

public KafkaSource(Vertx vertx, KafkaConnectorIncomingConfiguration config) {
public KafkaSource(Vertx vertx,
KafkaConnectorIncomingConfiguration config,
Instance<KafkaConsumerRebalanceListener> consumerRebalanceListeners) {

Map<String, String> kafkaConfiguration = new HashMap<>();

Expand Down Expand Up @@ -59,7 +67,58 @@ public KafkaSource(Vertx vertx, KafkaConnectorIncomingConfiguration config) {
kafkaConfiguration.remove("broadcast");
kafkaConfiguration.remove("partitions");

this.consumer = KafkaConsumer.create(vertx, kafkaConfiguration);
// if the rebalance assign fails we must resume the consumer in order to force a consumer group re-balance
// we must wait until after the poll interval time + session timeout otherwise the consumer won't re-balance
final long consumerReEnableWaitTime = 1_000L + Long.parseLong(
kafkaConfiguration.getOrDefault(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000"))
+ Long.parseLong(
kafkaConfiguration.getOrDefault(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"));

final KafkaConsumer<K, V> kafkaConsumer = KafkaConsumer.create(vertx, kafkaConfiguration);
config
.getConsumerRebalanceListenerName()
.map(name -> {
log.loadingConsumerRebalanceListenerFromConfiguredName(name);
return NamedLiteral.of(name);
})
.map(consumerRebalanceListeners::select)
.map(Instance::get)
.map(Optional::of)
.orElseGet(() -> {
Instance<KafkaConsumerRebalanceListener> rebalanceFromGroupListeners = consumerRebalanceListeners
.select(NamedLiteral.of(group));

if (!rebalanceFromGroupListeners.isUnsatisfied()) {
log.loadingConsumerRebalanceListenerFromGroupId(group);
return Optional.of(rebalanceFromGroupListeners.get());
}
return Optional.empty();
})
.ifPresent(listener -> {
kafkaConsumer.partitionsAssignedHandler(set -> {
kafkaConsumer.pause();
listener.onPartitionsAssigned(kafkaConsumer, set)
.onFailure().invoke(t -> log.unableToExecuteConsumerAssignedRebalanceListener(group, t))
.onFailure().recoverWithUni(() -> Uni
.createFrom()
.item((Void) null)
.onItem().delayIt().by(Duration.ofMillis(consumerReEnableWaitTime))
.onItem().invoke(a -> {
log.reEnablingConsumerforGroup(group);
kafkaConsumer.resume();
}))
.onItem().invoke(a -> kafkaConsumer.resume())
.subscribe()
.with(a -> log.executedConsumerRebalanceListener(group));
});

kafkaConsumer.partitionsRevokedHandler(set -> listener.onPartitionsRevoked(kafkaConsumer, set)
.onFailure().invoke(t -> log.unableToExecuteConsumerRevokedRebalanceListener(group, t))
.subscribe()
.with(a -> log.executedConsumerRebalanceListener(group)));
});
this.consumer = kafkaConsumer;

String topic = config.getTopic().orElseGet(config::getChannel);

failureHandler = createFailureHandler(config, vertx, kafkaConfiguration);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.smallrye.reactive.messaging.kafka;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;

@ApplicationScoped
public class ConsumptionBeanWithoutAck {

private final List<Integer> list = Collections.synchronizedList(new ArrayList<>());

@Incoming("data")
@Acknowledgment(Acknowledgment.Strategy.NONE)
public CompletionStage<Void> sink(KafkaRecord<String, Integer> input) {
list.add(input.getPayload() + 1);
return CompletableFuture.completedFuture(null);
}

public List<Integer> getResults() {
return list;
}
}
Loading

0 comments on commit 927f7bc

Please sign in to comment.