Skip to content

Commit

Permalink
Add support for Kafka's ConsumerRebalanceListener
Browse files Browse the repository at this point in the history
  • Loading branch information
pcasaes committed Jun 15, 2020
1 parent 669d79f commit 817f131
Show file tree
Hide file tree
Showing 15 changed files with 678 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,8 @@ Type: _string_ | false |

Type: _int_ | false | `1`

| *consumer-rebalance-listener.name* | The name set in `javax.inject.Named` of a bean that implements `io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener`. If set the listener will be applied to the consumer.

Type: _string_ | false |

|===
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package inbound;

import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import javax.enterprise.context.ApplicationScoped;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

/**
 * Example consumer bound to the {@code rebalanced-example} channel.
 *
 * Acknowledgment is switched off for this channel because, in this example, the offset is set
 * during consumer re-balance rather than by acknowledging individual messages.
 */
@ApplicationScoped
public class KafkaRebalancedConsumer {

    /**
     * Receives a record from the channel and immediately reports completion.
     *
     * @param message the incoming Kafka record
     * @return an already-completed stage
     */
    @Incoming("rebalanced-example")
    @Acknowledgment(Acknowledgment.Strategy.NONE)
    public CompletionStage<Void> consume(IncomingKafkaRecord<Integer, String> message) {
        // No ACK is required here: in this example the offset is set during consumer re-balance.
        final CompletionStage<Void> alreadyDone = CompletableFuture.completedFuture(null);
        return alreadyDone;
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package inbound;

import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.mutiny.kafka.client.consumer.KafkaConsumer;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Named;
import java.util.Set;
import java.util.logging.Logger;
import java.util.stream.Collectors;

/**
 * Example re-balance listener, exposed as the bean named {@code rebalanced-example.rebalancer}.
 *
 * On assignment it moves every assigned partition to the offset found for the timestamp
 * 10 minutes before the current time; on revocation it does nothing.
 */
@ApplicationScoped
@Named("rebalanced-example.rebalancer")
public class KafkaRebalancedConsumerRebalanceListener implements KafkaConsumerRebalanceListener {

    private static final Logger LOGGER = Logger.getLogger(KafkaRebalancedConsumerRebalanceListener.class.getName());

    /**
     * For each assigned partition, looks up the offset for the timestamp 10 minutes ago
     * and seeks the consumer to it.
     *
     * @param consumer underlying consumer
     * @param topicPartitions set of assigned topic partitions
     * @return A {@link Uni} that completes once every partition has been sought, or fails if any step fails
     */
    @Override
    public Uni<Void> onPartitionsAssigned(KafkaConsumer<?, ?> consumer, Set<TopicPartition> topicPartitions) {
        final long startTimestamp = System.currentTimeMillis() - 600_000L; // 10 minutes ago

        return Uni.combine().all()
                .unis(topicPartitions.stream()
                        .map(partition -> seekToTimestamp(consumer, partition, startTimestamp))
                        .collect(Collectors.toList()))
                .combinedWith(ignored -> null);
    }

    /**
     * Nothing needs to be done when partitions are revoked.
     *
     * @param consumer underlying consumer
     * @param topicPartitions set of revoked topic partitions
     * @return A {@link Uni} completed with a {@code null} item
     */
    @Override
    public Uni<Void> onPartitionsRevoked(KafkaConsumer<?, ?> consumer, Set<TopicPartition> topicPartitions) {
        return Uni.createFrom().nullItem();
    }

    /**
     * Looks up the offset of a single partition for the given timestamp and seeks to it.
     * When the lookup yields {@code null}, the consumer is sought to offset 0.
     */
    private Uni<Void> seekToTimestamp(KafkaConsumer<?, ?> consumer, TopicPartition partition, long timestamp) {
        LOGGER.info("Assigned " + partition);
        return consumer.offsetsForTimes(partition, timestamp)
                .onItem().invoke(found -> LOGGER.info("Seeking to " + found))
                .onItem().produceUni(found -> {
                    final long target = (found != null) ? found.getOffset() : 0L;
                    return consumer.seek(partition, target)
                            .onItem().invoke(done -> LOGGER.info("Seeked to " + found));
                });
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
[#kafka-consumer-rebalance-listener]
=== Consumer Rebalance Listener

An implementation of the consumer re-balance listener can be provided, which gives us fine-grained control over the assigned
offsets. Common uses are storing offsets in a separate store to deliver exactly-once semantics, and starting from
a specific time window.

Whenever the listener's partitions-assigned method is called, the consumer is paused. It is only resumed once the returned
Uni succeeds. In case of failure, the listener is retried until it succeeds or until the consumer session times out, in which case
the consumer is resumed anyway, forcing a new consumer re-balance.

==== Example

In this example we will set up a consumer that always starts on messages from at most 10 minutes ago. First we need to provide
a managed bean implementing `io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener` and annotated with
`javax.inject.Named`. We then must configure our inbound connector to use this named bean.

[source, java]
----
include::example$inbound/KafkaRebalancedConsumerRebalanceListener.java[]
----

[source, java]
----
include::example$inbound/KafkaRebalancedConsumer.java[]
----

To configure the inbound connector to use the provided listener we either set the consumer rebalance listener's name:

* `mp.messaging.incoming.rebalanced-example.consumer-rebalance-listener.name=rebalanced-example.rebalancer`

Or have the listener's name be the same as the group id:

* `mp.messaging.incoming.rebalanced-example.group.id=rebalanced-example.rebalancer`

Setting the consumer re-balance listener's name takes precedence over using the group id.
2 changes: 2 additions & 0 deletions documentation/src/main/doc/modules/kafka/pages/inbound.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,5 @@ It may also contain the `dead-letter-cause` with the message from the cause, if
include::connectors:partial$META-INF/connector/smallrye-kafka-incoming.adoc[]

You can also pass any property supported by the https://vertx.io/docs/vertx-kafka-client/java/[Vert.x Kafka client] as attribute.

include::consumer-rebalance-listener.adoc[]
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,17 @@
@ConnectorAttribute(name = "partition", type = "int", direction = Direction.OUTGOING, description = "The target partition id. -1 to let the client determine the partition", defaultValue = "-1")
@ConnectorAttribute(name = "waitForWriteCompletion", type = "boolean", direction = Direction.OUTGOING, description = "Whether the client waits for Kafka to acknowledge the written record before acknowledging the message", defaultValue = "true")
@ConnectorAttribute(name = "max-inflight-messages", type = "int", direction = Direction.OUTGOING, description = "The maximum number of messages to be written to Kafka concurrently - The default value is the value from the `max.in.flight.requests.per.connection` Kafka property. It configures the maximum number of unacknowledged requests the client before blocking. Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries.", defaultValue = "5")
@ConnectorAttribute(name = "consumer-rebalance-listener.name", type = "string", direction = Direction.INCOMING, description = "The name set in `javax.inject.Named` of a bean that implements `io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener`. If set the listener will be applied to the consumer.")
public class KafkaConnector implements IncomingConnectorFactory, OutgoingConnectorFactory {

public static final String CONNECTOR_NAME = "smallrye-kafka";

@Inject
ExecutionHolder executionHolder;

@Inject
Instance<KafkaConsumerRebalanceListener> consumerRebalanceListeners;

private final List<KafkaSource<?, ?>> sources = new CopyOnWriteArrayList<>();
private final List<KafkaSink> sinks = new CopyOnWriteArrayList<>();

Expand Down Expand Up @@ -106,7 +110,7 @@ public PublisherBuilder<? extends Message<?>> getPublisherBuilder(Config config)
}

if (partitions == 1) {
KafkaSource<Object, Object> source = new KafkaSource<>(vertx, ic);
KafkaSource<Object, Object> source = new KafkaSource<>(vertx, ic, consumerRebalanceListeners);
sources.add(source);

boolean broadcast = ic.getBroadcast();
Expand All @@ -120,7 +124,7 @@ public PublisherBuilder<? extends Message<?>> getPublisherBuilder(Config config)
// create an instance of source per partitions.
List<Publisher<IncomingKafkaRecord<Object, Object>>> streams = new ArrayList<>();
for (int i = 0; i < partitions; i++) {
KafkaSource<Object, Object> source = new KafkaSource<>(vertx, ic);
KafkaSource<Object, Object> source = new KafkaSource<>(vertx, ic, consumerRebalanceListeners);
sources.add(source);
streams.add(source.getStream());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package io.smallrye.reactive.messaging.kafka;

import java.util.Set;

import io.smallrye.mutiny.Uni;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.mutiny.kafka.client.consumer.KafkaConsumer;

/**
 * Listener notified when topic partitions are assigned to, or revoked from, the consumer of an
 * inbound Kafka connector.
 * <p>
 * When implemented by a managed bean annotated with {@link javax.inject.Named} and
 * configured against an inbound connector, it is applied as a consumer re-balance listener
 * to that inbound connector's consumer.
 * <p>
 * To configure which listener you want to use, set the name in the inbound connector's
 * consumer re-balance listener attribute, e.g.:
 * {@code
 * mp.messaging.incoming.example.consumer-rebalance-listener.name=ExampleConsumerRebalanceListener
 * }
 * {@code @Named("ExampleConsumerRebalanceListener")}
 * <p>
 * Alternatively, name your listener (using the {@code @Named} annotation) after the group id used by the connector, e.g.:
 * {@code
 * mp.messaging.incoming.example.group.id=my-group
 * }
 * {@code @Named("my-group")}
 * <p>
 * Setting the consumer re-balance listener name takes precedence over using the group id.
 * <p>
 * For more details:
 *
 * @see org.apache.kafka.clients.consumer.ConsumerRebalanceListener
 */
public interface KafkaConsumerRebalanceListener {

/**
 * Called when the consumer is assigned topic partitions.
 * This method might be called for each consumer available to the connector.
 * <p>
 * The consumer will be paused until the returned {@link Uni}
 * indicates success. On failure, the call is retried using an exponential back-off until the consumer can
 * be considered timed-out by Kafka, in which case the consumer is resumed anyway, triggering a new re-balance.
 *
 * @see KafkaConsumer#pause()
 * @see KafkaConsumer#resume()
 *
 * @param consumer underlying consumer
 * @param topicPartitions set of assigned topic partitions
 * @return A {@link Uni} indicating operations complete or failure
 */
Uni<Void> onPartitionsAssigned(KafkaConsumer<?, ?> consumer, Set<TopicPartition> topicPartitions);

/**
 * Called when topic partitions are revoked from the consumer.
 * This method might be called for each consumer available to the connector.
 *
 * @param consumer underlying consumer
 * @param topicPartitions set of revoked topic partitions
 * @return A {@link Uni} indicating operations complete or failure
 */
Uni<Void> onPartitionsRevoked(KafkaConsumer<?, ?> consumer, Set<TopicPartition> topicPartitions);
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,41 @@ public interface KafkaLogging extends BasicLogger {
/**
 * Logs, at DEBUG level, that an exception was caught while closing the Kafka consumer.
 *
 * @param t the exception that was caught, attached as the log cause
 */
@LogMessage(level = Logger.Level.DEBUG)
@Message(id = 18218, value = "An exception has been caught while closing the Kafka consumer")
void exceptionOnClose(@Cause Throwable t);

/**
 * Logs, at INFO level, that the re-balance listener is being looked up by its explicitly configured name.
 *
 * @param configuredName the listener name taken from the connector configuration
 */
@LogMessage(level = Logger.Level.INFO)
@Message(id = 18219, value = "Loading KafkaConsumerRebalanceListener from configured name '%s'")
void loadingConsumerRebalanceListenerFromConfiguredName(String configuredName);

/**
 * Logs, at INFO level, that the re-balance listener is being loaded using the consumer group id as its name.
 *
 * @param consumerGroup the consumer group id used as the listener name
 */
@LogMessage(level = Logger.Level.INFO)
@Message(id = 18220, value = "Loading KafkaConsumerRebalanceListener from group id '%s'")
void loadingConsumerRebalanceListenerFromGroupId(String consumerGroup);

/**
 * Logs, at ERROR level, that the listener's partitions-assigned callback failed.
 * The message states that the consumer stays paused while the callback is retried.
 *
 * @param consumerGroup the consumer group the failing listener belongs to
 * @param t the failure, attached as the log cause
 */
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 18221, value = "Unable to execute consumer assigned re-balance listener for group '%s'. The consumer has been paused. Will retry until the consumer session times out in which case will resume to force a new re-balance attempt.")
void unableToExecuteConsumerAssignedRebalanceListener(String consumerGroup, @Cause Throwable t);

/**
 * Logs, at ERROR level, that the listener's partitions-revoked callback failed.
 *
 * @param consumerGroup the consumer group the failing listener belongs to
 * @param t the failure, attached as the log cause
 */
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 18222, value = "Unable to execute consumer revoked re-balance listener for group '%s'")
void unableToExecuteConsumerRevokedRebalanceListener(String consumerGroup, @Cause Throwable t);

/**
 * Logs, at INFO level, that the listener's partitions-assigned callback is about to be executed.
 *
 * @param consumerGroup the consumer group the listener belongs to
 */
@LogMessage(level = Logger.Level.INFO)
@Message(id = 18223, value = "Executing consumer assigned re-balance listener for group '%s'")
void executingConsumerAssignedRebalanceListener(String consumerGroup);

/**
 * Logs, at INFO level, that the listener's partitions-revoked callback is about to be executed.
 *
 * @param consumerGroup the consumer group the listener belongs to
 */
@LogMessage(level = Logger.Level.INFO)
@Message(id = 18224, value = "Executing consumer revoked re-balance listener for group '%s'")
void executingConsumerRevokedRebalanceListener(String consumerGroup);

/**
 * Logs, at INFO level, that the listener's partitions-assigned callback completed successfully.
 *
 * @param consumerGroup the consumer group the listener belongs to
 */
@LogMessage(level = Logger.Level.INFO)
@Message(id = 18225, value = "Executed consumer assigned re-balance listener for group '%s'")
void executedConsumerAssignedRebalanceListener(String consumerGroup);

/**
 * Logs, at INFO level, that the listener's partitions-revoked callback completed successfully.
 *
 * @param consumerGroup the consumer group the listener belongs to
 */
@LogMessage(level = Logger.Level.INFO)
@Message(id = 18226, value = "Executed consumer revoked re-balance listener for group '%s'")
void executedConsumerRevokedRebalanceListener(String consumerGroup);

/**
 * Logs, at WARN level, that a consumer paused because of a re-balance failure is being re-enabled.
 *
 * @param consumerGroup the consumer group whose consumer is being re-enabled
 */
@LogMessage(level = Logger.Level.WARN)
@Message(id = 18227, value = "Re-enabling consumer for group '%s'. This consumer was paused because of a re-balance failure.")
void reEnablingConsumerforGroup(String consumerGroup);

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,18 @@
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

import javax.enterprise.inject.Instance;
import javax.enterprise.inject.literal.NamedLiteral;

import org.apache.kafka.clients.consumer.ConsumerConfig;

import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener;
import io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue;
import io.smallrye.reactive.messaging.kafka.fault.KafkaFailStop;
import io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler;
Expand All @@ -26,7 +31,9 @@ public class KafkaSource<K, V> {
private final KafkaConsumer<K, V> consumer;
private final KafkaFailureHandler failureHandler;

public KafkaSource(Vertx vertx, KafkaConnectorIncomingConfiguration config) {
public KafkaSource(Vertx vertx,
KafkaConnectorIncomingConfiguration config,
Instance<KafkaConsumerRebalanceListener> consumerRebalanceListeners) {

Map<String, String> kafkaConfiguration = new HashMap<>();

Expand Down Expand Up @@ -58,8 +65,80 @@ public KafkaSource(Vertx vertx, KafkaConnectorIncomingConfiguration config) {
kafkaConfiguration.remove("retry-attempts");
kafkaConfiguration.remove("broadcast");
kafkaConfiguration.remove("partitions");
kafkaConfiguration.remove("consumer-rebalance-listener.name");

final KafkaConsumer<K, V> kafkaConsumer = KafkaConsumer.create(vertx, kafkaConfiguration);
config
.getConsumerRebalanceListenerName()
.map(name -> {
log.loadingConsumerRebalanceListenerFromConfiguredName(name);
return NamedLiteral.of(name);
})
.map(consumerRebalanceListeners::select)
.map(Instance::get)
.map(Optional::of)
.orElseGet(() -> {
Instance<KafkaConsumerRebalanceListener> rebalanceFromGroupListeners = consumerRebalanceListeners
.select(NamedLiteral.of(group));

if (!rebalanceFromGroupListeners.isUnsatisfied()) {
log.loadingConsumerRebalanceListenerFromGroupId(group);
return Optional.of(rebalanceFromGroupListeners.get());
}
return Optional.empty();
})
.ifPresent(listener -> {
// If the re-balance assign fails we must resume the consumer in order to force a consumer group
// re-balance. To do so we must wait until after the poll interval time or
// poll interval time + session timeout if group instance id is not null.
final long consumerReEnableWaitTime = Long.parseLong(
kafkaConfiguration.getOrDefault(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000"))
+ (kafkaConfiguration.get(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG) == null ? 0L
: Long.parseLong(
kafkaConfiguration.getOrDefault(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,
"10000")));

// We will retry the re-balance consumer listener on failure using an exponential backoff until
// we can allow the kafka consumer to do it on its own. We do this because by default it would take
// 5 minutes for kafka to do this which is too long. With defaults consumerReEnableWaitTime would be
// 300000 millis. We also can't simply retry indefinitely because once the consumer has been paused
// for consumerReEnableWaitTime kafka will force a re-balance once resumed.
// We are doing retries using the time intervals 2s, 4s, 8s, 10s, 10s, 10s, 10s...
// The following formula will give us a reasonable number for retry attempt that is just greater
// than consumerReEnableWaitTime
final long consumerReEnableRetryMaxAttempts = 1
+ Math.max(0, 3 + (consumerReEnableWaitTime - 14_000) / 10_000);

kafkaConsumer.partitionsAssignedHandler(set -> {
kafkaConsumer.pause();
log.executingConsumerAssignedRebalanceListener(group);
listener.onPartitionsAssigned(kafkaConsumer, set)
.onFailure().invoke(t -> log.unableToExecuteConsumerAssignedRebalanceListener(group, t))
.onFailure().retry().withBackOff(Duration.ofSeconds(1), Duration.ofSeconds(10)).withJitter(0.0)
.atMost(consumerReEnableRetryMaxAttempts)
.subscribe()
.with(
a -> {
log.executedConsumerAssignedRebalanceListener(group);
kafkaConsumer.resume();
},
t -> {
log.reEnablingConsumerforGroup(group);
kafkaConsumer.resume();
});
});

kafkaConsumer.partitionsRevokedHandler(set -> {
log.executingConsumerRevokedRebalanceListener(group);
listener.onPartitionsRevoked(kafkaConsumer, set)
.subscribe()
.with(
a -> log.executedConsumerRevokedRebalanceListener(group),
t -> log.unableToExecuteConsumerRevokedRebalanceListener(group, t));
});
});
this.consumer = kafkaConsumer;

this.consumer = KafkaConsumer.create(vertx, kafkaConfiguration);
String topic = config.getTopic().orElseGet(config::getChannel);

failureHandler = createFailureHandler(config, vertx, kafkaConfiguration);
Expand Down
Loading

0 comments on commit 817f131

Please sign in to comment.