Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disable the Kafka Connector graceful shutdown in dev mode #18945

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.quarkus.smallrye.reactivemessaging.kafka.deployment;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
Expand All @@ -22,9 +23,11 @@
import io.quarkus.deployment.annotations.Consume;
import io.quarkus.deployment.builditem.CombinedIndexBuildItem;
import io.quarkus.deployment.builditem.FeatureBuildItem;
import io.quarkus.deployment.builditem.LaunchModeBuildItem;
import io.quarkus.deployment.builditem.RunTimeConfigurationDefaultBuildItem;
import io.quarkus.deployment.builditem.RuntimeConfigSetupCompleteBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
import io.quarkus.smallrye.reactivemessaging.kafka.ReactiveMessagingKafkaConfig;
import io.vertx.kafka.client.consumer.impl.KafkaReadStreamImpl;

public class SmallRyeReactiveMessagingKafkaProcessor {
Expand All @@ -46,18 +49,41 @@ public void build(BuildProducer<ReflectiveClassBuildItem> reflectiveClass) {
.build());
}

/**
* Handles the serializer/deserializer detection and whether or not the graceful shutdown should be used in dev mode.
*/
@BuildStep
public void defaultSerdeConfig(ReactiveMessagingKafkaBuildTimeConfig buildTimeConfig,
public void defaultChannelConfiguration(
LaunchModeBuildItem launchMode,
ReactiveMessagingKafkaBuildTimeConfig buildTimeConfig,
ReactiveMessagingKafkaConfig runtimeConfig,
CombinedIndexBuildItem combinedIndex,
BuildProducer<RunTimeConfigurationDefaultBuildItem> defaultConfigProducer) {

if (!buildTimeConfig.serializerAutodetectionEnabled) {
return;
}

DefaultSerdeDiscoveryState discoveryState = new DefaultSerdeDiscoveryState(combinedIndex.getIndex());
if (buildTimeConfig.serializerAutodetectionEnabled) {
discoverDefaultSerdeConfig(discoveryState, defaultConfigProducer);
}

discoverDefaultSerdeConfig(discoveryState, defaultConfigProducer);
if (launchMode.getLaunchMode().isDevOrTest()) {
if (!runtimeConfig.enableGracefulShutdownInDevAndTestMode) {
List<AnnotationInstance> incomings = discoveryState.findAnnotationsOnMethods(DotNames.INCOMING);
List<AnnotationInstance> channels = discoveryState.findAnnotationsOnInjectionPoints(DotNames.CHANNEL);
List<AnnotationInstance> annotations = new ArrayList<>();
annotations.addAll(incomings);
annotations.addAll(channels);
for (AnnotationInstance annotation : annotations) {
String channelName = annotation.value().asString();
if (!discoveryState.isKafkaConnector(true, channelName)) {
continue;
}
String key = "mp.messaging.incoming." + channelName + ".graceful-shutdown";
discoveryState.ifNotYetConfigured(key, () -> {
defaultConfigProducer.produce(new RunTimeConfigurationDefaultBuildItem(key, "false"));
});
}
}
}
}

// visible for testing
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.quarkus.smallrye.reactivemessaging.kafka;

import org.eclipse.microprofile.config.inject.ConfigProperty;

import io.quarkus.runtime.annotations.ConfigRoot;

@ConfigRoot(name = "reactive-messaging.kafka")
public class ReactiveMessagingKafkaConfig {

/**
* Enables the graceful shutdown in dev and test modes.
* The graceful shutdown waits until the inflight records have been processed and the offset committed to Kafka.
* While this setting is highly recommended in production, in dev and test modes, it's disabled by default.
* This setting allows to re-enable it.
*/
@ConfigProperty(defaultValue = "false")
public boolean enableGracefulShutdownInDevAndTestMode;

}