Skip to content

Commit

Permalink
Merge pull request #18945 from cescoffier/disable-kafka-graceful-shut…
Browse files Browse the repository at this point in the history
…down-in-dev-mode

Disable the Kafka Connector graceful shutdown in dev mode
  • Loading branch information
stuartwdouglas authored Jul 24, 2021
2 parents 140e746 + 22bba29 commit 0d70cba
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.quarkus.smallrye.reactivemessaging.kafka.deployment;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
Expand All @@ -22,9 +23,11 @@
import io.quarkus.deployment.annotations.Consume;
import io.quarkus.deployment.builditem.CombinedIndexBuildItem;
import io.quarkus.deployment.builditem.FeatureBuildItem;
import io.quarkus.deployment.builditem.LaunchModeBuildItem;
import io.quarkus.deployment.builditem.RunTimeConfigurationDefaultBuildItem;
import io.quarkus.deployment.builditem.RuntimeConfigSetupCompleteBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
import io.quarkus.smallrye.reactivemessaging.kafka.ReactiveMessagingKafkaConfig;
import io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl;

public class SmallRyeReactiveMessagingKafkaProcessor {
Expand All @@ -46,18 +49,41 @@ public void build(BuildProducer<ReflectiveClassBuildItem> reflectiveClass) {
.build());
}

/**
* Handles the serializer/deserializer detection and whether or not the graceful shutdown should be used in dev mode.
*/
@BuildStep
public void defaultSerdeConfig(ReactiveMessagingKafkaBuildTimeConfig buildTimeConfig,
public void defaultChannelConfiguration(
LaunchModeBuildItem launchMode,
ReactiveMessagingKafkaBuildTimeConfig buildTimeConfig,
ReactiveMessagingKafkaConfig runtimeConfig,
CombinedIndexBuildItem combinedIndex,
BuildProducer<RunTimeConfigurationDefaultBuildItem> defaultConfigProducer) {

if (!buildTimeConfig.serializerAutodetectionEnabled) {
return;
}

DefaultSerdeDiscoveryState discoveryState = new DefaultSerdeDiscoveryState(combinedIndex.getIndex());
if (buildTimeConfig.serializerAutodetectionEnabled) {
discoverDefaultSerdeConfig(discoveryState, defaultConfigProducer);
}

discoverDefaultSerdeConfig(discoveryState, defaultConfigProducer);
if (launchMode.getLaunchMode().isDevOrTest()) {
if (!runtimeConfig.enableGracefulShutdownInDevAndTestMode) {
List<AnnotationInstance> incomings = discoveryState.findAnnotationsOnMethods(DotNames.INCOMING);
List<AnnotationInstance> channels = discoveryState.findAnnotationsOnInjectionPoints(DotNames.CHANNEL);
List<AnnotationInstance> annotations = new ArrayList<>();
annotations.addAll(incomings);
annotations.addAll(channels);
for (AnnotationInstance annotation : annotations) {
String channelName = annotation.value().asString();
if (!discoveryState.isKafkaConnector(true, channelName)) {
continue;
}
String key = "mp.messaging.incoming." + channelName + ".graceful-shutdown";
discoveryState.ifNotYetConfigured(key, () -> {
defaultConfigProducer.produce(new RunTimeConfigurationDefaultBuildItem(key, "false"));
});
}
}
}
}

// visible for testing
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.quarkus.smallrye.reactivemessaging.kafka;

import org.eclipse.microprofile.config.inject.ConfigProperty;

import io.quarkus.runtime.annotations.ConfigRoot;

@ConfigRoot(name = "reactive-messaging.kafka")
public class ReactiveMessagingKafkaConfig {

/**
* Enables the graceful shutdown in dev and test modes.
* The graceful shutdown waits until the inflight records have been processed and the offset committed to Kafka.
* While this setting is highly recommended in production, in dev and test modes, it's disabled by default.
* This setting allows to re-enable it.
*/
@ConfigProperty(defaultValue = "false")
public boolean enableGracefulShutdownInDevAndTestMode;

}

0 comments on commit 0d70cba

Please sign in to comment.