diff --git a/documentation/src/main/doc/modules/connectors/partials/META-INF/connector/smallrye-kafka-incoming.adoc b/documentation/src/main/doc/modules/connectors/partials/META-INF/connector/smallrye-kafka-incoming.adoc index 8487a48adc..7bf1c1a093 100644 --- a/documentation/src/main/doc/modules/connectors/partials/META-INF/connector/smallrye-kafka-incoming.adoc +++ b/documentation/src/main/doc/modules/connectors/partials/META-INF/connector/smallrye-kafka-incoming.adoc @@ -69,4 +69,8 @@ Type: _string_ | false | Type: _int_ | false | `1` +| *consumer-rebalance-listener.name* | The name set in `javax.inject.Named` of a bean that implements `io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener`. If set the listener will be applied to the consumer. + +Type: _string_ | false | + |=== diff --git a/documentation/src/main/doc/modules/kafka/examples/inbound/KafkaRebalancedConsumer.java b/documentation/src/main/doc/modules/kafka/examples/inbound/KafkaRebalancedConsumer.java new file mode 100644 index 0000000000..ea768732a3 --- /dev/null +++ b/documentation/src/main/doc/modules/kafka/examples/inbound/KafkaRebalancedConsumer.java @@ -0,0 +1,21 @@ +package inbound; + +import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord; +import org.eclipse.microprofile.reactive.messaging.Acknowledgment; +import org.eclipse.microprofile.reactive.messaging.Incoming; + +import javax.enterprise.context.ApplicationScoped; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + +@ApplicationScoped +public class KafkaRebalancedConsumer { + + @Incoming("rebalanced-example") + @Acknowledgment(Acknowledgment.Strategy.NONE) + public CompletionStage consume(IncomingKafkaRecord message) { + // We don't need to ACK messages because in this example we set offset during consumer re-balance + return CompletableFuture.completedFuture(null); + } + +} diff --git a/documentation/src/main/doc/modules/kafka/examples/inbound/KafkaRebalancedConsumerRebalanceListener.java b/documentation/src/main/doc/modules/kafka/examples/inbound/KafkaRebalancedConsumerRebalanceListener.java new file mode 100644 index 0000000000..ea5a4cddb8 --- /dev/null +++ b/documentation/src/main/doc/modules/kafka/examples/inbound/KafkaRebalancedConsumerRebalanceListener.java @@ -0,0 +1,60 @@ +package inbound; + +import io.smallrye.mutiny.Uni; +import io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener; +import io.vertx.kafka.client.common.TopicPartition; +import io.vertx.mutiny.kafka.client.consumer.KafkaConsumer; + +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Named; +import java.util.Set; +import java.util.logging.Logger; +import java.util.stream.Collectors; + +@ApplicationScoped +@Named("rebalanced-example.rebalancer") +public class KafkaRebalancedConsumerRebalanceListener implements KafkaConsumerRebalanceListener { + + private static final Logger LOGGER = Logger.getLogger(KafkaRebalancedConsumerRebalanceListener.class.getName()); + + /** + * When receiving a list of partitions will search for the earliest offset within 10 minutes + * and seek the consumer to it. + * + * @param consumer underlying consumer + * @param topicPartitions set of assigned topic partitions + * @return A {@link Uni} indicating operations complete or failure + */ + @Override + public Uni onPartitionsAssigned(KafkaConsumer consumer, Set topicPartitions) { + long now = System.currentTimeMillis(); + long shouldStartAt = now - 600_000L; //10 minute ago + + return Uni + .combine() + .all() + .unis(topicPartitions + .stream() + .map(topicPartition -> { + LOGGER.info("Assigned " + topicPartition); + return consumer.offsetsForTimes(topicPartition, shouldStartAt) + .onItem() + .invoke(o -> LOGGER.info("Seeking to " + o)) + .onItem() + .produceUni(o -> consumer + .seek(topicPartition, o == null ? 0L : o.getOffset()) + .onItem() + .invoke(v -> LOGGER.info("Seeked to " + o)) + ); + }) + .collect(Collectors.toList())) + .combinedWith(a -> null); + } + + @Override + public Uni onPartitionsRevoked(KafkaConsumer consumer, Set topicPartitions) { + return Uni + .createFrom() + .nullItem(); + } +} diff --git a/documentation/src/main/doc/modules/kafka/pages/consumer-rebalance-listener.adoc b/documentation/src/main/doc/modules/kafka/pages/consumer-rebalance-listener.adoc new file mode 100644 index 0000000000..2de88d32bc --- /dev/null +++ b/documentation/src/main/doc/modules/kafka/pages/consumer-rebalance-listener.adoc @@ -0,0 +1,32 @@ +[#kafka-consumer-rebalance-listener] +=== Consumer Rebalance Listener + +An implementation of the consumer re-balance listener can be provided which affords us fine grain controls of the assigned +offset. Common uses are storing offsets in a separate store to enable deliver exactly-once semantics, and starting from +a specific time window. + +==== Example + +In this example we will set-up a consumer that always starts on messages from at most 10 minutes ago. First we need to provide +a bean managed implementation of `io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener` annotated with +`javax.inject.Named`. We then must configure our inbound connector to use this named bean. + +[source, java] +---- +include::example$inbound/KafkaRebalancedConsumerRebalanceListener.java[] +---- + +[source, java] +---- +include::example$inbound/KafkaRebalancedConsumer.java[] +---- + +To configure the inbound connector to use the provided listener we either set the consumer rebalance listener's name: + +* `mp.messaging.incoming.rebalanced-example.consumer-rebalance-listener.name=rebalanced-example.rebalancer` + +Or have the listener's name be the same as the group id: + +* `mp.messaging.incoming.rebalanced-example.group.id=rebalanced-example.rebalancer` + +Setting the consumer re-balance listener's name takes precedence over using the group id. diff --git a/documentation/src/main/doc/modules/kafka/pages/inbound.adoc b/documentation/src/main/doc/modules/kafka/pages/inbound.adoc index 2b96efe523..83d85fd78c 100644 --- a/documentation/src/main/doc/modules/kafka/pages/inbound.adoc +++ b/documentation/src/main/doc/modules/kafka/pages/inbound.adoc @@ -94,3 +94,5 @@ It may also contain the `dead-letter-cause` with the message from the cause, if include::connectors:partial$META-INF/connector/smallrye-kafka-incoming.adoc[] You can also pass any property supported by the https://vertx.io/docs/vertx-kafka-client/java/[Vert.x Kafka client] as attribute. + +include::consumer-rebalance-listener.adoc[] diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaConnector.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaConnector.java index b381c53417..da62715b09 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaConnector.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaConnector.java @@ -66,6 +66,7 @@ @ConnectorAttribute(name = "partition", type = "int", direction = Direction.OUTGOING, description = "The target partition id. -1 to let the client determine the partition", defaultValue = "-1") @ConnectorAttribute(name = "waitForWriteCompletion", type = "boolean", direction = Direction.OUTGOING, description = "Whether the client waits for Kafka to acknowledge the written record before acknowledging the message", defaultValue = "true") @ConnectorAttribute(name = "max-inflight-messages", type = "int", direction = Direction.OUTGOING, description = "The maximum number of messages to be written to Kafka concurrently - The default value is the value from the `max.in.flight.requests.per.connection` Kafka property. It configures the maximum number of unacknowledged requests the client before blocking. Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries.", defaultValue = "5") +@ConnectorAttribute(name = "consumer-rebalance-listener.name", type = "string", direction = Direction.INCOMING, description = "The name set in `javax.inject.Named` of a bean that implements `io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener`. If set the listener will be applied to the consumer.") public class KafkaConnector implements IncomingConnectorFactory, OutgoingConnectorFactory { public static final String CONNECTOR_NAME = "smallrye-kafka"; @@ -73,6 +74,9 @@ public class KafkaConnector implements IncomingConnectorFactory, OutgoingConnect @Inject ExecutionHolder executionHolder; + @Inject + Instance consumerRebalanceListeners; + private final List> sources = new CopyOnWriteArrayList<>(); private final List sinks = new CopyOnWriteArrayList<>(); @@ -106,7 +110,7 @@ public PublisherBuilder> getPublisherBuilder(Config config) } if (partitions == 1) { - KafkaSource source = new KafkaSource<>(vertx, ic); + KafkaSource source = new KafkaSource<>(vertx, ic, consumerRebalanceListeners); sources.add(source); boolean broadcast = ic.getBroadcast(); @@ -120,7 +124,7 @@ public PublisherBuilder> getPublisherBuilder(Config config) // create an instance of source per partitions. List>> streams = new ArrayList<>(); for (int i = 0; i < partitions; i++) { - KafkaSource source = new KafkaSource<>(vertx, ic); + KafkaSource source = new KafkaSource<>(vertx, ic, consumerRebalanceListeners); sources.add(source); streams.add(source.getStream()); } diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaConsumerRebalanceListener.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaConsumerRebalanceListener.java new file mode 100644 index 0000000000..a722576724 --- /dev/null +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaConsumerRebalanceListener.java @@ -0,0 +1,58 @@ +package io.smallrye.reactive.messaging.kafka; + +import java.util.Set; + +import io.smallrye.mutiny.Uni; +import io.vertx.kafka.client.common.TopicPartition; +import io.vertx.mutiny.kafka.client.consumer.KafkaConsumer; + +/** + * + * When implemented by a managed bean annotated with {@link javax.inject.Named} and + * configured against an inbound connector will be applied as a consumer re-balance listener + * to that inbound connector's consumer. + * + * + * To configure set the name in the inbound connector's consumer re-balance listener, ex: + * mp.messaging.incoming.example.consumer-rebalance-listener.name=ExampleConsumerRebalanceListener + * @Named("ExampleConsumerRebalanceListener") + * + * Or set the name to be the same as the group id, ex: + * mp.messaging.incoming.example.group.id=my-group + * @Named("my-group") + * + * Setting the consumer re-balance listener name takes precedence over using the group id. + * + * For more details: + * + * @see org.apache.kafka.clients.consumer.ConsumerRebalanceListener + */ +public interface KafkaConsumerRebalanceListener { + + /** + * Called when the consumer is assigned topic partitions + * This method might be called for each consumer available to the connector + * + * The consumer will be paused until the returned {@link Uni} + * indicates success. On failure will resume after a consumer re-balance is + * initiated by Kafka. + * + * @see KafkaConsumer#pause() + * @see KafkaConsumer#resume() + * + * @param consumer underlying consumer + * @param topicPartitions set of assigned topic partitions + * @return A {@link Uni} indicating operations complete or failure + */ + Uni onPartitionsAssigned(KafkaConsumer consumer, Set topicPartitions); + + /** + * Called when the consumer is revoked topic partitions + * This method might be called for each consumer available to the connector + * + * @param consumer underlying consumer + * @param topicPartitions set of revoked topic partitions + * @return A {@link Uni} indicating operations complete or failure + */ + Uni onPartitionsRevoked(KafkaConsumer consumer, Set topicPartitions); +} diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/i18n/KafkaLogging.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/i18n/KafkaLogging.java index b6f68e1f14..9e85df0f82 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/i18n/KafkaLogging.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/i18n/KafkaLogging.java @@ -93,4 +93,29 @@ public interface KafkaLogging extends BasicLogger { @LogMessage(level = Logger.Level.DEBUG) @Message(id = 18218, value = "An exception has been caught while closing the Kafka consumer") void exceptionOnClose(@Cause Throwable t); + + @LogMessage(level = Logger.Level.INFO) + @Message(id = 18219, value = "Loading KafkaConsumerRebalanceListener from configured name '%s'") + void loadingConsumerRebalanceListenerFromConfiguredName(String configuredName); + + @LogMessage(level = Logger.Level.INFO) + @Message(id = 18220, value = "Loading KafkaConsumerRebalanceListener from group id '%s'") + void loadingConsumerRebalanceListenerFromGroupId(String consumerGroup); + + @LogMessage(level = Logger.Level.ERROR) + @Message(id = 18221, value = "Unable to execute consumer assigned re-balance listener for group '%s'. The consumer has been paused and won't be resumed until a next rebalance attempt.") + void unableToExecuteConsumerAssignedRebalanceListener(String consumerGroup, @Cause Throwable t); + + @LogMessage(level = Logger.Level.ERROR) + @Message(id = 18222, value = "Unable to execute consumer revoked re-balance listener for group '%s'") + void unableToExecuteConsumerRevokedRebalanceListener(String consumerGroup, @Cause Throwable t); + + @LogMessage(level = Logger.Level.DEBUG) + @Message(id = 18223, value = "Executed consumer re-balance listener for group '%s'") + void executedConsumerRebalanceListener(String consumerGroup); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 18224, value = "Re-enabling consumer for group '%s'. This consumer was paused because of a re-balance failure.") + void reEnablingConsumerforGroup(String consumerGroup); + } diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSource.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSource.java index adc20d4aa8..8e886ae27d 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSource.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSource.java @@ -6,13 +6,19 @@ import java.time.Duration; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.UUID; +import javax.enterprise.inject.Instance; +import javax.enterprise.inject.literal.NamedLiteral; + import org.apache.kafka.clients.consumer.ConsumerConfig; import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord; import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration; +import io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener; import io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue; import io.smallrye.reactive.messaging.kafka.fault.KafkaFailStop; import io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler; @@ -26,7 +32,9 @@ public class KafkaSource { private final KafkaConsumer consumer; private final KafkaFailureHandler failureHandler; - public KafkaSource(Vertx vertx, KafkaConnectorIncomingConfiguration config) { + public KafkaSource(Vertx vertx, + KafkaConnectorIncomingConfiguration config, + Instance consumerRebalanceListeners) { Map kafkaConfiguration = new HashMap<>(); @@ -59,7 +67,58 @@ public KafkaSource(Vertx vertx, KafkaConnectorIncomingConfiguration config) { kafkaConfiguration.remove("broadcast"); kafkaConfiguration.remove("partitions"); - this.consumer = KafkaConsumer.create(vertx, kafkaConfiguration); + // if the rebalance assign fails we must resume the consumer in order to force a consumer group re-balance + // we must wait until after the poll interval time + session timeout otherwise the consumer won't re-balance + final long consumerReEnableWaitTime = 1_000L + Long.parseLong( + kafkaConfiguration.getOrDefault(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000")) + + Long.parseLong( + kafkaConfiguration.getOrDefault(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000")); + + final KafkaConsumer kafkaConsumer = KafkaConsumer.create(vertx, kafkaConfiguration); + config + .getConsumerRebalanceListenerName() + .map(name -> { + log.loadingConsumerRebalanceListenerFromConfiguredName(name); + return NamedLiteral.of(name); + }) + .map(consumerRebalanceListeners::select) + .map(Instance::get) + .map(Optional::of) + .orElseGet(() -> { + Instance rebalanceFromGroupListeners = consumerRebalanceListeners + .select(NamedLiteral.of(group)); + + if (!rebalanceFromGroupListeners.isUnsatisfied()) { + log.loadingConsumerRebalanceListenerFromGroupId(group); + return Optional.of(rebalanceFromGroupListeners.get()); + } + return Optional.empty(); + }) + .ifPresent(listener -> { + kafkaConsumer.partitionsAssignedHandler(set -> { + kafkaConsumer.pause(); + listener.onPartitionsAssigned(kafkaConsumer, set) + .onFailure().invoke(t -> log.unableToExecuteConsumerAssignedRebalanceListener(group, t)) + .onFailure().recoverWithUni(() -> Uni + .createFrom() + .item((Void) null) + .onItem().delayIt().by(Duration.ofMillis(consumerReEnableWaitTime)) + .onItem().invoke(a -> { + log.reEnablingConsumerforGroup(group); + kafkaConsumer.resume(); + })) + .onItem().invoke(a -> kafkaConsumer.resume()) + .subscribe() + .with(a -> log.executedConsumerRebalanceListener(group)); + }); + + kafkaConsumer.partitionsRevokedHandler(set -> listener.onPartitionsRevoked(kafkaConsumer, set) + .onFailure().invoke(t -> log.unableToExecuteConsumerRevokedRebalanceListener(group, t)) + .subscribe() + .with(a -> log.executedConsumerRebalanceListener(group))); + }); + this.consumer = kafkaConsumer; + String topic = config.getTopic().orElseGet(config::getChannel); failureHandler = createFailureHandler(config, vertx, kafkaConfiguration); diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ConsumptionBeanWithoutAck.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ConsumptionBeanWithoutAck.java new file mode 100644 index 0000000000..9bc8a52304 --- /dev/null +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ConsumptionBeanWithoutAck.java @@ -0,0 +1,29 @@ +package io.smallrye.reactive.messaging.kafka; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + +import javax.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Acknowledgment; +import org.eclipse.microprofile.reactive.messaging.Incoming; + +@ApplicationScoped +public class ConsumptionBeanWithoutAck { + + private final List list = Collections.synchronizedList(new ArrayList<>()); + + @Incoming("data") + @Acknowledgment(Acknowledgment.Strategy.NONE) + public CompletionStage sink(KafkaRecord input) { + list.add(input.getPayload() + 1); + return CompletableFuture.completedFuture(null); + } + + public List getResults() { + return list; + } +} diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ConsumptionConsumerRebalanceListener.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ConsumptionConsumerRebalanceListener.java new file mode 100644 index 0000000000..8f51561c91 --- /dev/null +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ConsumptionConsumerRebalanceListener.java @@ -0,0 +1,38 @@ +package io.smallrye.reactive.messaging.kafka; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Named; + +import io.smallrye.mutiny.Uni; +import io.vertx.kafka.client.common.TopicPartition; +import io.vertx.mutiny.kafka.client.consumer.KafkaConsumer; + +@ApplicationScoped +@Named("ConsumptionConsumerRebalanceListener") +public class ConsumptionConsumerRebalanceListener implements KafkaConsumerRebalanceListener { + + private final Map assigned = new ConcurrentHashMap<>(); + + @Override + public Uni onPartitionsAssigned(KafkaConsumer consumer, Set set) { + set.forEach(topicPartition -> this.assigned.put(topicPartition.getPartition(), topicPartition)); + return Uni + .createFrom() + .nullItem(); + } + + @Override + public Uni onPartitionsRevoked(KafkaConsumer consumer, Set set) { + return Uni + .createFrom() + .nullItem(); + } + + public Map getAssigned() { + return assigned; + } +} diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceTest.java index 6859a70e59..beaf9abdce 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceTest.java @@ -11,10 +11,15 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.IntStream; +import javax.enterprise.inject.Instance; +import javax.enterprise.inject.literal.NamedLiteral; +import javax.enterprise.inject.spi.BeanManager; + import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; @@ -29,6 +34,7 @@ import io.smallrye.config.SmallRyeConfigProviderResolver; import io.smallrye.reactive.messaging.connectors.ExecutionHolder; import io.smallrye.reactive.messaging.kafka.impl.KafkaSource; +import io.vertx.kafka.client.common.TopicPartition; public class KafkaSourceTest extends KafkaTestBase { @@ -54,7 +60,7 @@ public void testSource() { config.put("bootstrap.servers", SERVERS); config.put("channel-name", topic); KafkaConnectorIncomingConfiguration ic = new KafkaConnectorIncomingConfiguration(new MapBasedConfig(config)); - KafkaSource source = new KafkaSource<>(vertx, ic); + KafkaSource source = new KafkaSource<>(vertx, ic, getConsumerRebalanceListeners()); List> messages = new ArrayList<>(); source.getStream().subscribe().with(messages::add); @@ -82,7 +88,7 @@ public void testSourceWithPartitions() { kafka.createTopic(topic, 3, 1); KafkaConnectorIncomingConfiguration ic = new KafkaConnectorIncomingConfiguration(new MapBasedConfig(config)); - KafkaSource source = new KafkaSource<>(vertx, ic); + KafkaSource source = new KafkaSource<>(vertx, ic, getConsumerRebalanceListeners()); List> messages = new ArrayList<>(); source.getStream().subscribe().with(messages::add); @@ -109,7 +115,7 @@ public void testSourceWithChannelName() { config.put("value.deserializer", IntegerDeserializer.class.getName()); config.put("bootstrap.servers", SERVERS); KafkaConnectorIncomingConfiguration ic = new KafkaConnectorIncomingConfiguration(new MapBasedConfig(config)); - KafkaSource source = new KafkaSource<>(vertx, ic); + KafkaSource source = new KafkaSource<>(vertx, ic, getConsumerRebalanceListeners()); List messages = new ArrayList<>(); source.getStream().subscribe().with(messages::add); @@ -138,6 +144,7 @@ public void testBroadcast() { KafkaConnector connector = new KafkaConnector(); connector.executionHolder = new ExecutionHolder(vertx); connector.defaultKafkaConfiguration = UnsatisfiedInstance.instance(); + connector.consumerRebalanceListeners = getConsumerRebalanceListeners(); connector.init(); PublisherBuilder builder = (PublisherBuilder) connector .getPublisherBuilder(new MapBasedConfig(config)); @@ -176,6 +183,7 @@ public void testBroadcastWithPartitions() { KafkaConnector connector = new KafkaConnector(); connector.executionHolder = new ExecutionHolder(vertx); connector.defaultKafkaConfiguration = UnsatisfiedInstance.instance(); + connector.consumerRebalanceListeners = getConsumerRebalanceListeners(); connector.init(); PublisherBuilder builder = (PublisherBuilder) connector .getPublisherBuilder(new MapBasedConfig(config)); @@ -212,7 +220,7 @@ public void testRetry() throws IOException, InterruptedException { config.put("bootstrap.servers", SERVERS); config.put("channel-name", topic); KafkaConnectorIncomingConfiguration ic = new KafkaConnectorIncomingConfiguration(new MapBasedConfig(config)); - KafkaSource source = new KafkaSource<>(vertx, ic); + KafkaSource source = new KafkaSource<>(vertx, ic, getConsumerRebalanceListeners()); List messages1 = new ArrayList<>(); source.getStream().subscribe().with(m -> messages1.add(m)); @@ -242,7 +250,7 @@ private Map newCommonConfig() { return config; } - private MapBasedConfig myKafkaSourceConfig(int partitions) { + private MapBasedConfig myKafkaSourceConfig(int partitions, String withConsumerRebalanceListener) { String prefix = "mp.messaging.incoming.data."; Map config = new HashMap<>(); config.put(prefix + "connector", KafkaConnector.CONNECTOR_NAME); @@ -255,13 +263,34 @@ private MapBasedConfig myKafkaSourceConfig(int partitions) { config.put(prefix + "partitions", Integer.toString(partitions)); config.put(prefix + "topic", "data-" + partitions); } + if (withConsumerRebalanceListener != null) { + config.put(prefix + "consumer-rebalance-listener.name", withConsumerRebalanceListener); + } + + return new MapBasedConfig(config); + } + + private MapBasedConfig myKafkaSourceConfigWithoutAck(boolean failOnFirst) { + String prefix = "mp.messaging.incoming.data."; + Map config = new HashMap<>(); + config.put(prefix + "connector", KafkaConnector.CONNECTOR_NAME); + config.put(prefix + "group.id", "my-group-fail-on-first-" + failOnFirst); + config.put(prefix + "value.deserializer", IntegerDeserializer.class.getName()); + config.put(prefix + "enable.auto.commit", "false"); + config.put(prefix + "auto.offset.reset", "earliest"); + config.put(prefix + "topic", "data-starting-on-fifth-" + failOnFirst); + if (failOnFirst) { + config.put(prefix + "max.poll.interval.ms", "2000"); + config.put(prefix + "session.timeout.ms", "6000"); + config.put(prefix + "heartbeat.interval.ms", "2000"); + } return new MapBasedConfig(config); } @Test public void testABeanConsumingTheKafkaMessages() { - ConsumptionBean bean = deploy(myKafkaSourceConfig(0)); + ConsumptionBean bean = deploy(myKafkaSourceConfig(0, null)); KafkaUsage usage = new KafkaUsage(); List list = bean.getResults(); assertThat(list).isEmpty(); @@ -283,7 +312,7 @@ public void testABeanConsumingTheKafkaMessages() { @Test public void testABeanConsumingTheKafkaMessagesWithPartitions() { kafka.createTopic("data-2", 2, 1); - ConsumptionBean bean = deploy(myKafkaSourceConfig(2)); + ConsumptionBean bean = deploy(myKafkaSourceConfig(2, ConsumptionConsumerRebalanceListener.class.getSimpleName())); KafkaUsage usage = new KafkaUsage(); List list = bean.getResults(); assertThat(list).isEmpty(); @@ -300,6 +329,64 @@ public void testABeanConsumingTheKafkaMessagesWithPartitions() { assertThat(m.getTimestamp()).isAfter(Instant.EPOCH); assertThat(m.getPartition()).isGreaterThan(-1); }); + + ConsumptionConsumerRebalanceListener consumptionConsumerRebalanceListener = getConsumptionConsumerRebalanceListener(); + assertThat(consumptionConsumerRebalanceListener.getAssigned().size()).isEqualTo(2); + for (int i = 0; i < 2; i++) { + TopicPartition topicPartition = consumptionConsumerRebalanceListener.getAssigned().get(i); + assertThat(topicPartition).isNotNull(); + assertThat(topicPartition.getTopic()).isEqualTo("data-2"); + } + } + + @Test + public void testABeanConsumingTheKafkaMessagesStartingOnFifthOffsetFromLatest() { + KafkaUsage usage = new KafkaUsage(); + AtomicInteger counter = new AtomicInteger(); + AtomicBoolean callback = new AtomicBoolean(false); + new Thread(() -> usage.produceIntegers(10, () -> callback.set(true), + () -> new ProducerRecord<>("data-starting-on-fifth-false", counter.getAndIncrement()))).start(); + + await() + .atMost(2, TimeUnit.MINUTES) + .until(callback::get); + /* + * Will use StartFromFifthOffsetFromLatestConsumerRebalanceListener + */ + ConsumptionBeanWithoutAck bean = deployWithoutAck( + myKafkaSourceConfigWithoutAck(false)); + List list = bean.getResults(); + + await() + .atMost(2, TimeUnit.MINUTES) + .until(() -> list.size() >= 5); + + assertThat(list).containsExactly(6, 7, 8, 9, 10); + } + + @Test + public void testABeanConsumingTheKafkaMessagesStartingOnFifthOffsetFromLatestThatFailsOnTheFirstAttempt() { + KafkaUsage usage = new KafkaUsage(); + AtomicInteger counter = new AtomicInteger(); + AtomicBoolean callback = new AtomicBoolean(false); + new Thread(() -> usage.produceIntegers(10, () -> callback.set(true), + () -> new ProducerRecord<>("data-starting-on-fifth-true", counter.getAndIncrement()))).start(); + + await() + .atMost(2, TimeUnit.MINUTES) + .until(callback::get); + /* + * Will use StartFromFifthOffsetFromLatestConsumerRebalanceListener + */ + ConsumptionBeanWithoutAck bean = deployWithoutAck( + myKafkaSourceConfigWithoutAck(true)); + List list = bean.getResults(); + + await() + .atMost(2, TimeUnit.MINUTES) + .until(() -> list.size() >= 5); + + assertThat(list).containsExactly(6, 7, 8, 9, 10); } @SuppressWarnings("unchecked") @@ -313,7 +400,7 @@ public void testInvalidIncomingType() { config.put("bootstrap.servers", SERVERS); config.put("channel-name", topic); KafkaConnectorIncomingConfiguration ic = new KafkaConnectorIncomingConfiguration(new MapBasedConfig(config)); - KafkaSource source = new KafkaSource<>(vertx, ic); + KafkaSource source = new KafkaSource<>(vertx, ic, getConsumerRebalanceListeners()); List> messages = new ArrayList<>(); source.getStream().subscribe().with(messages::add); @@ -339,7 +426,7 @@ public void testInvalidIncomingType() { @SuppressWarnings("unchecked") @Test public void testABeanConsumingTheKafkaMessagesWithRawMessage() { - ConsumptionBeanUsingRawMessage bean = deployRaw(myKafkaSourceConfig(0)); + ConsumptionBeanUsingRawMessage bean = deployRaw(myKafkaSourceConfig(0, null)); KafkaUsage usage = new KafkaUsage(); List list = bean.getResults(); assertThat(list).isEmpty(); @@ -374,7 +461,7 @@ public void testSourceWithEmptyOptionalConfiguration() { config.put("sasl.mechanism", ""); //optional configuration config.put("channel-name", topic); KafkaConnectorIncomingConfiguration ic = new KafkaConnectorIncomingConfiguration(new MapBasedConfig(config)); - KafkaSource source = new KafkaSource<>(vertx, ic); + KafkaSource source = new KafkaSource<>(vertx, ic, getConsumerRebalanceListeners()); List> messages = new ArrayList<>(); source.getStream().subscribe().with(messages::add); @@ -388,15 +475,53 @@ public void testSourceWithEmptyOptionalConfiguration() { .collect(Collectors.toList())).containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); } + private BeanManager getBeanManager() { + if (container == null) { + Weld weld = baseWeld(); + addConfig(new MapBasedConfig(new HashMap<>())); + weld.disableDiscovery(); + container = weld.initialize(); + } + return container.getBeanManager(); + } + + private Instance getConsumerRebalanceListeners() { + return getBeanManager() + .createInstance() + .select(KafkaConsumerRebalanceListener.class); + } + + private ConsumptionConsumerRebalanceListener getConsumptionConsumerRebalanceListener() { + return getBeanManager() + .createInstance() + .select(ConsumptionConsumerRebalanceListener.class) + .select(NamedLiteral.of(ConsumptionConsumerRebalanceListener.class.getSimpleName())) + .get(); + + } + private ConsumptionBean deploy(MapBasedConfig config) { Weld weld = baseWeld(); addConfig(config); weld.addBeanClass(ConsumptionBean.class); + weld.addBeanClass(ConsumptionConsumerRebalanceListener.class); weld.disableDiscovery(); container = weld.initialize(); return container.getBeanManager().createInstance().select(ConsumptionBean.class).get(); } + private ConsumptionBeanWithoutAck deployWithoutAck(MapBasedConfig config) { + Weld weld = baseWeld(); + addConfig(config); + weld.addBeanClass(ConsumptionBeanWithoutAck.class); + weld.addBeanClass(ConsumptionConsumerRebalanceListener.class); + weld.addBeanClass(StartFromFifthOffsetFromLatestConsumerRebalanceListener.class); + weld.addBeanClass(StartFromFifthOffsetFromLatestButFailOnFirstConsumerRebalanceListener.class); + weld.disableDiscovery(); + container = weld.initialize(); + return container.getBeanManager().createInstance().select(ConsumptionBeanWithoutAck.class).get(); + } + private ConsumptionBeanUsingRawMessage deployRaw(MapBasedConfig config) { Weld weld = baseWeld(); addConfig(config); diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/StartFromFifthOffsetFromLatestButFailOnFirstConsumerRebalanceListener.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/StartFromFifthOffsetFromLatestButFailOnFirstConsumerRebalanceListener.java new file mode 100644 index 0000000000..db5e06dd10 --- /dev/null +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/StartFromFifthOffsetFromLatestButFailOnFirstConsumerRebalanceListener.java @@ -0,0 +1,30 @@ +package io.smallrye.reactive.messaging.kafka; + +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Named; + +import io.smallrye.mutiny.Uni; +import io.vertx.kafka.client.common.TopicPartition; +import io.vertx.mutiny.kafka.client.consumer.KafkaConsumer; + +@ApplicationScoped +@Named("my-group-fail-on-first-true") +public class StartFromFifthOffsetFromLatestButFailOnFirstConsumerRebalanceListener + extends StartFromFifthOffsetFromLatestConsumerRebalanceListener { + + private final AtomicBoolean failOnFirstAttempt = new AtomicBoolean(true); + + @Override + public Uni onPartitionsAssigned(KafkaConsumer consumer, Set set) { + if (!set.isEmpty() && failOnFirstAttempt.getAndSet(false)) { + // will perform the underlying operation but simulate an error + return super.onPartitionsAssigned(consumer, set) + .onItem() + .failWith(a -> new Exception("testing failure")); + } + return super.onPartitionsAssigned(consumer, set); + } +} diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/StartFromFifthOffsetFromLatestConsumerRebalanceListener.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/StartFromFifthOffsetFromLatestConsumerRebalanceListener.java new file mode 100644 index 0000000000..a515e75639 --- /dev/null +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/StartFromFifthOffsetFromLatestConsumerRebalanceListener.java @@ -0,0 +1,37 @@ +package io.smallrye.reactive.messaging.kafka; + +import java.util.Set; +import java.util.stream.Collectors; + +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Named; + +import io.smallrye.mutiny.Uni; +import io.vertx.kafka.client.common.TopicPartition; +import io.vertx.mutiny.kafka.client.consumer.KafkaConsumer; + +@ApplicationScoped +@Named("my-group-fail-on-first-false") +public class StartFromFifthOffsetFromLatestConsumerRebalanceListener implements KafkaConsumerRebalanceListener { + + @Override + public Uni onPartitionsAssigned(KafkaConsumer consumer, Set set) { + return Uni + .combine() + .all() + .unis(set + .stream() + .map(topicPartition -> consumer.endOffsets(topicPartition) + .onItem() + .produceUni(o -> consumer.seek(topicPartition, Math.max(0L, o - 5L)))) + .collect(Collectors.toList())) + .combinedWith(a -> null); + } + + @Override + public Uni onPartitionsRevoked(KafkaConsumer consumer, Set topicPartitions) { + return Uni + .createFrom() + .nullItem(); + } +}