diff --git a/metadata-dao-impl/kafka-producer/src/main/java/com/linkedin/metadata/dao/producer/KafkaEventProducer.java b/metadata-dao-impl/kafka-producer/src/main/java/com/linkedin/metadata/dao/producer/KafkaEventProducer.java
index 2af6d78b41612c..e0685f329b0d5a 100644
--- a/metadata-dao-impl/kafka-producer/src/main/java/com/linkedin/metadata/dao/producer/KafkaEventProducer.java
+++ b/metadata-dao-impl/kafka-producer/src/main/java/com/linkedin/metadata/dao/producer/KafkaEventProducer.java
@@ -19,7 +19,6 @@
import io.opentelemetry.extension.annotations.WithSpan;
import java.io.IOException;
import java.util.Arrays;
-import java.util.Optional;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
@@ -29,7 +28,6 @@
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
-
/**
*
The topic names that this emits to can be controlled by constructing this with a {@link TopicConvention}.
* If none is given, defaults to a {@link TopicConventionImpl} with the default delimiter of an underscore (_).
@@ -38,32 +36,21 @@
public class KafkaEventProducer implements EventProducer {
private final Producer _producer;
- private final Optional _callback;
private final TopicConvention _topicConvention;
+ private final KafkaHealthChecker _kafkaHealthChecker;
/**
* Constructor.
*
* @param producer The Kafka {@link Producer} to use
* @param topicConvention the convention to use to get kafka topic names
+ * @param kafkaHealthChecker The {@link KafkaHealthChecker} that supplies the {@link Callback} passed to each Kafka send
*/
public KafkaEventProducer(@Nonnull final Producer producer,
- @Nonnull final TopicConvention topicConvention) {
- this(producer, topicConvention, null);
- }
-
- /**
- * Constructor.
- *
- * @param producer The Kafka {@link Producer} to use
- * @param topicConvention the convention to use to get kafka topic names
- * @param callback The {@link Callback} to invoke when the request is completed
- */
- public KafkaEventProducer(@Nonnull final Producer producer,
- @Nonnull final TopicConvention topicConvention, @Nullable final Callback callback) {
+ @Nonnull final TopicConvention topicConvention, @Nonnull final KafkaHealthChecker kafkaHealthChecker) {
_producer = producer;
- _callback = Optional.ofNullable(callback);
_topicConvention = topicConvention;
+ _kafkaHealthChecker = kafkaHealthChecker;
}
@Override
@@ -93,30 +80,16 @@ public void produceMetadataAuditEvent(@Nonnull Urn urn, @Nullable Snapshot oldSn
try {
log.debug(String.format("Converting Pegasus snapshot to Avro snapshot urn %s\nMetadataAuditEvent: %s",
urn,
- metadataAuditEvent.toString()));
+ metadataAuditEvent));
record = EventUtils.pegasusToAvroMAE(metadataAuditEvent);
} catch (IOException e) {
- log.error(String.format("Failed to convert Pegasus MAE to Avro: %s", metadataAuditEvent.toString()), e);
+ log.error(String.format("Failed to convert Pegasus MAE to Avro: %s", metadataAuditEvent), e);
throw new ModelConversionException("Failed to convert Pegasus MAE to Avro", e);
}
- if (_callback.isPresent()) {
- _producer.send(new ProducerRecord(_topicConvention.getMetadataAuditEventTopicName(), urn.toString(), record),
- _callback.get());
- } else {
- _producer.send(new ProducerRecord(_topicConvention.getMetadataAuditEventTopicName(), urn.toString(), record),
- (metadata, e) -> {
- if (e != null) {
- log.error(String.format("Failed to emit MAE for entity with urn %s", urn), e);
- } else {
- log.debug(String.format("Successfully emitted MAE for entity with urn %s at offset %s, partition %s, topic %s",
- urn,
- metadata.offset(),
- metadata.partition(),
- metadata.topic()));
- }
- });
- }
+ String topic = _topicConvention.getMetadataAuditEventTopicName();
+ _producer.send(new ProducerRecord(topic, urn.toString(), record),
+ _kafkaHealthChecker.getKafkaCallBack("MAE", urn.toString()));
}
@Override
@@ -127,10 +100,10 @@ public void produceMetadataChangeLog(@Nonnull final Urn urn, @Nonnull AspectSpec
try {
log.debug(String.format("Converting Pegasus snapshot to Avro snapshot urn %s\nMetadataChangeLog: %s",
urn,
- metadataChangeLog.toString()));
+ metadataChangeLog));
record = EventUtils.pegasusToAvroMCL(metadataChangeLog);
} catch (IOException e) {
- log.error(String.format("Failed to convert Pegasus MAE to Avro: %s", metadataChangeLog.toString()), e);
+ log.error(String.format("Failed to convert Pegasus MAE to Avro: %s", metadataChangeLog), e);
throw new ModelConversionException("Failed to convert Pegasus MAE to Avro", e);
}
@@ -138,22 +111,8 @@ record = EventUtils.pegasusToAvroMCL(metadataChangeLog);
if (aspectSpec.isTimeseries()) {
topic = _topicConvention.getMetadataChangeLogTimeseriesTopicName();
}
-
- if (_callback.isPresent()) {
- _producer.send(new ProducerRecord(topic, urn.toString(), record), _callback.get());
- } else {
- _producer.send(new ProducerRecord(topic, urn.toString(), record), (metadata, e) -> {
- if (e != null) {
- log.error(String.format("Failed to emit MCL for entity with urn %s", urn), e);
- } else {
- log.debug(String.format("Successfully emitted MCL for entity with urn %s at offset %s, partition %s, topic %s",
- urn,
- metadata.offset(),
- metadata.partition(),
- metadata.topic()));
- }
- });
- }
+ _producer.send(new ProducerRecord(topic, urn.toString(), record),
+ _kafkaHealthChecker.getKafkaCallBack("MCL", urn.toString()));
}
@Override
@@ -176,21 +135,8 @@ record = EventUtils.pegasusToAvroMCP(metadataChangeProposal);
}
String topic = _topicConvention.getMetadataChangeProposalTopicName();
- if (_callback.isPresent()) {
- _producer.send(new ProducerRecord(topic, urn.toString(), record), _callback.get());
- } else {
- _producer.send(new ProducerRecord(topic, urn.toString(), record), (metadata, e) -> {
- if (e != null) {
- log.error(String.format("Failed to emit MCP for entity with urn %s", urn), e);
- } else {
- log.debug(String.format("Successfully emitted MCP for entity with urn %s at offset %s, partition %s, topic %s",
- urn,
- metadata.offset(),
- metadata.partition(),
- metadata.topic()));
- }
- });
- }
+ _producer.send(new ProducerRecord(topic, urn.toString(), record),
+ _kafkaHealthChecker.getKafkaCallBack("MCP", urn.toString()));
}
@Override
@@ -199,25 +145,16 @@ public void producePlatformEvent(@Nonnull String name, @Nullable String key, @No
try {
log.debug(String.format("Converting Pegasus Event to Avro Event urn %s\nEvent: %s",
name,
- event.toString()));
+ event));
record = EventUtils.pegasusToAvroPE(event);
} catch (IOException e) {
- log.error(String.format("Failed to convert Pegasus Platform Event to Avro: %s", event.toString()), e);
+ log.error(String.format("Failed to convert Pegasus Platform Event to Avro: %s", event), e);
throw new ModelConversionException("Failed to convert Pegasus Platform Event to Avro", e);
}
- final Callback callback = _callback.orElseGet(() -> (metadata, e) -> {
- if (e != null) {
- log.error(String.format("Failed to emit Platform Event for entity with name %s", name), e);
- } else {
- log.debug(String.format(
- "Successfully emitted Platform Event for entity with name %s at offset %s, partition %s, topic %s", name,
- metadata.offset(), metadata.partition(), metadata.topic()));
- }
- });
-
final String topic = _topicConvention.getPlatformEventTopicName();
- _producer.send(new ProducerRecord(topic, key == null ? name : key, record), callback);
+ _producer.send(new ProducerRecord(topic, key == null ? name : key, record),
+ _kafkaHealthChecker.getKafkaCallBack("Platform Event", name));
}
@VisibleForTesting
diff --git a/metadata-dao-impl/kafka-producer/src/main/java/com/linkedin/metadata/dao/producer/KafkaHealthChecker.java b/metadata-dao-impl/kafka-producer/src/main/java/com/linkedin/metadata/dao/producer/KafkaHealthChecker.java
new file mode 100644
index 00000000000000..9f956b399eaff3
--- /dev/null
+++ b/metadata-dao-impl/kafka-producer/src/main/java/com/linkedin/metadata/dao/producer/KafkaHealthChecker.java
@@ -0,0 +1,71 @@
+package com.linkedin.metadata.dao.producer;
+
+import com.codahale.metrics.MetricRegistry;
+import com.linkedin.metadata.utils.metrics.MetricUtils;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.Callback;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/** Issues Kafka send callbacks and periodically logs sends still pending past the configured timeouts. */
+@Slf4j
+@EnableScheduling
+@Component
+public class KafkaHealthChecker {
+  @Value("${kafka.producer.requestTimeout}")
+  private long kafkaProducerRequestTimeout;
+  @Value("${kafka.producer.backoffTimeout}")
+  private long kafkaProducerBackOffTimeout;
+  private long lastMoment = 0L; // last id handed out; only touched from synchronized methods
+  private final Set<Long> messagesInProgress = new HashSet<>(); // ids of sends whose callback has not fired yet
+
+  /** Returns the current time in ms, bumped past the previous id so every returned value is unique. */
+  private synchronized long getNextUniqueMoment() {
+    lastMoment = Math.max(System.currentTimeMillis(), lastMoment + 1);
+    return lastMoment;
+  }
+
+  /** Marks a send as in flight and returns the callback that clears it, logs the outcome and counts failures. */
+  public Callback getKafkaCallBack(String eventType, String entityDesc) {
+    long moment = getNextUniqueMoment();
+    sendMessageStarted(moment);
+    return (metadata, e) -> {
+      sendMessageEnded(moment);
+      if (e != null) {
+        log.error("Failed to emit {} for entity {}", eventType, entityDesc, e);
+        MetricUtils.counter(this.getClass(),
+            MetricRegistry.name("producer_failed_count", eventType.replace(' ', '_'))).inc();
+      } else {
+        log.debug("Successfully emitted {} for entity {} at offset {}, partition {}, topic {}",
+            eventType, entityDesc, metadata.offset(), metadata.partition(), metadata.topic());
+      }
+    };
+  }
+
+  /** Records the given id as an in-flight send. */
+  private synchronized void sendMessageStarted(long uniqueMessageId) {
+    messagesInProgress.add(uniqueMessageId);
+  }
+
+  /** Drops the given id once its send callback has fired. */
+  private synchronized void sendMessageEnded(long uniqueMessageId) {
+    messagesInProgress.remove(uniqueMessageId);
+  }
+
+  /** Every 15 seconds, logs an error if any send has been pending longer than request + backoff timeout. */
+  @Scheduled(cron = "0/15 * * * * ?")
+  private synchronized void periodicKafkaHealthChecker() {
+    long moment = System.currentTimeMillis();
+    long count = messagesInProgress.stream()
+        .filter(item -> item + kafkaProducerRequestTimeout + kafkaProducerBackOffTimeout < moment)
+        .count();
+    if (count > 0) {
+      log.error("Kafka Health Check Failed. {} message(s) is(are) waiting to be sent.", count);
+    }
+  }
+}
diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entity/EntityServiceFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entity/EntityServiceFactory.java
index a8f876dc9e9a9d..01d4c15ba2d388 100644
--- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entity/EntityServiceFactory.java
+++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entity/EntityServiceFactory.java
@@ -2,6 +2,7 @@
import com.linkedin.gms.factory.common.TopicConventionFactory;
import com.linkedin.metadata.dao.producer.KafkaEventProducer;
+import com.linkedin.metadata.dao.producer.KafkaHealthChecker;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.models.registry.EntityRegistry;
@@ -20,15 +21,17 @@
public class EntityServiceFactory {
@Bean(name = "entityService")
- @DependsOn({"entityAspectDao", "kafkaEventProducer", TopicConventionFactory.TOPIC_CONVENTION_BEAN, "entityRegistry"})
+ @DependsOn({"entityAspectDao", "kafkaEventProducer", "kafkaHealthChecker",
+ TopicConventionFactory.TOPIC_CONVENTION_BEAN, "entityRegistry"})
@Nonnull
protected EntityService createInstance(
Producer producer,
TopicConvention convention,
+ KafkaHealthChecker kafkaHealthChecker,
@Qualifier("entityAspectDao") AspectDao aspectDao,
EntityRegistry entityRegistry) {
- final KafkaEventProducer eventProducer = new KafkaEventProducer(producer, convention);
+ final KafkaEventProducer eventProducer = new KafkaEventProducer(producer, convention, kafkaHealthChecker);
return new EntityService(aspectDao, eventProducer, entityRegistry);
}
}
diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/DataHubKafkaEventProducerFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/DataHubKafkaEventProducerFactory.java
index cdb230724b85fe..5aa67ba5d2817f 100644
--- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/DataHubKafkaEventProducerFactory.java
+++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/DataHubKafkaEventProducerFactory.java
@@ -3,6 +3,7 @@
import com.linkedin.gms.factory.common.TopicConventionFactory;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.dao.producer.KafkaEventProducer;
+import com.linkedin.metadata.dao.producer.KafkaHealthChecker;
import com.linkedin.mxe.TopicConvention;
import org.apache.avro.generic.IndexedRecord;
import org.apache.kafka.clients.producer.Producer;
@@ -30,10 +31,14 @@ public class DataHubKafkaEventProducerFactory {
@Qualifier(TopicConventionFactory.TOPIC_CONVENTION_BEAN)
private TopicConvention topicConvention;
+ @Autowired
+ private KafkaHealthChecker kafkaHealthChecker;
+
@Bean(name = "kafkaEventProducer")
protected KafkaEventProducer createInstance() {
return new KafkaEventProducer(
kafkaProducer,
- topicConvention);
+ topicConvention,
+ kafkaHealthChecker);
}
}
diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/DataHubKafkaProducerFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/DataHubKafkaProducerFactory.java
index 0549ed2fc2c8b5..47215dd494cf4a 100644
--- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/DataHubKafkaProducerFactory.java
+++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/DataHubKafkaProducerFactory.java
@@ -35,6 +35,18 @@ public class DataHubKafkaProducerFactory {
@Value("${kafka.schemaRegistry.type}")
private String schemaRegistryType;
+ @Value("${kafka.producer.retryCount}")
+ private String kafkaProducerRetryCount;
+
+ @Value("${kafka.producer.deliveryTimeout}")
+ private String kafkaProducerDeliveryTimeout;
+
+ @Value("${kafka.producer.requestTimeout}")
+ private String kafkaProducerRequestTimeout;
+
+ @Value("${kafka.producer.backoffTimeout}")
+ private String kafkaProducerBackOffTimeout;
+
@Autowired
@Lazy
@Qualifier("kafkaSchemaRegistry")
@@ -66,6 +78,11 @@ protected Producer createInstance(KafkaProperties propert
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, schemaRegistryConfig.getSerializer().getName());
+ props.put(ProducerConfig.RETRIES_CONFIG, kafkaProducerRetryCount);
+ props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, kafkaProducerDeliveryTimeout);
+ props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, kafkaProducerRequestTimeout);
+ props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, kafkaProducerBackOffTimeout);
+
// Override KafkaProperties with SchemaRegistryConfig only for non-empty values
schemaRegistryConfig.getProperties().entrySet()
.stream()
diff --git a/metadata-service/factories/src/main/resources/application.yml b/metadata-service/factories/src/main/resources/application.yml
index 116f134080f735..54330a0e7b0e87 100644
--- a/metadata-service/factories/src/main/resources/application.yml
+++ b/metadata-service/factories/src/main/resources/application.yml
@@ -163,6 +163,11 @@ kafka:
listener:
concurrency: ${KAFKA_LISTENER_CONCURRENCY:1}
bootstrapServers: ${KAFKA_BOOTSTRAP_SERVER:http://localhost:9092}
+ producer:
+ retryCount: ${KAFKA_PRODUCER_RETRY_COUNT:3} # set as ProducerConfig.RETRIES_CONFIG on the Kafka producer
+ deliveryTimeout: ${KAFKA_PRODUCER_DELIVERY_TIMEOUT:15000} # set as ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG
+ requestTimeout: ${KAFKA_PRODUCER_REQUEST_TIMEOUT:3000} # set as ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG; also read by KafkaHealthChecker
+ backoffTimeout: ${KAFKA_PRODUCER_BACKOFF_TIMEOUT:500} # set as ProducerConfig.RETRY_BACKOFF_MS_CONFIG; also read by KafkaHealthChecker
schemaRegistry:
type: ${SCHEMA_REGISTRY_TYPE:KAFKA} # KAFKA or AWS_GLUE
url: ${KAFKA_SCHEMAREGISTRY_URL:http://localhost:8081} # Application only for type = kafka