feat(kafka): expose default kafka producer mechanism (#6381)
* Expose Kafka Sender Retry Parameters

* Implement KafkaHealthChecker

* feat(kafka): expose default kafka producer mechanism
djordje-mijatovic authored Dec 20, 2022
1 parent b58021a commit e6c48e5
Showing 6 changed files with 123 additions and 85 deletions.
KafkaEventProducer.java
@@ -19,7 +19,6 @@
import io.opentelemetry.extension.annotations.WithSpan;
import java.io.IOException;
import java.util.Arrays;
import java.util.Optional;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
@@ -29,7 +28,6 @@
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;


/**
* <p>The topic names that this emits to can be controlled by constructing this with a {@link TopicConvention}.
* If none is given, defaults to a {@link TopicConventionImpl} with the default delimiter of an underscore (_).
@@ -38,32 +36,21 @@
public class KafkaEventProducer implements EventProducer {

private final Producer<String, ? extends IndexedRecord> _producer;
private final Optional<Callback> _callback;
private final TopicConvention _topicConvention;
private final KafkaHealthChecker _kafkaHealthChecker;

/**
* Constructor.
*
* @param producer The Kafka {@link Producer} to use
* @param topicConvention the convention to use to get kafka topic names
* @param kafkaHealthChecker the {@link KafkaHealthChecker} that supplies the send {@link Callback} and tracks in-flight messages
*/
public KafkaEventProducer(@Nonnull final Producer<String, ? extends IndexedRecord> producer,
@Nonnull final TopicConvention topicConvention) {
this(producer, topicConvention, null);
}

/**
* Constructor.
*
* @param producer The Kafka {@link Producer} to use
* @param topicConvention the convention to use to get kafka topic names
* @param callback The {@link Callback} to invoke when the request is completed
*/
public KafkaEventProducer(@Nonnull final Producer<String, ? extends IndexedRecord> producer,
@Nonnull final TopicConvention topicConvention, @Nullable final Callback callback) {
@Nonnull final TopicConvention topicConvention, @Nonnull final KafkaHealthChecker kafkaHealthChecker) {
_producer = producer;
_callback = Optional.ofNullable(callback);
_topicConvention = topicConvention;
_kafkaHealthChecker = kafkaHealthChecker;
}

@Override
@@ -93,30 +80,16 @@ public void produceMetadataAuditEvent(@Nonnull Urn urn, @Nullable Snapshot oldSn
try {
log.debug(String.format("Converting Pegasus snapshot to Avro snapshot urn %s\nMetadataAuditEvent: %s",
urn,
metadataAuditEvent.toString()));
metadataAuditEvent));
record = EventUtils.pegasusToAvroMAE(metadataAuditEvent);
} catch (IOException e) {
log.error(String.format("Failed to convert Pegasus MAE to Avro: %s", metadataAuditEvent.toString()), e);
log.error(String.format("Failed to convert Pegasus MAE to Avro: %s", metadataAuditEvent), e);
throw new ModelConversionException("Failed to convert Pegasus MAE to Avro", e);
}

if (_callback.isPresent()) {
_producer.send(new ProducerRecord(_topicConvention.getMetadataAuditEventTopicName(), urn.toString(), record),
_callback.get());
} else {
_producer.send(new ProducerRecord(_topicConvention.getMetadataAuditEventTopicName(), urn.toString(), record),
(metadata, e) -> {
if (e != null) {
log.error(String.format("Failed to emit MAE for entity with urn %s", urn), e);
} else {
log.debug(String.format("Successfully emitted MAE for entity with urn %s at offset %s, partition %s, topic %s",
urn,
metadata.offset(),
metadata.partition(),
metadata.topic()));
}
});
}
String topic = _topicConvention.getMetadataAuditEventTopicName();
_producer.send(new ProducerRecord(topic, urn.toString(), record),
_kafkaHealthChecker.getKafkaCallBack("MAE", urn.toString()));
}

@Override
@@ -127,33 +100,19 @@ public void produceMetadataChangeLog(@Nonnull final Urn urn, @Nonnull AspectSpec
try {
log.debug(String.format("Converting Pegasus snapshot to Avro snapshot urn %s\nMetadataChangeLog: %s",
urn,
metadataChangeLog.toString()));
metadataChangeLog));
record = EventUtils.pegasusToAvroMCL(metadataChangeLog);
} catch (IOException e) {
log.error(String.format("Failed to convert Pegasus MAE to Avro: %s", metadataChangeLog.toString()), e);
log.error(String.format("Failed to convert Pegasus MAE to Avro: %s", metadataChangeLog), e);
throw new ModelConversionException("Failed to convert Pegasus MAE to Avro", e);
}

String topic = _topicConvention.getMetadataChangeLogVersionedTopicName();
if (aspectSpec.isTimeseries()) {
topic = _topicConvention.getMetadataChangeLogTimeseriesTopicName();
}

if (_callback.isPresent()) {
_producer.send(new ProducerRecord(topic, urn.toString(), record), _callback.get());
} else {
_producer.send(new ProducerRecord(topic, urn.toString(), record), (metadata, e) -> {
if (e != null) {
log.error(String.format("Failed to emit MCL for entity with urn %s", urn), e);
} else {
log.debug(String.format("Successfully emitted MCL for entity with urn %s at offset %s, partition %s, topic %s",
urn,
metadata.offset(),
metadata.partition(),
metadata.topic()));
}
});
}
_producer.send(new ProducerRecord(topic, urn.toString(), record),
_kafkaHealthChecker.getKafkaCallBack("MCL", urn.toString()));
}

@Override
@@ -176,21 +135,8 @@ record = EventUtils.pegasusToAvroMCP(metadataChangeProposal);
}

String topic = _topicConvention.getMetadataChangeProposalTopicName();
if (_callback.isPresent()) {
_producer.send(new ProducerRecord(topic, urn.toString(), record), _callback.get());
} else {
_producer.send(new ProducerRecord(topic, urn.toString(), record), (metadata, e) -> {
if (e != null) {
log.error(String.format("Failed to emit MCP for entity with urn %s", urn), e);
} else {
log.debug(String.format("Successfully emitted MCP for entity with urn %s at offset %s, partition %s, topic %s",
urn,
metadata.offset(),
metadata.partition(),
metadata.topic()));
}
});
}
_producer.send(new ProducerRecord(topic, urn.toString(), record),
_kafkaHealthChecker.getKafkaCallBack("MCP", urn.toString()));
}

@Override
@@ -199,25 +145,16 @@ public void producePlatformEvent(@Nonnull String name, @Nullable String key, @No
try {
log.debug(String.format("Converting Pegasus Event to Avro Event urn %s\nEvent: %s",
name,
event.toString()));
event));
record = EventUtils.pegasusToAvroPE(event);
} catch (IOException e) {
log.error(String.format("Failed to convert Pegasus Platform Event to Avro: %s", event.toString()), e);
log.error(String.format("Failed to convert Pegasus Platform Event to Avro: %s", event), e);
throw new ModelConversionException("Failed to convert Pegasus Platform Event to Avro", e);
}

final Callback callback = _callback.orElseGet(() -> (metadata, e) -> {
if (e != null) {
log.error(String.format("Failed to emit Platform Event for entity with name %s", name), e);
} else {
log.debug(String.format(
"Successfully emitted Platform Event for entity with name %s at offset %s, partition %s, topic %s", name,
metadata.offset(), metadata.partition(), metadata.topic()));
}
});

final String topic = _topicConvention.getPlatformEventTopicName();
_producer.send(new ProducerRecord(topic, key == null ? name : key, record), callback);
_producer.send(new ProducerRecord(topic, key == null ? name : key, record),
_kafkaHealthChecker.getKafkaCallBack("Platform Event", name));
}

@VisibleForTesting
KafkaHealthChecker.java (new file)
@@ -0,0 +1,71 @@
package com.linkedin.metadata.dao.producer;

import com.codahale.metrics.MetricRegistry;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.Callback;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.HashSet;
import java.util.Set;

/**
 * Hands out producer {@link Callback}s that track in-flight sends, and every 15 seconds
 * logs an error if any message has been pending longer than the configured request
 * timeout plus retry backoff.
 */
@Slf4j
@EnableScheduling
@Component
public class KafkaHealthChecker {

@Value("${kafka.producer.requestTimeout}")
private long kafkaProducerRequestTimeout;

@Value("${kafka.producer.backoffTimeout}")
private long kafkaProducerBackOffTimeout;

  // Instance state, guarded by the synchronized methods below.
  private long lastMoment = 0L;
  private final Set<Long> messagesInProgress = new HashSet<>();

  private synchronized long getNextUniqueMoment() {
    long moment = System.currentTimeMillis();
    // Hand out a strictly increasing id even when several sends start in the same
    // millisecond; the max() guard also avoids reissuing an id after the counter
    // has run ahead of the wall clock.
    lastMoment = Math.max(moment, lastMoment + 1);
    return lastMoment;
  }

public Callback getKafkaCallBack(String eventType, String entityDesc) {
long moment = getNextUniqueMoment();
sendMessageStarted(moment);
return (metadata, e) -> {
sendMessageEnded(moment);
if (e != null) {
log.error(String.format("Failed to emit %s for entity %s", eventType, entityDesc), e);
MetricUtils.counter(this.getClass(),
MetricRegistry.name("producer_failed_count", eventType.replaceAll(" ", "_"))).inc();
} else {
log.debug(String.format(
"Successfully emitted %s for entity %s at offset %s, partition %s, topic %s",
eventType, entityDesc, metadata.offset(), metadata.partition(), metadata.topic()));
}
};
}

private synchronized void sendMessageStarted(long uniqueMessageId) {
messagesInProgress.add(uniqueMessageId);
}

private synchronized void sendMessageEnded(long uniqueMessageId) {
messagesInProgress.remove(uniqueMessageId);
}

@Scheduled(cron = "0/15 * * * * ?")
private synchronized void periodicKafkaHealthChecker() {
long moment = System.currentTimeMillis();
long count = messagesInProgress.stream()
.filter(item -> item + kafkaProducerRequestTimeout + kafkaProducerBackOffTimeout < moment)
.count();
if (count > 0) {
log.error("Kafka Health Check Failed. %d message(s) is(are) waiting to be sent.", count);
}
}

}
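
For context, a minimal sketch of how a producer wires in these callbacks, mirroring the KafkaEventProducer change above. This is not part of the commit: the broker address, topic, key, and payload are illustrative assumptions, and outside Spring the @Value timeouts on KafkaHealthChecker stay unset, which only affects the scheduled check, not the callback itself.

import java.util.Properties;
import com.linkedin.metadata.dao.producer.KafkaHealthChecker;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class HealthCheckedSendSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // assumption: a local broker
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    KafkaHealthChecker healthChecker = new KafkaHealthChecker();
    try (Producer<String, String> producer = new KafkaProducer<>(props)) {
      // The callback registers this send as in-flight; on completion it is removed,
      // and a failure is logged and counted under producer_failed_count.
      producer.send(new ProducerRecord<>("SomeTopic_v1", "urn:li:corpuser:example", "payload"),
          healthChecker.getKafkaCallBack("MCP", "urn:li:corpuser:example"));
    }
  }
}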
EntityServiceFactory.java
@@ -2,6 +2,7 @@

import com.linkedin.gms.factory.common.TopicConventionFactory;
import com.linkedin.metadata.dao.producer.KafkaEventProducer;
import com.linkedin.metadata.dao.producer.KafkaHealthChecker;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.models.registry.EntityRegistry;
@@ -20,15 +21,17 @@
public class EntityServiceFactory {

@Bean(name = "entityService")
@DependsOn({"entityAspectDao", "kafkaEventProducer", TopicConventionFactory.TOPIC_CONVENTION_BEAN, "entityRegistry"})
@DependsOn({"entityAspectDao", "kafkaEventProducer", "kafkaHealthChecker",
TopicConventionFactory.TOPIC_CONVENTION_BEAN, "entityRegistry"})
@Nonnull
protected EntityService createInstance(
Producer<String, ? extends IndexedRecord> producer,
TopicConvention convention,
KafkaHealthChecker kafkaHealthChecker,
@Qualifier("entityAspectDao") AspectDao aspectDao,
EntityRegistry entityRegistry) {

final KafkaEventProducer eventProducer = new KafkaEventProducer(producer, convention);
final KafkaEventProducer eventProducer = new KafkaEventProducer(producer, convention, kafkaHealthChecker);
return new EntityService(aspectDao, eventProducer, entityRegistry);
}
}
DataHubKafkaEventProducerFactory.java
@@ -3,6 +3,7 @@
import com.linkedin.gms.factory.common.TopicConventionFactory;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.dao.producer.KafkaEventProducer;
import com.linkedin.metadata.dao.producer.KafkaHealthChecker;
import com.linkedin.mxe.TopicConvention;
import org.apache.avro.generic.IndexedRecord;
import org.apache.kafka.clients.producer.Producer;
@@ -30,10 +31,14 @@ public class DataHubKafkaEventProducerFactory {
@Qualifier(TopicConventionFactory.TOPIC_CONVENTION_BEAN)
private TopicConvention topicConvention;

@Autowired
private KafkaHealthChecker kafkaHealthChecker;

@Bean(name = "kafkaEventProducer")
protected KafkaEventProducer createInstance() {
return new KafkaEventProducer(
kafkaProducer,
topicConvention);
topicConvention,
kafkaHealthChecker);
}
}
DataHubKafkaProducerFactory.java
@@ -35,6 +35,18 @@ public class DataHubKafkaProducerFactory {
@Value("${kafka.schemaRegistry.type}")
private String schemaRegistryType;

@Value("${kafka.producer.retryCount}")
private String kafkaProducerRetryCount;

@Value("${kafka.producer.deliveryTimeout}")
private String kafkaProducerDeliveryTimeout;

@Value("${kafka.producer.requestTimeout}")
private String kafkaProducerRequestTimeout;

@Value("${kafka.producer.backoffTimeout}")
private String kafkaProducerBackOffTimeout;

@Autowired
@Lazy
@Qualifier("kafkaSchemaRegistry")
@@ -66,6 +78,11 @@ protected Producer<String, IndexedRecord> createInstance(KafkaProperties propert

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, schemaRegistryConfig.getSerializer().getName());

props.put(ProducerConfig.RETRIES_CONFIG, kafkaProducerRetryCount);
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, kafkaProducerDeliveryTimeout);
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, kafkaProducerRequestTimeout);
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, kafkaProducerBackOffTimeout);

// Override KafkaProperties with SchemaRegistryConfig only for non-empty values
schemaRegistryConfig.getProperties().entrySet()
.stream()
5 changes: 5 additions & 0 deletions metadata-service/factories/src/main/resources/application.yml
@@ -163,6 +163,11 @@ kafka:
listener:
concurrency: ${KAFKA_LISTENER_CONCURRENCY:1}
bootstrapServers: ${KAFKA_BOOTSTRAP_SERVER:http://localhost:9092}
producer:
retryCount: ${KAFKA_PRODUCER_RETRY_COUNT:3}
deliveryTimeout: ${KAFKA_PRODUCER_DELIVERY_TIMEOUT:15000}
requestTimeout: ${KAFKA_PRODUCER_REQUEST_TIMEOUT:3000}
backoffTimeout: ${KAFKA_PRODUCER_BACKOFF_TIMEOUT:500}
schemaRegistry:
type: ${SCHEMA_REGISTRY_TYPE:KAFKA} # KAFKA or AWS_GLUE
    url: ${KAFKA_SCHEMAREGISTRY_URL:http://localhost:8081} # Applicable only for type = kafka
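
Taken together with the DataHubKafkaProducerFactory change above, the producer block resolves to the settings below when none of the environment variables are set. A standalone sketch of the effective defaults, with values taken from the yml above and no Spring involved:

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class ProducerRetryDefaultsSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    // Defaults from application.yml; at runtime the factory injects these via @Value.
    props.put(ProducerConfig.RETRIES_CONFIG, "3");
    props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "15000"); // upper bound covering request timeouts and backoffs across retries
    props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "3000");
    props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "500");
    System.out.println(props);
  }
}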
