Skip to content

Commit

Permalink
Disable the graceful shutdown in dev mode.
Browse files Browse the repository at this point in the history
Also, propose an option to re-enable it.
The graceful shutdown waits until the inflight records have been processed and the offset committed to Kafka.
While this setting is highly recommended in production, in dev mode, it introduces slowness on shutdown.
  • Loading branch information
cescoffier committed Jul 22, 2021
1 parent cf73c98 commit e3880b0
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@
import io.quarkus.deployment.annotations.Consume;
import io.quarkus.deployment.builditem.CombinedIndexBuildItem;
import io.quarkus.deployment.builditem.FeatureBuildItem;
import io.quarkus.deployment.builditem.LaunchModeBuildItem;
import io.quarkus.deployment.builditem.RunTimeConfigurationDefaultBuildItem;
import io.quarkus.deployment.builditem.RuntimeConfigSetupCompleteBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
import io.quarkus.runtime.LaunchMode;
import io.quarkus.smallrye.reactivemessaging.kafka.ReactiveMessagingKafkaConfig;
import io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl;

public class SmallRyeReactiveMessagingKafkaProcessor {
Expand All @@ -46,18 +49,34 @@ public void build(BuildProducer<ReflectiveClassBuildItem> reflectiveClass) {
.build());
}

/**
* Handles the serializer/deserializer detection and whether or not the graceful shutdown should be used in dev mode.
*/
@BuildStep
public void defaultSerdeConfig(ReactiveMessagingKafkaBuildTimeConfig buildTimeConfig,
public void defaultChannelConfiguration(
LaunchModeBuildItem launchMode,
ReactiveMessagingKafkaBuildTimeConfig buildTimeConfig,
ReactiveMessagingKafkaConfig runtimeConfig,
CombinedIndexBuildItem combinedIndex,
BuildProducer<RunTimeConfigurationDefaultBuildItem> defaultConfigProducer) {

if (!buildTimeConfig.serializerAutodetectionEnabled) {
return;
}

DefaultSerdeDiscoveryState discoveryState = new DefaultSerdeDiscoveryState(combinedIndex.getIndex());
if (buildTimeConfig.serializerAutodetectionEnabled) {
discoverDefaultSerdeConfig(discoveryState, defaultConfigProducer);
}

discoverDefaultSerdeConfig(discoveryState, defaultConfigProducer);
if (launchMode.getLaunchMode() == LaunchMode.DEVELOPMENT) {
if (!runtimeConfig.enableGracefulShutdownInDevMode) {
for (AnnotationInstance annotation : discoveryState.findAnnotationsOnMethods(DotNames.INCOMING)) {
String channelName = annotation.value().asString();
if (!discoveryState.isKafkaConnector(true, channelName)) {
continue;
}
defaultConfigProducer.produce(new RunTimeConfigurationDefaultBuildItem(
"mp.messaging.incoming." + channelName + ".graceful-shutdown", "false"));
}
}
}
}

// visible for testing
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.quarkus.smallrye.reactivemessaging.kafka;

import org.eclipse.microprofile.config.inject.ConfigProperty;

import io.quarkus.runtime.annotations.ConfigRoot;

@ConfigRoot(name = "reactive-messaging.kafka")
public class ReactiveMessagingKafkaConfig {

/**
* Enables the graceful shutdown in dev mode.
* The graceful shutdown waits until the inflight records have been processed and the offset committed to Kafka.
* While this setting is highly recommended in production, in dev mode, it's disabled by default.
* This setting allows to re-enable it.
*/
@ConfigProperty(defaultValue = "false")
public boolean enableGracefulShutdownInDevMode;

}

0 comments on commit e3880b0

Please sign in to comment.