From 22bba29fa9e8be9183c135c6f2ecdc576752413f Mon Sep 17 00:00:00 2001 From: Clement Escoffier Date: Thu, 22 Jul 2021 18:54:36 +0200 Subject: [PATCH] Disable the graceful shutdown in dev and test modes. Also, propose an option to re-enable it. The graceful shutdown waits until the inflight records have been processed and the offset committed to Kafka. While this setting is highly recommended in production, in dev and test modes, it introduces slowness on shutdown. --- ...allRyeReactiveMessagingKafkaProcessor.java | 38 ++++++++++++++++--- .../kafka/ReactiveMessagingKafkaConfig.java | 19 ++++++++++ 2 files changed, 51 insertions(+), 6 deletions(-) create mode 100644 extensions/smallrye-reactive-messaging-kafka/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/ReactiveMessagingKafkaConfig.java diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java index 012dcf505a531..836fdb584a9de 100644 --- a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java +++ b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java @@ -1,5 +1,6 @@ package io.quarkus.smallrye.reactivemessaging.kafka.deployment; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.function.BiConsumer; @@ -22,9 +23,11 @@ import io.quarkus.deployment.annotations.Consume; import io.quarkus.deployment.builditem.CombinedIndexBuildItem; import io.quarkus.deployment.builditem.FeatureBuildItem; +import io.quarkus.deployment.builditem.LaunchModeBuildItem; import io.quarkus.deployment.builditem.RunTimeConfigurationDefaultBuildItem; import io.quarkus.deployment.builditem.RuntimeConfigSetupCompleteBuildItem; import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem; +import io.quarkus.smallrye.reactivemessaging.kafka.ReactiveMessagingKafkaConfig; import io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl; public class SmallRyeReactiveMessagingKafkaProcessor { @@ -46,18 +49,41 @@ public void build(BuildProducer reflectiveClass) { .build()); } + /** + * Handles the serializer/deserializer detection and whether or not the graceful shutdown should be used in dev mode. + */ @BuildStep - public void defaultSerdeConfig(ReactiveMessagingKafkaBuildTimeConfig buildTimeConfig, + public void defaultChannelConfiguration( + LaunchModeBuildItem launchMode, + ReactiveMessagingKafkaBuildTimeConfig buildTimeConfig, + ReactiveMessagingKafkaConfig runtimeConfig, CombinedIndexBuildItem combinedIndex, BuildProducer defaultConfigProducer) { - if (!buildTimeConfig.serializerAutodetectionEnabled) { - return; - } - DefaultSerdeDiscoveryState discoveryState = new DefaultSerdeDiscoveryState(combinedIndex.getIndex()); + if (buildTimeConfig.serializerAutodetectionEnabled) { + discoverDefaultSerdeConfig(discoveryState, defaultConfigProducer); + } - discoverDefaultSerdeConfig(discoveryState, defaultConfigProducer); + if (launchMode.getLaunchMode().isDevOrTest()) { + if (!runtimeConfig.enableGracefulShutdownInDevAndTestMode) { + List incomings = discoveryState.findAnnotationsOnMethods(DotNames.INCOMING); + List channels = discoveryState.findAnnotationsOnInjectionPoints(DotNames.CHANNEL); + List annotations = new ArrayList<>(); + annotations.addAll(incomings); + annotations.addAll(channels); + for (AnnotationInstance annotation : annotations) { + String channelName = annotation.value().asString(); + if (!discoveryState.isKafkaConnector(true, channelName)) { + continue; + } + String key = "mp.messaging.incoming." + channelName + ".graceful-shutdown"; + discoveryState.ifNotYetConfigured(key, () -> { + defaultConfigProducer.produce(new RunTimeConfigurationDefaultBuildItem(key, "false")); + }); + } + } + } } // visible for testing diff --git a/extensions/smallrye-reactive-messaging-kafka/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/ReactiveMessagingKafkaConfig.java b/extensions/smallrye-reactive-messaging-kafka/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/ReactiveMessagingKafkaConfig.java new file mode 100644 index 0000000000000..1e45d53a154ef --- /dev/null +++ b/extensions/smallrye-reactive-messaging-kafka/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/ReactiveMessagingKafkaConfig.java @@ -0,0 +1,19 @@ +package io.quarkus.smallrye.reactivemessaging.kafka; + +import org.eclipse.microprofile.config.inject.ConfigProperty; + +import io.quarkus.runtime.annotations.ConfigRoot; + +@ConfigRoot(name = "reactive-messaging.kafka") +public class ReactiveMessagingKafkaConfig { + + /** + * Enables the graceful shutdown in dev and test modes. + * The graceful shutdown waits until the inflight records have been processed and the offset committed to Kafka. + * While this setting is highly recommended in production, in dev and test modes, it's disabled by default. + * This setting allows to re-enable it. + */ + @ConfigProperty(defaultValue = "false") + public boolean enableGracefulShutdownInDevAndTestMode; + +}