diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java index 012dcf505a531d..2712e0119b2112 100644 --- a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java +++ b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java @@ -22,9 +22,12 @@ import io.quarkus.deployment.annotations.Consume; import io.quarkus.deployment.builditem.CombinedIndexBuildItem; import io.quarkus.deployment.builditem.FeatureBuildItem; +import io.quarkus.deployment.builditem.LaunchModeBuildItem; import io.quarkus.deployment.builditem.RunTimeConfigurationDefaultBuildItem; import io.quarkus.deployment.builditem.RuntimeConfigSetupCompleteBuildItem; import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem; +import io.quarkus.runtime.LaunchMode; +import io.quarkus.smallrye.reactivemessaging.kafka.ReactiveMessagingKafkaConfig; import io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl; public class SmallRyeReactiveMessagingKafkaProcessor { @@ -46,18 +49,34 @@ public void build(BuildProducer reflectiveClass) { .build()); } + /** + * Handles the serializer/deserializer detection and whether or not the graceful shutdown should be used in dev mode. + */ @BuildStep - public void defaultSerdeConfig(ReactiveMessagingKafkaBuildTimeConfig buildTimeConfig, + public void defaultChannelConfiguration( + LaunchModeBuildItem launchMode, + ReactiveMessagingKafkaBuildTimeConfig buildTimeConfig, + ReactiveMessagingKafkaConfig runtimeConfig, CombinedIndexBuildItem combinedIndex, BuildProducer defaultConfigProducer) { - if (!buildTimeConfig.serializerAutodetectionEnabled) { - return; - } - DefaultSerdeDiscoveryState discoveryState = new DefaultSerdeDiscoveryState(combinedIndex.getIndex()); + if (buildTimeConfig.serializerAutodetectionEnabled) { + discoverDefaultSerdeConfig(discoveryState, defaultConfigProducer); + } - discoverDefaultSerdeConfig(discoveryState, defaultConfigProducer); + if (launchMode.getLaunchMode() == LaunchMode.DEVELOPMENT) { + if (!runtimeConfig.enableGracefulShutdownInDevMode) { + for (AnnotationInstance annotation : discoveryState.findAnnotationsOnMethods(DotNames.INCOMING)) { + String channelName = annotation.value().asString(); + if (!discoveryState.isKafkaConnector(true, channelName)) { + continue; + } + defaultConfigProducer.produce(new RunTimeConfigurationDefaultBuildItem( + "mp.messaging.incoming." + channelName + ".graceful-shutdown", "false")); + } + } + } } // visible for testing diff --git a/extensions/smallrye-reactive-messaging-kafka/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/ReactiveMessagingKafkaConfig.java b/extensions/smallrye-reactive-messaging-kafka/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/ReactiveMessagingKafkaConfig.java new file mode 100644 index 00000000000000..504dfea693f29f --- /dev/null +++ b/extensions/smallrye-reactive-messaging-kafka/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/ReactiveMessagingKafkaConfig.java @@ -0,0 +1,19 @@ +package io.quarkus.smallrye.reactivemessaging.kafka; + +import org.eclipse.microprofile.config.inject.ConfigProperty; + +import io.quarkus.runtime.annotations.ConfigRoot; + +@ConfigRoot(name = "reactive-messaging.kafka") +public class ReactiveMessagingKafkaConfig { + + /** + * Enables the graceful shutdown in dev mode. + * The graceful shutdown waits until the inflight records have been processed and the offset committed to Kafka. + * While this setting is highly recommended in production, in dev mode, it's disabled by default. + * This setting allows to re-enable it. + */ + @ConfigProperty(defaultValue = "false") + public boolean enableGracefulShutdownInDevMode; + +}