Skip to content

Commit

Permalink
config(kafka): clean-up kafka serializer config (#11303)
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker authored Sep 5, 2024
1 parent 0788347 commit d723116
Show file tree
Hide file tree
Showing 13 changed files with 209 additions and 75 deletions.
15 changes: 7 additions & 8 deletions datahub-frontend/app/client/KafkaTrackingProducer.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package client;

import com.linkedin.metadata.config.kafka.KafkaConfiguration;
import com.linkedin.metadata.config.kafka.ProducerConfiguration;
import com.typesafe.config.Config;
import config.ConfigurationProvider;
Expand Down Expand Up @@ -46,7 +47,7 @@ public KafkaTrackingProducer(

if (_isEnabled) {
_logger.debug("Analytics tracking is enabled");
_producer = createKafkaProducer(config, configurationProvider.getKafka().getProducer());
_producer = createKafkaProducer(config, configurationProvider.getKafka());

lifecycle.addStopHook(
() -> {
Expand All @@ -69,7 +70,8 @@ public void send(ProducerRecord<String, String> record) {
}

private static KafkaProducer createKafkaProducer(
Config config, ProducerConfiguration producerConfiguration) {
Config config, KafkaConfiguration kafkaConfiguration) {
final ProducerConfiguration producerConfiguration = kafkaConfiguration.getProducer();
final Properties props = new Properties();
props.put(ProducerConfig.CLIENT_ID_CONFIG, "datahub-frontend");
props.put(
Expand All @@ -78,12 +80,9 @@ private static KafkaProducer createKafkaProducer(
props.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
config.getString("analytics.kafka.bootstrap.server"));
props.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer"); // Actor urn.
props.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer"); // JSON object.
// key: Actor urn.
// value: JSON object.
props.putAll(kafkaConfiguration.getSerde().getUsageEvent().getProducerProperties(null));
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, producerConfiguration.getMaxRequestSize());
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, producerConfiguration.getCompressionType());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import com.linkedin.gms.factory.kafka.DataHubKafkaProducerFactory;
import com.linkedin.gms.factory.kafka.common.TopicConventionFactory;
import com.linkedin.gms.factory.kafka.schemaregistry.InternalSchemaRegistryFactory;
import com.linkedin.gms.factory.kafka.schemaregistry.SchemaRegistryConfig;
import com.linkedin.gms.factory.search.BaseElasticSearchComponentsFactory;
import com.linkedin.metadata.aspect.GraphRetriever;
import com.linkedin.metadata.config.kafka.KafkaConfiguration;
Expand Down Expand Up @@ -98,7 +97,8 @@ public DataHubStartupStep dataHubStartupStep(
protected KafkaEventProducer duheKafkaEventProducer(
@Qualifier("configurationProvider") ConfigurationProvider provider,
KafkaProperties properties,
@Qualifier("duheSchemaRegistryConfig") SchemaRegistryConfig duheSchemaRegistryConfig) {
@Qualifier("duheSchemaRegistryConfig")
KafkaConfiguration.SerDeKeyValueConfig duheSchemaRegistryConfig) {
KafkaConfiguration kafkaConfiguration = provider.getKafka();
Producer<String, IndexedRecord> producer =
new KafkaProducer<>(
Expand Down Expand Up @@ -130,8 +130,9 @@ protected KafkaEventProducer kafkaEventProducer(
@ConditionalOnProperty(
name = "kafka.schemaRegistry.type",
havingValue = InternalSchemaRegistryFactory.TYPE)
protected SchemaRegistryConfig schemaRegistryConfig(
@Qualifier("duheSchemaRegistryConfig") SchemaRegistryConfig duheSchemaRegistryConfig) {
protected KafkaConfiguration.SerDeKeyValueConfig schemaRegistryConfig(
@Qualifier("duheSchemaRegistryConfig")
KafkaConfiguration.SerDeKeyValueConfig duheSchemaRegistryConfig) {
return duheSchemaRegistryConfig;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
import static org.testng.Assert.assertNotNull;

import com.linkedin.datahub.upgrade.system.SystemUpdate;
import com.linkedin.gms.factory.kafka.schemaregistry.SchemaRegistryConfig;
import com.linkedin.metadata.boot.kafka.MockSystemUpdateDeserializer;
import com.linkedin.metadata.boot.kafka.MockSystemUpdateSerializer;
import com.linkedin.metadata.config.kafka.KafkaConfiguration;
import com.linkedin.metadata.dao.producer.KafkaEventProducer;
import com.linkedin.metadata.entity.EntityServiceImpl;
import com.linkedin.mxe.Topics;
Expand Down Expand Up @@ -55,7 +55,7 @@ public class DatahubUpgradeNoSchemaRegistryTest extends AbstractTestNGSpringCont

@Autowired
@Named("schemaRegistryConfig")
private SchemaRegistryConfig schemaRegistryConfig;
private KafkaConfiguration.SerDeKeyValueConfig schemaRegistryConfig;

@Test
public void testSystemUpdateInit() {
Expand All @@ -64,13 +64,17 @@ public void testSystemUpdateInit() {

@Test
public void testSystemUpdateKafkaProducerOverride() throws RestClientException, IOException {
assertEquals(schemaRegistryConfig.getDeserializer(), MockSystemUpdateDeserializer.class);
assertEquals(schemaRegistryConfig.getSerializer(), MockSystemUpdateSerializer.class);
assertEquals(
schemaRegistryConfig.getValue().getDeserializer(),
MockSystemUpdateDeserializer.class.getName());
assertEquals(
schemaRegistryConfig.getValue().getSerializer(),
MockSystemUpdateSerializer.class.getName());
assertEquals(kafkaEventProducer, duheKafkaEventProducer);
assertEquals(entityService.getProducer(), duheKafkaEventProducer);

MockSystemUpdateSerializer serializer = new MockSystemUpdateSerializer();
serializer.configure(schemaRegistryConfig.getProperties(), false);
serializer.configure(schemaRegistryConfig.getProperties(null), false);
SchemaRegistryClient registry = serializer.getSchemaRegistryClient();
assertEquals(
registry.getId(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeManager;
import com.linkedin.datahub.upgrade.system.SystemUpdateNonBlocking;
import com.linkedin.datahub.upgrade.system.vianodes.ReindexDataJobViaNodesCLL;
import com.linkedin.gms.factory.kafka.schemaregistry.SchemaRegistryConfig;
import com.linkedin.metadata.boot.kafka.MockSystemUpdateDeserializer;
import com.linkedin.metadata.boot.kafka.MockSystemUpdateSerializer;
import com.linkedin.metadata.config.kafka.KafkaConfiguration;
import com.linkedin.metadata.dao.producer.KafkaEventProducer;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
Expand Down Expand Up @@ -47,7 +47,7 @@ public class DatahubUpgradeNonBlockingTest extends AbstractTestNGSpringContextTe

@Autowired
@Named("schemaRegistryConfig")
private SchemaRegistryConfig schemaRegistryConfig;
private KafkaConfiguration.SerDeKeyValueConfig schemaRegistryConfig;

@Autowired
@Named("duheKafkaEventProducer")
Expand All @@ -66,8 +66,12 @@ public void testSystemUpdateNonBlockingInit() {
assertNotNull(systemUpdateNonBlocking);

// Expected system update configuration and producer
assertEquals(schemaRegistryConfig.getDeserializer(), MockSystemUpdateDeserializer.class);
assertEquals(schemaRegistryConfig.getSerializer(), MockSystemUpdateSerializer.class);
assertEquals(
schemaRegistryConfig.getValue().getDeserializer(),
MockSystemUpdateDeserializer.class.getName());
assertEquals(
schemaRegistryConfig.getValue().getSerializer(),
MockSystemUpdateSerializer.class.getName());
assertEquals(duheKafkaEventProducer, kafkaEventProducer);
assertEquals(entityService.getProducer(), duheKafkaEventProducer);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,25 @@
package com.linkedin.metadata.config.kafka;

import java.util.HashMap;
import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import org.springframework.lang.Nullable;

@Data
public class KafkaConfiguration {
// Avoiding dependencies on other libraries (Spring/Kafka) for configuration
public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer";
public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer";
public static final String KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer";
public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer";
public static final String KEY_DESERIALIZER_DELEGATE_CLASS =
"spring.deserializer.key.delegate.class";
public static final String VALUE_DESERIALIZER_DELEGATE_CLASS =
"spring.deserializer.value.delegate.class";

private String bootstrapServers;

Expand All @@ -14,4 +30,103 @@ public class KafkaConfiguration {
private ProducerConfiguration producer;

private ConsumerConfiguration consumer;

private SerDeConfig serde;

@Data
public static class SerDeConfig {
private SerDeKeyValueConfig usageEvent;
private SerDeKeyValueConfig event;
}

@Data
@Builder(toBuilder = true)
@AllArgsConstructor
@NoArgsConstructor
public static class SerDeKeyValueConfig {
private SerDeProperties key;
private SerDeProperties value;
@Nullable private Map<String, String> properties;

protected Map<String, String> getProperties() {
return getProperties(null);
}

public Map<String, String> getProperties(@Nullable SerDeKeyValueConfig schemaRegistryConfig) {
final HashMap<String, String> props =
new HashMap<>(properties != null ? properties : Map.of());
if (schemaRegistryConfig != null) {
props.putAll(schemaRegistryConfig.getProperties());
}
return props;
}

public Map<String, String> getProducerProperties(
@Nullable SerDeKeyValueConfig schemaRegistryConfig) {
final SerDeKeyValueConfig config =
schemaRegistryConfig == null ? this : withSchemaRegistryOverride(schemaRegistryConfig);
return Map.of(
KEY_SERIALIZER_CLASS_CONFIG, config.getKey().getSerializer(),
VALUE_SERIALIZER_CLASS_CONFIG, config.getValue().getSerializer());
}

public Map<String, String> getConsumerProperties(
@Nullable SerDeKeyValueConfig schemaRegistryConfig) {
final SerDeKeyValueConfig config =
schemaRegistryConfig == null ? this : withSchemaRegistryOverride(schemaRegistryConfig);

HashMap<String, String> consumerProperties =
new HashMap<>(
Map.of(
KEY_DESERIALIZER_CLASS_CONFIG, config.getKey().getDeserializer(),
VALUE_DESERIALIZER_CLASS_CONFIG, config.getValue().getDeserializer()));

if (config.getKey().getDelegateDeserializer() == null
&& config.getValue().getDelegateDeserializer() == null) {
return consumerProperties;
}

if (config.getKey().getDelegateDeserializer() != null) {
consumerProperties.put(
KEY_DESERIALIZER_DELEGATE_CLASS, config.getKey().getDelegateDeserializer());
}
if (config.getValue().getDelegateDeserializer() != null) {
consumerProperties.put(
VALUE_DESERIALIZER_DELEGATE_CLASS, config.getValue().getDelegateDeserializer());
}

return consumerProperties;
}

public SerDeKeyValueConfig withSchemaRegistryOverride(
@NonNull SerDeKeyValueConfig schemaRegistryConfig) {
// Handle case where delegation is used, but missing from schemaRegistryConfig
return schemaRegistryConfig.toBuilder()
.key(key.withSchemaRegistryOverride(schemaRegistryConfig.getKey()))
.value(value.withSchemaRegistryOverride(schemaRegistryConfig.getValue()))
.build();
}
}

@Data
@Builder(toBuilder = true)
@AllArgsConstructor
@NoArgsConstructor
public static class SerDeProperties {
private String serializer;
private String deserializer;
@Nullable private String delegateDeserializer;

public SerDeProperties withSchemaRegistryOverride(
@NonNull SerDeProperties schemaRegistryOverride) {
if (delegateDeserializer != null
&& schemaRegistryOverride.getDelegateDeserializer() == null) {
return schemaRegistryOverride.toBuilder()
.delegateDeserializer(schemaRegistryOverride.getDeserializer())
.deserializer(deserializer)
.build();
}
return schemaRegistryOverride;
}
}
}
17 changes: 17 additions & 0 deletions metadata-service/configuration/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,23 @@ kafka:
listener:
concurrency: ${KAFKA_LISTENER_CONCURRENCY:1}
bootstrapServers: ${KAFKA_BOOTSTRAP_SERVER:http://localhost:9092}
serde:
usageEvent:
key:
serializer: ${KAFKA_SERDE_USAGE_EVENTS_KEY_SERIALIZER:org.apache.kafka.common.serialization.StringSerializer}
deserializer: ${KAFKA_SERDE_USAGE_EVENTS_KEY_DESERIALIZER:org.apache.kafka.common.serialization.StringDeserializer}
value:
serializer: ${KAFKA_SERDE_USAGE_EVENTS_VALUE_SERIALIZER:org.apache.kafka.common.serialization.StringSerializer}
deserializer: ${KAFKA_SERDE_USAGE_EVENTS_VALUE_DESERIALIZER:org.apache.kafka.common.serialization.StringDeserializer}
event:
key:
serializer: ${KAFKA_SERDE_EVENT_KEY_SERIALIZER:org.apache.kafka.common.serialization.StringSerializer}
deserializer: ${KAFKA_SERDE_EVENT_KEY_DESERIALIZER:org.springframework.kafka.support.serializer.ErrorHandlingDeserializer}
delegateDeserializer: ${KAFKA_SERDE_EVENT_KEY_DELEGATE_DESERIALIZER:org.apache.kafka.common.serialization.StringDeserializer}
value:
serializer: ${KAFKA_SERDE_EVENT_VALUE_SERIALIZER:io.confluent.kafka.serializers.KafkaAvroSerializer}
deserializer: ${KAFKA_SERDE_EVENT_VALUE_DESERIALIZER:org.springframework.kafka.support.serializer.ErrorHandlingDeserializer}
delegateDeserializer: ${KAFKA_SERDE_EVENT_VALUE_DELEGATE_DESERIALIZER:io.confluent.kafka.serializers.KafkaAvroDeserializer}
producer:
retryCount: ${KAFKA_PRODUCER_RETRY_COUNT:3}
deliveryTimeout: ${KAFKA_PRODUCER_DELIVERY_TIMEOUT:30000}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.linkedin.gms.factory.kafka;

import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.gms.factory.kafka.schemaregistry.SchemaRegistryConfig;
import com.linkedin.metadata.config.kafka.KafkaConfiguration;
import java.util.Arrays;
import java.util.Map;
Expand All @@ -24,14 +23,15 @@ public class DataHubKafkaProducerFactory {
protected Producer<String, IndexedRecord> createInstance(
@Qualifier("configurationProvider") ConfigurationProvider provider,
final KafkaProperties properties,
@Qualifier("schemaRegistryConfig") final SchemaRegistryConfig schemaRegistryConfig) {
@Qualifier("schemaRegistryConfig")
final KafkaConfiguration.SerDeKeyValueConfig schemaRegistryConfig) {
KafkaConfiguration kafkaConfiguration = provider.getKafka();
return new KafkaProducer<>(
buildProducerProperties(schemaRegistryConfig, kafkaConfiguration, properties));
}

public static Map<String, Object> buildProducerProperties(
SchemaRegistryConfig schemaRegistryConfig,
KafkaConfiguration.SerDeKeyValueConfig schemaRegistryConfig,
KafkaConfiguration kafkaConfiguration,
KafkaProperties properties) {
KafkaProperties.Producer producerProps = properties.getProducer();
Expand All @@ -45,8 +45,8 @@ public static Map<String, Object> buildProducerProperties(
} // else we rely on KafkaProperties which defaults to localhost:9092

Map<String, Object> props = properties.buildProducerProperties(null);

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, schemaRegistryConfig.getSerializer());
props.putAll(
kafkaConfiguration.getSerde().getEvent().getProducerProperties(schemaRegistryConfig));

props.put(ProducerConfig.RETRIES_CONFIG, kafkaConfiguration.getProducer().getRetryCount());
props.put(
Expand All @@ -66,9 +66,7 @@ public static Map<String, Object> buildProducerProperties(
kafkaConfiguration.getProducer().getMaxRequestSize());

// Override KafkaProperties with SchemaRegistryConfig only for non-empty values
schemaRegistryConfig.getProperties().entrySet().stream()
.filter(entry -> entry.getValue() != null && !entry.getValue().toString().isEmpty())
.forEach(entry -> props.put(entry.getKey(), entry.getValue()));
props.putAll(kafkaConfiguration.getSerde().getEvent().getProperties(schemaRegistryConfig));

return props;
}
Expand Down
Loading

0 comments on commit d723116

Please sign in to comment.