Skip to content

Commit

Permalink
Merge pull request #24356 from Ladicek/kafka-autodetection-improvement
Browse files Browse the repository at this point in the history
  • Loading branch information
cescoffier authored Mar 16, 2022
2 parents ea3688c + 2311124 commit 5d31bc5
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.Set;
import java.util.stream.Collectors;

import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.jboss.jandex.AnnotationInstance;
import org.jboss.jandex.AnnotationTarget;
Expand All @@ -26,6 +27,11 @@ class DefaultSerdeDiscoveryState {
private final Map<String, Boolean> isKafkaConnector = new HashMap<>();
private final Set<String> alreadyConfigured = new HashSet<>();

private Boolean connectorHasKeySerializer;
private Boolean connectorHasValueSerializer;
private Boolean connectorHasKeyDeserializer;
private Boolean connectorHasValueDeserializer;

private Boolean hasConfluent;
private Boolean hasApicurio1;
private Boolean hasApicurio2;
Expand All @@ -35,6 +41,10 @@ class DefaultSerdeDiscoveryState {
this.index = index;
}

Config getConfig() {
return ConfigProvider.getConfig();
}

boolean isKafkaConnector(List<ConnectorManagedChannelBuildItem> channelsManagedByConnectors, boolean incoming,
String channelName) {
// First look in the channelsManagedByConnectors list
Expand All @@ -48,13 +58,61 @@ boolean isKafkaConnector(List<ConnectorManagedChannelBuildItem> channelsManagedB
String channelType = incoming ? "incoming" : "outgoing";
return isKafkaConnector.computeIfAbsent(channelType + "|" + channelName, ignored -> {
String connectorKey = "mp.messaging." + channelType + "." + channelName + ".connector";
String connector = ConfigProvider.getConfig()
String connector = getConfig()
.getOptionalValue(connectorKey, String.class)
.orElse("ignored");
return KafkaConnector.CONNECTOR_NAME.equals(connector);
});
}

boolean shouldNotConfigure(String key) {
// if we know at build time that key/value [de]serializer is configured on the connector,
// we should NOT emit default configuration for key/value [de]serializer on the channel
// (in other words, only a user can explicitly override a connector configuration)
//
// more config properties could possibly be handled in the same way, but these should suffice for now

if (key.startsWith("mp.messaging.outgoing.") && key.endsWith(".key.serializer")) {
if (connectorHasKeySerializer == null) {
connectorHasKeySerializer = getConfig()
.getOptionalValue("mp.messaging.connector." + KafkaConnector.CONNECTOR_NAME + ".key.serializer",
String.class)
.isPresent();
}
return connectorHasKeySerializer;
}
if (key.startsWith("mp.messaging.outgoing.") && key.endsWith(".value.serializer")) {
if (connectorHasValueSerializer == null) {
connectorHasValueSerializer = getConfig()
.getOptionalValue("mp.messaging.connector." + KafkaConnector.CONNECTOR_NAME + ".value.serializer",
String.class)
.isPresent();
}
return connectorHasValueSerializer;
}

if (key.startsWith("mp.messaging.incoming.") && key.endsWith(".key.deserializer")) {
if (connectorHasKeyDeserializer == null) {
connectorHasKeyDeserializer = getConfig()
.getOptionalValue("mp.messaging.connector." + KafkaConnector.CONNECTOR_NAME + ".key.deserializer",
String.class)
.isPresent();
}
return connectorHasKeyDeserializer;
}
if (key.startsWith("mp.messaging.incoming.") && key.endsWith(".value.deserializer")) {
if (connectorHasValueDeserializer == null) {
connectorHasValueDeserializer = getConfig()
.getOptionalValue("mp.messaging.connector." + KafkaConnector.CONNECTOR_NAME + ".value.deserializer",
String.class)
.isPresent();
}
return connectorHasValueDeserializer;
}

return false;
}

void ifNotYetConfigured(String key, Runnable runnable) {
if (!alreadyConfigured.contains(key)) {
alreadyConfigured.add(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,10 @@ private void produceRuntimeConfigurationDefaultBuildItem(DefaultSerdeDiscoverySt
return;
}

if (discovery.shouldNotConfigure(key)) {
return;
}

discovery.ifNotYetConfigured(key, () -> {
config.produce(new RunTimeConfigurationDefaultBuildItem(key, value));
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionStage;

import javax.inject.Inject;
Expand All @@ -19,6 +20,7 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.assertj.core.groups.Tuple;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Incoming;
Expand All @@ -38,6 +40,8 @@
import io.quarkus.kafka.client.serialization.JsonbSerializer;
import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;
import io.quarkus.smallrye.reactivemessaging.deployment.items.ConnectorManagedChannelBuildItem;
import io.smallrye.config.SmallRyeConfigBuilder;
import io.smallrye.config.common.MapBackedConfigSource;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.MutinyEmitter;
Expand All @@ -48,9 +52,18 @@

public class DefaultSerdeConfigTest {
private static void doTest(Tuple[] expectations, Class<?>... classesToIndex) {
doTest(null, expectations, classesToIndex);
}

private static void doTest(Config customConfig, Tuple[] expectations, Class<?>... classesToIndex) {
List<RunTimeConfigurationDefaultBuildItem> configs = new ArrayList<>();

DefaultSerdeDiscoveryState discovery = new DefaultSerdeDiscoveryState(index(classesToIndex)) {
@Override
Config getConfig() {
return customConfig != null ? customConfig : super.getConfig();
}

@Override
boolean isKafkaConnector(List<ConnectorManagedChannelBuildItem> list, boolean incoming, String channelName) {
return true;
Expand Down Expand Up @@ -2467,4 +2480,69 @@ void channel10(List<Message<JacksonDto>> jacksonDto) {

}

// ---

@Test
public void connectorConfigNotOverriden() {
// @formatter:off
Tuple[] expectations1 = {
// "mp.messaging.outgoing.channel1.key.serializer" NOT expected, connector config exists
tuple("mp.messaging.outgoing.channel1.value.serializer", "org.apache.kafka.common.serialization.StringSerializer"),

tuple("mp.messaging.incoming.channel2.key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer"),
// "mp.messaging.incoming.channel2.value.deserializer" NOT expected, connector config exists

tuple("mp.messaging.incoming.channel3.key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer"),
// "mp.messaging.incoming.channel3.value.deserializer" NOT expected, connector config exists
// "mp.messaging.outgoing.channel4.key.serializer" NOT expected, connector config exists
tuple("mp.messaging.outgoing.channel4.value.serializer", "org.apache.kafka.common.serialization.StringSerializer"),
};

Tuple[] expectations2 = {
tuple("mp.messaging.outgoing.channel1.key.serializer", "org.apache.kafka.common.serialization.LongSerializer"),
// "mp.messaging.outgoing.channel1.value.serializer" NOT expected, connector config exists

// "mp.messaging.incoming.channel2.key.deserializer" NOT expected, connector config exists
tuple("mp.messaging.incoming.channel2.value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"),

// "mp.messaging.incoming.channel3.key.deserializer" NOT expected, connector config exists
tuple("mp.messaging.incoming.channel3.value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"),
tuple("mp.messaging.outgoing.channel4.key.serializer", "org.apache.kafka.common.serialization.LongSerializer"),
// "mp.messaging.outgoing.channel4.value.serializer" NOT expected, connector config exists
};
// @formatter:on

doTest(new SmallRyeConfigBuilder()
.withSources(new MapBackedConfigSource("test", Map.of(
"mp.messaging.connector.smallrye-kafka.key.serializer", "foo.Bar",
"mp.messaging.connector.smallrye-kafka.value.deserializer", "foo.Baz")) {
})
.build(), expectations1, ConnectorConfigNotOverriden.class);

doTest(new SmallRyeConfigBuilder()
.withSources(new MapBackedConfigSource("test", Map.of(
"mp.messaging.connector.smallrye-kafka.key.deserializer", "foo.Bar",
"mp.messaging.connector.smallrye-kafka.value.serializer", "foo.Baz")) {
})
.build(), expectations2, ConnectorConfigNotOverriden.class);
}

private static class ConnectorConfigNotOverriden {
@Outgoing("channel1")
Record<Long, String> method1() {
return null;
}

@Incoming("channel2")
CompletionStage<?> method2(Record<Long, String> msg) {
return null;
}

@Incoming("channel3")
@Outgoing("channel4")
Record<Long, String> method3(Record<Long, String> msg) {
return null;
}
}

}

0 comments on commit 5d31bc5

Please sign in to comment.