config(kafka): clean-up kafka serializer config #11303

Merged
15 changes: 7 additions & 8 deletions datahub-frontend/app/client/KafkaTrackingProducer.java
@@ -1,5 +1,6 @@
package client;

+import com.linkedin.metadata.config.kafka.KafkaConfiguration;
import com.linkedin.metadata.config.kafka.ProducerConfiguration;
import com.typesafe.config.Config;
import config.ConfigurationProvider;
@@ -46,7 +47,7 @@ public KafkaTrackingProducer(

if (_isEnabled) {
_logger.debug("Analytics tracking is enabled");
-_producer = createKafkaProducer(config, configurationProvider.getKafka().getProducer());
+_producer = createKafkaProducer(config, configurationProvider.getKafka());

lifecycle.addStopHook(
() -> {
@@ -69,7 +70,8 @@ public void send(ProducerRecord<String, String> record) {
}

private static KafkaProducer createKafkaProducer(
-Config config, ProducerConfiguration producerConfiguration) {
+Config config, KafkaConfiguration kafkaConfiguration) {
+final ProducerConfiguration producerConfiguration = kafkaConfiguration.getProducer();
final Properties props = new Properties();
props.put(ProducerConfig.CLIENT_ID_CONFIG, "datahub-frontend");
props.put(
@@ -78,12 +80,9 @@ private static KafkaProducer createKafkaProducer(
props.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
config.getString("analytics.kafka.bootstrap.server"));
-props.put(
-ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
-"org.apache.kafka.common.serialization.StringSerializer"); // Actor urn.
-props.put(
-ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
-"org.apache.kafka.common.serialization.StringSerializer"); // JSON object.
+// key: Actor urn.
+// value: JSON object.
+props.putAll(kafkaConfiguration.getSerde().getUsageEvent().getProducerProperties(null));
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, producerConfiguration.getMaxRequestSize());
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, producerConfiguration.getCompressionType());

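For context, a minimal sketch of what the usage-event producer now pulls from the new serde config, assuming the usageEvent defaults in the application.yaml shown later in this diff. The sketch is not part of the PR and the class name is hypothetical; the builders come from Lombok's @Builder on SerDeKeyValueConfig and SerDeProperties:

import com.linkedin.metadata.config.kafka.KafkaConfiguration.SerDeKeyValueConfig;
import com.linkedin.metadata.config.kafka.KafkaConfiguration.SerDeProperties;
import java.util.Map;

public class UsageEventSerdeSketch {
  public static void main(String[] args) {
    // Mirrors the usageEvent defaults from application.yaml in this PR.
    SerDeKeyValueConfig usageEvent =
        SerDeKeyValueConfig.builder()
            .key(
                SerDeProperties.builder()
                    .serializer("org.apache.kafka.common.serialization.StringSerializer")
                    .build())
            .value(
                SerDeProperties.builder()
                    .serializer("org.apache.kafka.common.serialization.StringSerializer")
                    .build())
            .build();

    // The usage-event topic takes no schema-registry override, hence null.
    Map<String, String> props = usageEvent.getProducerProperties(null);
    props.forEach((k, v) -> System.out.println(k + " = " + v));
    // key.serializer = org.apache.kafka.common.serialization.StringSerializer
    // value.serializer = org.apache.kafka.common.serialization.StringSerializer
  }
}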
@@ -11,7 +11,6 @@
import com.linkedin.gms.factory.kafka.DataHubKafkaProducerFactory;
import com.linkedin.gms.factory.kafka.common.TopicConventionFactory;
import com.linkedin.gms.factory.kafka.schemaregistry.InternalSchemaRegistryFactory;
-import com.linkedin.gms.factory.kafka.schemaregistry.SchemaRegistryConfig;
import com.linkedin.gms.factory.search.BaseElasticSearchComponentsFactory;
import com.linkedin.metadata.aspect.GraphRetriever;
import com.linkedin.metadata.config.kafka.KafkaConfiguration;
@@ -98,7 +97,8 @@ public DataHubStartupStep dataHubStartupStep(
protected KafkaEventProducer duheKafkaEventProducer(
@Qualifier("configurationProvider") ConfigurationProvider provider,
KafkaProperties properties,
-@Qualifier("duheSchemaRegistryConfig") SchemaRegistryConfig duheSchemaRegistryConfig) {
+@Qualifier("duheSchemaRegistryConfig")
+KafkaConfiguration.SerDeKeyValueConfig duheSchemaRegistryConfig) {
KafkaConfiguration kafkaConfiguration = provider.getKafka();
Producer<String, IndexedRecord> producer =
new KafkaProducer<>(
@@ -130,8 +130,9 @@ protected KafkaEventProducer kafkaEventProducer(
@ConditionalOnProperty(
name = "kafka.schemaRegistry.type",
havingValue = InternalSchemaRegistryFactory.TYPE)
-protected SchemaRegistryConfig schemaRegistryConfig(
-@Qualifier("duheSchemaRegistryConfig") SchemaRegistryConfig duheSchemaRegistryConfig) {
+protected KafkaConfiguration.SerDeKeyValueConfig schemaRegistryConfig(
+@Qualifier("duheSchemaRegistryConfig")
+KafkaConfiguration.SerDeKeyValueConfig duheSchemaRegistryConfig) {
return duheSchemaRegistryConfig;
}

@@ -7,9 +7,9 @@
import static org.testng.Assert.assertNotNull;

import com.linkedin.datahub.upgrade.system.SystemUpdate;
-import com.linkedin.gms.factory.kafka.schemaregistry.SchemaRegistryConfig;
import com.linkedin.metadata.boot.kafka.MockSystemUpdateDeserializer;
import com.linkedin.metadata.boot.kafka.MockSystemUpdateSerializer;
+import com.linkedin.metadata.config.kafka.KafkaConfiguration;
import com.linkedin.metadata.dao.producer.KafkaEventProducer;
import com.linkedin.metadata.entity.EntityServiceImpl;
import com.linkedin.mxe.Topics;
@@ -55,7 +55,7 @@ public class DatahubUpgradeNoSchemaRegistryTest extends AbstractTestNGSpringCont

@Autowired
@Named("schemaRegistryConfig")
-private SchemaRegistryConfig schemaRegistryConfig;
+private KafkaConfiguration.SerDeKeyValueConfig schemaRegistryConfig;

@Test
public void testSystemUpdateInit() {
@@ -64,13 +64,17 @@ public void testSystemUpdateInit() {

@Test
public void testSystemUpdateKafkaProducerOverride() throws RestClientException, IOException {
-assertEquals(schemaRegistryConfig.getDeserializer(), MockSystemUpdateDeserializer.class);
-assertEquals(schemaRegistryConfig.getSerializer(), MockSystemUpdateSerializer.class);
+assertEquals(
+schemaRegistryConfig.getValue().getDeserializer(),
+MockSystemUpdateDeserializer.class.getName());
+assertEquals(
+schemaRegistryConfig.getValue().getSerializer(),
+MockSystemUpdateSerializer.class.getName());
assertEquals(kafkaEventProducer, duheKafkaEventProducer);
assertEquals(entityService.getProducer(), duheKafkaEventProducer);

MockSystemUpdateSerializer serializer = new MockSystemUpdateSerializer();
-serializer.configure(schemaRegistryConfig.getProperties(), false);
+serializer.configure(schemaRegistryConfig.getProperties(null), false);
SchemaRegistryClient registry = serializer.getSchemaRegistryClient();
assertEquals(
registry.getId(
@@ -10,9 +10,9 @@
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeManager;
import com.linkedin.datahub.upgrade.system.SystemUpdateNonBlocking;
import com.linkedin.datahub.upgrade.system.vianodes.ReindexDataJobViaNodesCLL;
-import com.linkedin.gms.factory.kafka.schemaregistry.SchemaRegistryConfig;
import com.linkedin.metadata.boot.kafka.MockSystemUpdateDeserializer;
import com.linkedin.metadata.boot.kafka.MockSystemUpdateSerializer;
+import com.linkedin.metadata.config.kafka.KafkaConfiguration;
import com.linkedin.metadata.dao.producer.KafkaEventProducer;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
@@ -47,7 +47,7 @@ public class DatahubUpgradeNonBlockingTest extends AbstractTestNGSpringContextTe

@Autowired
@Named("schemaRegistryConfig")
-private SchemaRegistryConfig schemaRegistryConfig;
+private KafkaConfiguration.SerDeKeyValueConfig schemaRegistryConfig;

@Autowired
@Named("duheKafkaEventProducer")
@@ -66,8 +66,12 @@ public void testSystemUpdateNonBlockingInit() {
assertNotNull(systemUpdateNonBlocking);

// Expected system update configuration and producer
-assertEquals(schemaRegistryConfig.getDeserializer(), MockSystemUpdateDeserializer.class);
-assertEquals(schemaRegistryConfig.getSerializer(), MockSystemUpdateSerializer.class);
+assertEquals(
+schemaRegistryConfig.getValue().getDeserializer(),
+MockSystemUpdateDeserializer.class.getName());
+assertEquals(
+schemaRegistryConfig.getValue().getSerializer(),
+MockSystemUpdateSerializer.class.getName());
assertEquals(duheKafkaEventProducer, kafkaEventProducer);
assertEquals(entityService.getProducer(), duheKafkaEventProducer);
}
@@ -1,9 +1,25 @@
package com.linkedin.metadata.config.kafka;

+import java.util.HashMap;
+import java.util.Map;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import org.springframework.lang.Nullable;

@Data
public class KafkaConfiguration {
+// Avoiding dependencies on other libraries (Spring/Kafka) for configuration
+public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";
+public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";
+public static final String KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer";
+public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer";
+public static final String KEY_DESERIALIZER_DELEGATE_CLASS =
+"spring.deserializer.key.delegate.class";
+public static final String VALUE_DESERIALIZER_DELEGATE_CLASS =
+"spring.deserializer.value.delegate.class";

private String bootstrapServers;

@@ -14,4 +30,103 @@ public class KafkaConfiguration {
private ProducerConfiguration producer;

private ConsumerConfiguration consumer;

private SerDeConfig serde;

@Data
public static class SerDeConfig {
private SerDeKeyValueConfig usageEvent;
private SerDeKeyValueConfig event;
}

@Data
@Builder(toBuilder = true)
@AllArgsConstructor
@NoArgsConstructor
public static class SerDeKeyValueConfig {
private SerDeProperties key;
private SerDeProperties value;
@Nullable private Map<String, String> properties;

protected Map<String, String> getProperties() {
return getProperties(null);
}

public Map<String, String> getProperties(@Nullable SerDeKeyValueConfig schemaRegistryConfig) {
final HashMap<String, String> props =
new HashMap<>(properties != null ? properties : Map.of());
if (schemaRegistryConfig != null) {
props.putAll(schemaRegistryConfig.getProperties());
}
return props;
}

public Map<String, String> getProducerProperties(
@Nullable SerDeKeyValueConfig schemaRegistryConfig) {
final SerDeKeyValueConfig config =
schemaRegistryConfig == null ? this : withSchemaRegistryOverride(schemaRegistryConfig);
return Map.of(
KEY_SERIALIZER_CLASS_CONFIG, config.getKey().getSerializer(),
VALUE_SERIALIZER_CLASS_CONFIG, config.getValue().getSerializer());
}

public Map<String, String> getConsumerProperties(
@Nullable SerDeKeyValueConfig schemaRegistryConfig) {
final SerDeKeyValueConfig config =
schemaRegistryConfig == null ? this : withSchemaRegistryOverride(schemaRegistryConfig);

HashMap<String, String> consumerProperties =
new HashMap<>(
Map.of(
KEY_DESERIALIZER_CLASS_CONFIG, config.getKey().getDeserializer(),
VALUE_DESERIALIZER_CLASS_CONFIG, config.getValue().getDeserializer()));

if (config.getKey().getDelegateDeserializer() == null
&& config.getValue().getDelegateDeserializer() == null) {
return consumerProperties;
}

if (config.getKey().getDelegateDeserializer() != null) {
consumerProperties.put(
KEY_DESERIALIZER_DELEGATE_CLASS, config.getKey().getDelegateDeserializer());
}
if (config.getValue().getDelegateDeserializer() != null) {
consumerProperties.put(
VALUE_DESERIALIZER_DELEGATE_CLASS, config.getValue().getDelegateDeserializer());
}

return consumerProperties;
}

public SerDeKeyValueConfig withSchemaRegistryOverride(
@NonNull SerDeKeyValueConfig schemaRegistryConfig) {
// Handle case where delegation is used, but missing from schemaRegistryConfig
return schemaRegistryConfig.toBuilder()
.key(key.withSchemaRegistryOverride(schemaRegistryConfig.getKey()))
.value(value.withSchemaRegistryOverride(schemaRegistryConfig.getValue()))
.build();
}
}

@Data
@Builder(toBuilder = true)
@AllArgsConstructor
@NoArgsConstructor
public static class SerDeProperties {
private String serializer;
private String deserializer;
@Nullable private String delegateDeserializer;

public SerDeProperties withSchemaRegistryOverride(
@NonNull SerDeProperties schemaRegistryOverride) {
if (delegateDeserializer != null
&& schemaRegistryOverride.getDelegateDeserializer() == null) {
return schemaRegistryOverride.toBuilder()
.delegateDeserializer(schemaRegistryOverride.getDeserializer())
.deserializer(deserializer)
.build();
}
return schemaRegistryOverride;
}
}
}
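To make the consumer side concrete, here is a short sketch (not part of the PR; class name hypothetical) of how the event defaults from application.yaml resolve through getConsumerProperties. The delegate keys are Spring Kafka's standard ErrorHandlingDeserializer properties, which the constants above hard-code so this config module avoids a Spring Kafka dependency:

import com.linkedin.metadata.config.kafka.KafkaConfiguration.SerDeKeyValueConfig;
import com.linkedin.metadata.config.kafka.KafkaConfiguration.SerDeProperties;
import java.util.Map;

public class EventConsumerSerdeSketch {
  public static void main(String[] args) {
    // Mirrors the "event" defaults from application.yaml in this PR:
    // an ErrorHandlingDeserializer wrapping the real deserializer via delegation.
    SerDeKeyValueConfig event =
        SerDeKeyValueConfig.builder()
            .key(
                SerDeProperties.builder()
                    .deserializer(
                        "org.springframework.kafka.support.serializer.ErrorHandlingDeserializer")
                    .delegateDeserializer(
                        "org.apache.kafka.common.serialization.StringDeserializer")
                    .build())
            .value(
                SerDeProperties.builder()
                    .deserializer(
                        "org.springframework.kafka.support.serializer.ErrorHandlingDeserializer")
                    .delegateDeserializer("io.confluent.kafka.serializers.KafkaAvroDeserializer")
                    .build())
            .build();

    Map<String, String> props = event.getConsumerProperties(null);
    props.forEach((k, v) -> System.out.println(k + " = " + v));
    // key.deserializer / value.deserializer -> ErrorHandlingDeserializer
    // spring.deserializer.key.delegate.class -> StringDeserializer
    // spring.deserializer.value.delegate.class -> KafkaAvroDeserializer
  }
}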
17 changes: 17 additions & 0 deletions metadata-service/configuration/src/main/resources/application.yaml
@@ -250,6 +250,23 @@ kafka:
listener:
concurrency: ${KAFKA_LISTENER_CONCURRENCY:1}
bootstrapServers: ${KAFKA_BOOTSTRAP_SERVER:http://localhost:9092}
serde:
usageEvent:
key:
serializer: ${KAFKA_SERDE_USAGE_EVENTS_KEY_SERIALIZER:org.apache.kafka.common.serialization.StringSerializer}
deserializer: ${KAFKA_SERDE_USAGE_EVENTS_KEY_DESERIALIZER:org.apache.kafka.common.serialization.StringDeserializer}
value:
serializer: ${KAFKA_SERDE_USAGE_EVENTS_VALUE_SERIALIZER:org.apache.kafka.common.serialization.StringSerializer}
deserializer: ${KAFKA_SERDE_USAGE_EVENTS_VALUE_DESERIALIZER:org.apache.kafka.common.serialization.StringDeserializer}
event:
key:
serializer: ${KAFKA_SERDE_EVENT_KEY_SERIALIZER:org.apache.kafka.common.serialization.StringSerializer}
deserializer: ${KAFKA_SERDE_EVENT_KEY_DESERIALIZER:org.springframework.kafka.support.serializer.ErrorHandlingDeserializer}
delegateDeserializer: ${KAFKA_SERDE_EVENT_KEY_DELEGATE_DESERIALIZER:org.apache.kafka.common.serialization.StringDeserializer}
value:
serializer: ${KAFKA_SERDE_EVENT_VALUE_SERIALIZER:io.confluent.kafka.serializers.KafkaAvroSerializer}
deserializer: ${KAFKA_SERDE_EVENT_VALUE_DESERIALIZER:org.springframework.kafka.support.serializer.ErrorHandlingDeserializer}
delegateDeserializer: ${KAFKA_SERDE_EVENT_VALUE_DELEGATE_DESERIALIZER:io.confluent.kafka.serializers.KafkaAvroDeserializer}
Contributor

It would be helpful to add documentation to the Kafka section of the DataHub documentation referring to this.

Collaborator Author

I think that these settings are for advanced users only and not something I'd recommend changing for a general user. If the end user needs to change the serializers in their environment, they should have a deep understanding of Kafka.

producer:
retryCount: ${KAFKA_PRODUCER_RETRY_COUNT:3}
deliveryTimeout: ${KAFKA_PRODUCER_DELIVERY_TIMEOUT:30000}
@@ -1,7 +1,6 @@
package com.linkedin.gms.factory.kafka;

import com.linkedin.gms.factory.config.ConfigurationProvider;
-import com.linkedin.gms.factory.kafka.schemaregistry.SchemaRegistryConfig;
import com.linkedin.metadata.config.kafka.KafkaConfiguration;
import java.util.Arrays;
import java.util.Map;
@@ -24,14 +23,15 @@ public class DataHubKafkaProducerFactory {
protected Producer<String, IndexedRecord> createInstance(
@Qualifier("configurationProvider") ConfigurationProvider provider,
final KafkaProperties properties,
-@Qualifier("schemaRegistryConfig") final SchemaRegistryConfig schemaRegistryConfig) {
+@Qualifier("schemaRegistryConfig")
+final KafkaConfiguration.SerDeKeyValueConfig schemaRegistryConfig) {
KafkaConfiguration kafkaConfiguration = provider.getKafka();
return new KafkaProducer<>(
buildProducerProperties(schemaRegistryConfig, kafkaConfiguration, properties));
}

public static Map<String, Object> buildProducerProperties(
-SchemaRegistryConfig schemaRegistryConfig,
+KafkaConfiguration.SerDeKeyValueConfig schemaRegistryConfig,
KafkaConfiguration kafkaConfiguration,
KafkaProperties properties) {
KafkaProperties.Producer producerProps = properties.getProducer();
@@ -45,8 +45,8 @@ public static Map<String, Object> buildProducerProperties(
} // else we rely on KafkaProperties which defaults to localhost:9092

Map<String, Object> props = properties.buildProducerProperties(null);

-props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, schemaRegistryConfig.getSerializer());
+props.putAll(
+kafkaConfiguration.getSerde().getEvent().getProducerProperties(schemaRegistryConfig));

props.put(ProducerConfig.RETRIES_CONFIG, kafkaConfiguration.getProducer().getRetryCount());
props.put(
@@ -66,9 +66,7 @@ public static Map<String, Object> buildProducerProperties(
kafkaConfiguration.getProducer().getMaxRequestSize());

// Override KafkaProperties with SchemaRegistryConfig only for non-empty values
-schemaRegistryConfig.getProperties().entrySet().stream()
-.filter(entry -> entry.getValue() != null && !entry.getValue().toString().isEmpty())
-.forEach(entry -> props.put(entry.getKey(), entry.getValue()));
+props.putAll(kafkaConfiguration.getSerde().getEvent().getProperties(schemaRegistryConfig));

return props;
}
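The subtlest part of this wiring is withSchemaRegistryOverride, exercised above when buildProducerProperties merges the event serde config with the schema-registry config: if the local config wraps its deserializer in a delegating error handler but the registry config supplies a bare deserializer, the registry's deserializer is demoted into the delegate slot so error handling survives the override. A sketch under those assumptions (class name hypothetical, registry value illustrative rather than taken from a real registry factory):

import com.linkedin.metadata.config.kafka.KafkaConfiguration.SerDeProperties;

public class SchemaRegistryOverrideSketch {
  public static void main(String[] args) {
    // Local "event" value config: ErrorHandlingDeserializer delegating to Avro.
    SerDeProperties local =
        SerDeProperties.builder()
            .deserializer(
                "org.springframework.kafka.support.serializer.ErrorHandlingDeserializer")
            .delegateDeserializer("io.confluent.kafka.serializers.KafkaAvroDeserializer")
            .build();

    // A schema-registry config supplying a bare deserializer, no delegation.
    SerDeProperties registry =
        SerDeProperties.builder()
            .deserializer("io.confluent.kafka.serializers.KafkaAvroDeserializer")
            .build();

    SerDeProperties merged = local.withSchemaRegistryOverride(registry);
    // The error-handling wrapper is kept; the registry's deserializer becomes the delegate.
    System.out.println(merged.getDeserializer()); // ...ErrorHandlingDeserializer
    System.out.println(merged.getDelegateDeserializer()); // ...KafkaAvroDeserializer
  }
}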