diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java index 012dcf505a531..836fdb584a9de 100644 --- a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java +++ b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java @@ -1,5 +1,6 @@ package io.quarkus.smallrye.reactivemessaging.kafka.deployment; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.function.BiConsumer; @@ -22,9 +23,11 @@ import io.quarkus.deployment.annotations.Consume; import io.quarkus.deployment.builditem.CombinedIndexBuildItem; import io.quarkus.deployment.builditem.FeatureBuildItem; +import io.quarkus.deployment.builditem.LaunchModeBuildItem; import io.quarkus.deployment.builditem.RunTimeConfigurationDefaultBuildItem; import io.quarkus.deployment.builditem.RuntimeConfigSetupCompleteBuildItem; import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem; +import io.quarkus.smallrye.reactivemessaging.kafka.ReactiveMessagingKafkaConfig; import io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl; public class SmallRyeReactiveMessagingKafkaProcessor { @@ -46,18 +49,41 @@ public void build(BuildProducer reflectiveClass) { .build()); } + /** + * Handles the serializer/deserializer detection and whether or not the graceful shutdown should be used in dev mode. + */ @BuildStep - public void defaultSerdeConfig(ReactiveMessagingKafkaBuildTimeConfig buildTimeConfig, + public void defaultChannelConfiguration( + LaunchModeBuildItem launchMode, + ReactiveMessagingKafkaBuildTimeConfig buildTimeConfig, + ReactiveMessagingKafkaConfig runtimeConfig, CombinedIndexBuildItem combinedIndex, BuildProducer defaultConfigProducer) { - if (!buildTimeConfig.serializerAutodetectionEnabled) { - return; - } - DefaultSerdeDiscoveryState discoveryState = new DefaultSerdeDiscoveryState(combinedIndex.getIndex()); + if (buildTimeConfig.serializerAutodetectionEnabled) { + discoverDefaultSerdeConfig(discoveryState, defaultConfigProducer); + } - discoverDefaultSerdeConfig(discoveryState, defaultConfigProducer); + if (launchMode.getLaunchMode().isDevOrTest()) { + if (!runtimeConfig.enableGracefulShutdownInDevAndTestMode) { + List incomings = discoveryState.findAnnotationsOnMethods(DotNames.INCOMING); + List channels = discoveryState.findAnnotationsOnInjectionPoints(DotNames.CHANNEL); + List annotations = new ArrayList<>(); + annotations.addAll(incomings); + annotations.addAll(channels); + for (AnnotationInstance annotation : annotations) { + String channelName = annotation.value().asString(); + if (!discoveryState.isKafkaConnector(true, channelName)) { + continue; + } + String key = "mp.messaging.incoming." + channelName + ".graceful-shutdown"; + discoveryState.ifNotYetConfigured(key, () -> { + defaultConfigProducer.produce(new RunTimeConfigurationDefaultBuildItem(key, "false")); + }); + } + } + } } // visible for testing diff --git a/extensions/smallrye-reactive-messaging-kafka/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/ReactiveMessagingKafkaConfig.java b/extensions/smallrye-reactive-messaging-kafka/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/ReactiveMessagingKafkaConfig.java new file mode 100644 index 0000000000000..1e45d53a154ef --- /dev/null +++ b/extensions/smallrye-reactive-messaging-kafka/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/ReactiveMessagingKafkaConfig.java @@ -0,0 +1,19 @@ +package io.quarkus.smallrye.reactivemessaging.kafka; + +import org.eclipse.microprofile.config.inject.ConfigProperty; + +import io.quarkus.runtime.annotations.ConfigRoot; + +@ConfigRoot(name = "reactive-messaging.kafka") +public class ReactiveMessagingKafkaConfig { + + /** + * Enables the graceful shutdown in dev and test modes. + * The graceful shutdown waits until the inflight records have been processed and the offset committed to Kafka. + * While this setting is highly recommended in production, in dev and test modes, it's disabled by default. + * This setting allows to re-enable it. + */ + @ConfigProperty(defaultValue = "false") + public boolean enableGracefulShutdownInDevAndTestMode; + +}