Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka serde discovery generates serializer for dead letter queue failure strategy #36347

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,16 @@ static boolean hasStateStoreConfig(String stateStoreName, Config config) {
return stateStores.contains(stateStoreName);
}

static boolean hasDLQConfig(String channelName, Config config) {
String propertyKey = getChannelPropertyKey(channelName, "failure-strategy", true);
Optional<String> channelFailureStrategy = config.getOptionalValue(propertyKey, String.class);
Optional<String> failureStrategy = channelFailureStrategy.or(() -> getConnectorProperty("failure-strategy", config));

return failureStrategy.isPresent()
&& (failureStrategy.get().equals("dead-letter-queue")
|| failureStrategy.get().equals("delayed-retry-topic"));
}

private static Optional<String> getConnectorProperty(String keySuffix, Config config) {
return config.getOptionalValue("mp.messaging.connector." + KafkaConnector.CONNECTOR_NAME + "." + keySuffix,
String.class);
Expand Down Expand Up @@ -207,8 +217,8 @@ void discoverDefaultSerdeConfig(DefaultSerdeDiscoveryState discovery,
BuildProducer<RunTimeConfigurationDefaultBuildItem> config,
BuildProducer<GeneratedClassBuildItem> generatedClass,
BuildProducer<ReflectiveClassBuildItem> reflection) {
Map<String, String> alreadyGeneratedSerializers = new HashMap<>();
Map<String, String> alreadyGeneratedDeserializers = new HashMap<>();
Map<String, Result> alreadyGeneratedSerializers = new HashMap<>();
Map<String, Result> alreadyGeneratedDeserializers = new HashMap<>();
for (AnnotationInstance annotation : discovery.findRepeatableAnnotationsOnMethods(DotNames.INCOMING)) {
String channelName = annotation.value().asString();
if (!discovery.isKafkaConnector(channelsManagedByConnectors, true, channelName)) {
Expand All @@ -220,7 +230,7 @@ void discoverDefaultSerdeConfig(DefaultSerdeDiscoveryState discovery,
Type incomingType = getIncomingTypeFromMethod(method);

processIncomingType(discovery, config, incomingType, channelName, generatedClass, reflection,
alreadyGeneratedDeserializers);
alreadyGeneratedDeserializers, alreadyGeneratedSerializers);
}

for (AnnotationInstance annotation : discovery.findRepeatableAnnotationsOnMethods(DotNames.OUTGOING)) {
Expand Down Expand Up @@ -257,7 +267,7 @@ void discoverDefaultSerdeConfig(DefaultSerdeDiscoveryState discovery,
Type incomingType = getIncomingTypeFromChannelInjectionPoint(injectionPointType);

processIncomingType(discovery, config, incomingType, channelName, generatedClass, reflection,
alreadyGeneratedDeserializers);
alreadyGeneratedDeserializers, alreadyGeneratedSerializers);

processKafkaTransactions(discovery, config, channelName, injectionPointType);

Expand Down Expand Up @@ -293,11 +303,12 @@ private void processKafkaTransactions(DefaultSerdeDiscoveryState discovery,
private void processIncomingType(DefaultSerdeDiscoveryState discovery,
BuildProducer<RunTimeConfigurationDefaultBuildItem> config, Type incomingType, String channelName,
BuildProducer<GeneratedClassBuildItem> generatedClass, BuildProducer<ReflectiveClassBuildItem> reflection,
Map<String, String> alreadyGeneratedDeserializers) {
Map<String, Result> alreadyGeneratedDeserializers, Map<String, Result> alreadyGeneratedSerializers) {
extractKeyValueType(incomingType, (key, value, isBatchType) -> {
Result keyDeserializer = deserializerFor(discovery, key, generatedClass, reflection, alreadyGeneratedDeserializers);
Result valueDeserializer = deserializerFor(discovery, value, generatedClass, reflection,
alreadyGeneratedDeserializers);
Result keyDeserializer = deserializerFor(discovery, key, true, channelName, generatedClass, reflection,
alreadyGeneratedDeserializers, alreadyGeneratedSerializers);
Result valueDeserializer = deserializerFor(discovery, value, false, channelName, generatedClass, reflection,
alreadyGeneratedDeserializers, alreadyGeneratedSerializers);

produceRuntimeConfigurationDefaultBuildItem(discovery, config,
getChannelPropertyKey(channelName, "key.deserializer", true), keyDeserializer);
Expand Down Expand Up @@ -494,7 +505,7 @@ private Type getOutgoingTypeFromChannelInjectionPoint(Type injectionPointType) {

private void processOutgoingType(DefaultSerdeDiscoveryState discovery, Type outgoingType,
BiConsumer<Result, Result> serializerAcceptor, BuildProducer<GeneratedClassBuildItem> generatedClass,
BuildProducer<ReflectiveClassBuildItem> reflection, Map<String, String> alreadyGeneratedSerializer) {
BuildProducer<ReflectiveClassBuildItem> reflection, Map<String, Result> alreadyGeneratedSerializer) {
extractKeyValueType(outgoingType, (key, value, isBatch) -> {
Result keySerializer = serializerFor(discovery, key, generatedClass, reflection,
alreadyGeneratedSerializer);
Expand Down Expand Up @@ -766,10 +777,14 @@ private static boolean isRawMessage(Type type) {
);
// @formatter:on

private Result deserializerFor(DefaultSerdeDiscoveryState discovery, Type type,
private Result deserializerFor(DefaultSerdeDiscoveryState discovery,
Type type,
boolean key,
String channelName,
BuildProducer<GeneratedClassBuildItem> generatedClass,
BuildProducer<ReflectiveClassBuildItem> reflection,
Map<String, String> alreadyGeneratedSerializers) {
Map<String, Result> alreadyGeneratedDeserializers,
Map<String, Result> alreadyGeneratedSerializers) {
Result result = serializerDeserializerFor(discovery, type, false);
if (result != null && !result.exists) {
// avoid returning Result.nonexistent() to callers, they expect a non-null Result to always be known
Expand All @@ -779,24 +794,34 @@ private Result deserializerFor(DefaultSerdeDiscoveryState discovery, Type type,
// also, only generate the serializer/deserializer for classes and only generate once
if (result == null && type != null && generatedClass != null && type.kind() == Type.Kind.CLASS) {
// Check if already generated
String clazz = alreadyGeneratedSerializers.get(type.toString());
if (clazz == null) {
clazz = JacksonSerdeGenerator.generateDeserializer(generatedClass, type);
result = alreadyGeneratedDeserializers.get(type.toString());
if (result == null) {
String clazz = JacksonSerdeGenerator.generateDeserializer(generatedClass, type);
LOGGER.infof("Generating Jackson deserializer for type %s", type.name().toString());
// Deserializers are access by reflection.
reflection.produce(
ReflectiveClassBuildItem.builder(clazz).methods().build());
alreadyGeneratedSerializers.put(type.toString(), clazz);
alreadyGeneratedDeserializers.put(type.toString(), result);
// if the channel has a DLQ config generate a serializer as well
if (hasDLQConfig(channelName, discovery.getConfig())) {
Result serializer = serializerFor(discovery, type, generatedClass, reflection, alreadyGeneratedSerializers);
if (serializer != null) {
result = Result.of(clazz)
.with(key, "dead-letter-queue.key.serializer", serializer.value)
.with(!key, "dead-letter-queue.value.serializer", serializer.value);
}
} else {
result = Result.of(clazz);
}
}
result = Result.of(clazz);
}
return result;
}

private Result serializerFor(DefaultSerdeDiscoveryState discovery, Type type,
BuildProducer<GeneratedClassBuildItem> generatedClass,
BuildProducer<ReflectiveClassBuildItem> reflection,
Map<String, String> alreadyGeneratedSerializers) {
Map<String, Result> alreadyGeneratedSerializers) {
Result result = serializerDeserializerFor(discovery, type, true);
if (result != null && !result.exists) {
// avoid returning Result.nonexistent() to callers, they expect a non-null Result to always be known
Expand All @@ -806,16 +831,16 @@ private Result serializerFor(DefaultSerdeDiscoveryState discovery, Type type,
// also, only generate the serializer/deserializer for classes and only generate once
if (result == null && type != null && generatedClass != null && type.kind() == Type.Kind.CLASS) {
// Check if already generated
String clazz = alreadyGeneratedSerializers.get(type.toString());
if (clazz == null) {
clazz = JacksonSerdeGenerator.generateSerializer(generatedClass, type);
result = alreadyGeneratedSerializers.get(type.toString());
if (result == null) {
String clazz = JacksonSerdeGenerator.generateSerializer(generatedClass, type);
LOGGER.infof("Generating Jackson serializer for type %s", type.name().toString());
// Serializers are access by reflection.
reflection.produce(
ReflectiveClassBuildItem.builder(clazz).methods().build());
alreadyGeneratedSerializers.put(type.toString(), clazz);
result = Result.of(clazz);
alreadyGeneratedSerializers.put(type.toString(), result);
}
result = Result.of(clazz);
}

return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

import jakarta.inject.Inject;

Expand All @@ -22,6 +23,7 @@
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.assertj.core.api.Assert;
import org.assertj.core.groups.Tuple;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.spi.ConfigProviderResolver;
Expand All @@ -40,7 +42,9 @@
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

import io.quarkus.deployment.builditem.GeneratedClassBuildItem;
import io.quarkus.deployment.builditem.RunTimeConfigurationDefaultBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
import io.quarkus.kafka.client.serialization.JsonbSerializer;
import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;
import io.quarkus.smallrye.reactivemessaging.deployment.items.ConnectorManagedChannelBuildItem;
Expand All @@ -63,7 +67,16 @@ private static void doTest(Tuple[] expectations, Class<?>... classesToIndex) {
}

private static void doTest(Config customConfig, Tuple[] expectations, Class<?>... classesToIndex) {
doTest(customConfig, expectations, Collections.emptyList(), Collections.emptyList(), classesToIndex);
}

@SuppressWarnings({ "unchecked", "rawtypes" })
private static void doTest(Config customConfig, Tuple[] expectations,
List<Function<String, Assert>> generatedNames,
List<Function<String, Assert>> reflectiveNames, Class<?>... classesToIndex) {
List<RunTimeConfigurationDefaultBuildItem> configs = new ArrayList<>();
List<GeneratedClassBuildItem> generated = new ArrayList<>();
List<ReflectiveClassBuildItem> reflective = new ArrayList<>();

List<Class<?>> classes = new ArrayList<>(Arrays.asList(classesToIndex));
classes.add(Incoming.class);
Expand All @@ -81,11 +94,35 @@ boolean isKafkaConnector(List<ConnectorManagedChannelBuildItem> list, boolean in
};
try {
new SmallRyeReactiveMessagingKafkaProcessor().discoverDefaultSerdeConfig(discovery, Collections.emptyList(),
configs::add, null, null);
configs::add,
(generatedNames == null) ? null : generated::add,
(reflectiveNames == null) ? null : reflective::add);

assertThat(configs)
.extracting(RunTimeConfigurationDefaultBuildItem::getKey, RunTimeConfigurationDefaultBuildItem::getValue)
.containsExactlyInAnyOrder(expectations);
.allSatisfy(tuple -> {
Object[] e = tuple.toArray();
String key = (String) e[0];
String value = (String) e[1];
assertThat(Arrays.stream(expectations).filter(t -> key.equals(t.toArray()[0])))
.hasSize(1)
.satisfiesOnlyOnce(t -> {
Object o = t.toArray()[1];
if (o instanceof String) {
assertThat(value).isEqualTo((String) o);
} else {
((Function<String, Assert>) o).apply(value);
}
});
});

assertThat(generated)
.extracting(GeneratedClassBuildItem::getName)
.allSatisfy(s -> assertThat(generatedNames).satisfiesOnlyOnce(c -> c.apply(s)));

assertThat(reflective)
.flatExtracting(ReflectiveClassBuildItem::getClassNames)
.allSatisfy(s -> assertThat(reflectiveNames).satisfiesOnlyOnce(c -> c.apply(s)));
} finally {
// must not leak the Config instance associated to the system classloader
if (customConfig == null) {
Expand All @@ -94,6 +131,14 @@ boolean isKafkaConnector(List<ConnectorManagedChannelBuildItem> list, boolean in
}
}

Function<String, Assert> assertMatches(String regex) {
return s -> assertThat(s).matches(regex);
}

Function<String, Assert> assertStartsWith(String starts) {
return s -> assertThat(s).startsWith(starts);
}

private static IndexView index(List<Class<?>> classes) {
Indexer indexer = new Indexer();
for (Class<?> clazz : classes) {
Expand Down Expand Up @@ -2570,11 +2615,14 @@ public void genericSerdeImplementationAutoDetect() {

Tuple[] expectations1 = {
tuple("mp.messaging.outgoing.channel1.value.serializer", "org.apache.kafka.common.serialization.LongSerializer"),

tuple("mp.messaging.incoming.channel2.key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer"),
tuple("mp.messaging.incoming.channel3.value.deserializer", assertStartsWith("io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$CustomDto_Deserializer_")),
tuple("mp.messaging.incoming.channel2.value.deserializer", "io.quarkus.kafka.client.serialization.JsonObjectDeserializer"),
};

var generated1 = List.of(assertStartsWith("io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeConfigTest$CustomDto_Deserializer_"));
var reflective1 = List.of(assertStartsWith("io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$CustomDto_Deserializer_"));

Tuple[] expectations2 = {
tuple("mp.messaging.outgoing.channel1.value.serializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$MySerializer"),

Expand Down Expand Up @@ -2602,7 +2650,7 @@ public void genericSerdeImplementationAutoDetect() {
};
// @formatter:on

doTest(expectations1, CustomSerdeImplementation.class, CustomDto.class);
doTest(null, expectations1, generated1, reflective1, CustomSerdeImplementation.class, CustomDto.class);

doTest(expectations2, CustomSerdeImplementation.class, CustomDto.class,
MySerializer.class,
Expand Down Expand Up @@ -2795,5 +2843,51 @@ void method1(KafkaRecord<Integer, String> msg) {

}

@Test
void deadLetterQueue() {
Tuple[] expectations = {
tuple("mp.messaging.incoming.channel1.value.deserializer",
assertStartsWith("io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$CustomDto_Deserializer_")),
tuple("mp.messaging.incoming.channel1.dead-letter-queue.value.serializer",
assertStartsWith("io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$CustomDto_Serializer_")),
tuple("mp.messaging.incoming.channel2.key.deserializer",
assertStartsWith("io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$CustomDto_Deserializer_")),
tuple("mp.messaging.incoming.channel2.value.deserializer",
assertStartsWith("io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$CustomDto_Deserializer_")),
tuple("mp.messaging.incoming.channel2.dead-letter-queue.key.serializer",
assertStartsWith("io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$CustomDto_Serializer_")),
tuple("mp.messaging.incoming.channel2.dead-letter-queue.value.serializer",
assertStartsWith("io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$CustomDto_Serializer_")),
};
var generated = List.of(
assertStartsWith("io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeConfigTest$CustomDto_Deserializer_"),
assertStartsWith("io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeConfigTest$CustomDto_Serializer_")
);
var reflective = List.of(
assertStartsWith("io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$CustomDto_Deserializer_"),
assertStartsWith("io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$CustomDto_Serializer_")
);
doTest(new SmallRyeConfigBuilder()
.withSources(new MapBackedConfigSource("test", Map.of(
"mp.messaging.incoming.channel1.failure-strategy", "dead-letter-queue",
"mp.messaging.incoming.channel2.failure-strategy", "delayed-retry-topic")) {
})
.build(), expectations, generated, reflective, DeadLetterQueue.class);
}

private static class DeadLetterQueue {

@Incoming("channel1")
void method1(CustomDto msg) {

}

@Incoming("channel2")
void method2(Record<CustomDto, CustomDto> msg) {

}

}


}