Skip to content

Commit

Permalink
feat(mcl-processor): Update mcl processor hooks (#11134)
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker authored Aug 9, 2024
1 parent 5b16252 commit 080f2a2
Show file tree
Hide file tree
Showing 19 changed files with 421 additions and 217 deletions.
21 changes: 21 additions & 0 deletions docs/how/kafka-config.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,27 @@ We've included an environment variable to customize the consumer group id, if yo

- `KAFKA_CONSUMER_GROUP_ID`: The name of the kafka consumer's group id.

#### datahub-mae-consumer MCL Hooks

By default, all MetadataChangeLog processing hooks execute as part of the same kafka consumer group based on the
previously mentioned `KAFKA_CONSUMER_GROUP_ID`.

The various MCL Hooks could alsp be separated into separate groups which allows for controlling parallelization and
prioritization of the hooks.

For example, the `UpdateIndicesHook` and `SiblingsHook` processing can be delayed by other hooks. Separating these
hooks into their own group can reduce latency from these other hooks. The `application.yaml` configuration
includes options for assigning a suffix to the consumer group, see `consumerGroupSuffix`.

| Environment Variable | Default | Description |
|------------------------------------------------|---------|---------------------------------------------------------------------------------------------|
| SIBLINGS_HOOK_CONSUMER_GROUP_SUFFIX | '' | Siblings processing hook. Considered one of the primary hooks in the `datahub-mae-consumer` |
| UPDATE_INDICES_CONSUMER_GROUP_SUFFIX | '' | Primary processing hook. |
| INGESTION_SCHEDULER_HOOK_CONSUMER_GROUP_SUFFIX | '' | Scheduled ingestion hook. |
| INCIDENTS_HOOK_CONSUMER_GROUP_SUFFIX | '' | Incidents hook. |
| ECE_CONSUMER_GROUP_SUFFIX | '' | Entity Change Event hook which publishes to the Platform Events topic. |
| FORMS_HOOK_CONSUMER_GROUP_SUFFIX | '' | Forms processing. |

## Applying Configurations

### Docker
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
"com.linkedin.metadata.service",
"com.datahub.event",
"com.linkedin.gms.factory.kafka",
"com.linkedin.gms.factory.kafka.common",
"com.linkedin.gms.factory.kafka.schemaregistry",
"com.linkedin.metadata.boot.kafka",
"com.linkedin.metadata.kafka",
"com.linkedin.metadata.dao.producer",
Expand All @@ -34,7 +32,10 @@
"com.linkedin.gms.factory.context",
"com.linkedin.gms.factory.timeseries",
"com.linkedin.gms.factory.assertion",
"com.linkedin.gms.factory.plugins"
"com.linkedin.gms.factory.plugins",
"com.linkedin.gms.factory.change",
"com.datahub.event.hook",
"com.linkedin.gms.factory.notifications"
},
excludeFilters = {
@ComponentScan.Filter(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package com.linkedin.metadata.kafka;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.linkedin.metadata.EventUtils;
import com.linkedin.metadata.kafka.hook.MetadataChangeLogHook;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import com.linkedin.mxe.MetadataChangeLog;
import io.datahubproject.metadata.context.OperationContext;
import java.util.List;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;

@Slf4j
public class MCLKafkaListener {
private static final Histogram kafkaLagStats =
MetricUtils.get()
.histogram(
MetricRegistry.name(
"com.linkedin.metadata.kafka.MetadataChangeLogProcessor", "kafkaLag"));

private final String consumerGroupId;
private final List<MetadataChangeLogHook> hooks;

public MCLKafkaListener(
OperationContext systemOperationContext,
String consumerGroup,
List<MetadataChangeLogHook> hooks) {
this.consumerGroupId = consumerGroup;
this.hooks = hooks;
this.hooks.forEach(hook -> hook.init(systemOperationContext));

log.info(
"Enabled MCL Hooks - Group: {} Hooks: {}",
consumerGroup,
hooks.stream().map(hook -> hook.getClass().getSimpleName()).collect(Collectors.toList()));
}

public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord) {
try (Timer.Context i = MetricUtils.timer(this.getClass(), "consume").time()) {
kafkaLagStats.update(System.currentTimeMillis() - consumerRecord.timestamp());
final GenericRecord record = consumerRecord.value();
log.debug(
"Got MCL event consumer: {} key: {}, topic: {}, partition: {}, offset: {}, value size: {}, timestamp: {}",
consumerGroupId,
consumerRecord.key(),
consumerRecord.topic(),
consumerRecord.partition(),
consumerRecord.offset(),
consumerRecord.serializedValueSize(),
consumerRecord.timestamp());
MetricUtils.counter(this.getClass(), consumerGroupId + "_received_mcl_count").inc();

MetadataChangeLog event;
try {
event = EventUtils.avroToPegasusMCL(record);
} catch (Exception e) {
MetricUtils.counter(
this.getClass(), consumerGroupId + "_avro_to_pegasus_conversion_failure")
.inc();
log.error("Error deserializing message due to: ", e);
log.error("Message: {}", record.toString());
return;
}

log.info(
"Invoking MCL hooks for consumer: {} urn: {}, aspect name: {}, entity type: {}, change type: {}",
consumerGroupId,
event.getEntityUrn(),
event.hasAspectName() ? event.getAspectName() : null,
event.hasEntityType() ? event.getEntityType() : null,
event.hasChangeType() ? event.getChangeType() : null);

// Here - plug in additional "custom processor hooks"
for (MetadataChangeLogHook hook : this.hooks) {
log.info(
"Invoking MCL hook {} for urn: {}",
hook.getClass().getSimpleName(),
event.getEntityUrn());
try (Timer.Context ignored =
MetricUtils.timer(this.getClass(), hook.getClass().getSimpleName() + "_latency")
.time()) {
hook.invoke(event);
} catch (Exception e) {
// Just skip this hook and continue. - Note that this represents "at most once"//
// processing.
MetricUtils.counter(this.getClass(), hook.getClass().getSimpleName() + "_failure").inc();
log.error(
"Failed to execute MCL hook with name {}", hook.getClass().getCanonicalName(), e);
}
}
// TODO: Manually commit kafka offsets after full processing.
MetricUtils.counter(this.getClass(), consumerGroupId + "_consumed_mcl_count").inc();
log.info(
"Successfully completed MCL hooks for consumer: {} urn: {}",
consumerGroupId,
event.getEntityUrn());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package com.linkedin.metadata.kafka;

import com.linkedin.metadata.kafka.config.MetadataChangeLogProcessorCondition;
import com.linkedin.metadata.kafka.hook.MetadataChangeLogHook;
import com.linkedin.mxe.Topics;
import io.datahubproject.metadata.context.OperationContext;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Conditional;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpoint;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.stereotype.Component;

@Slf4j
@EnableKafka
@Component
@Conditional(MetadataChangeLogProcessorCondition.class)
public class MCLKafkaListenerRegistrar implements InitializingBean {

@Autowired
@Qualifier("systemOperationContext")
private OperationContext systemOperationContext;

@Autowired private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

@Autowired
@Qualifier("kafkaEventConsumer")
private KafkaListenerContainerFactory<?> kafkaListenerContainerFactory;

@Value("${METADATA_CHANGE_LOG_KAFKA_CONSUMER_GROUP_ID:generic-mae-consumer-job-client}")
private String consumerGroupBase;

@Value("${METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME:" + Topics.METADATA_CHANGE_LOG_VERSIONED + "}")
private String mclVersionedTopicName;

@Value(
"${METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME:" + Topics.METADATA_CHANGE_LOG_TIMESERIES + "}")
private String mclTimeseriesTopicName;

@Autowired private List<MetadataChangeLogHook> metadataChangeLogHooks;

@Override
public void afterPropertiesSet() {
Map<String, List<MetadataChangeLogHook>> hookGroups =
getMetadataChangeLogHooks().stream()
.collect(Collectors.groupingBy(MetadataChangeLogHook::getConsumerGroupSuffix));

log.info(
"MetadataChangeLogProcessor Consumer Groups: {}",
hookGroups.keySet().stream().map(this::buildConsumerGroupName).collect(Collectors.toSet()));

hookGroups.forEach(
(key, hooks) -> {
KafkaListenerEndpoint kafkaListenerEndpoint =
createListenerEndpoint(
buildConsumerGroupName(key),
List.of(mclVersionedTopicName, mclTimeseriesTopicName),
hooks);
registerMCLKafkaListener(kafkaListenerEndpoint, true);
});
}

public List<MetadataChangeLogHook> getMetadataChangeLogHooks() {
return metadataChangeLogHooks.stream()
.filter(MetadataChangeLogHook::isEnabled)
.sorted(Comparator.comparing(MetadataChangeLogHook::executionOrder))
.toList();
}

@SneakyThrows
public void registerMCLKafkaListener(
KafkaListenerEndpoint kafkaListenerEndpoint, boolean startImmediately) {
kafkaListenerEndpointRegistry.registerListenerContainer(
kafkaListenerEndpoint, kafkaListenerContainerFactory, startImmediately);
}

private KafkaListenerEndpoint createListenerEndpoint(
String consumerGroupId, List<String> topics, List<MetadataChangeLogHook> hooks) {
MethodKafkaListenerEndpoint<String, GenericRecord> kafkaListenerEndpoint =
new MethodKafkaListenerEndpoint<>();
kafkaListenerEndpoint.setId(consumerGroupId);
kafkaListenerEndpoint.setGroupId(consumerGroupId);
kafkaListenerEndpoint.setAutoStartup(true);
kafkaListenerEndpoint.setTopics(topics.toArray(new String[topics.size()]));
kafkaListenerEndpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory());
kafkaListenerEndpoint.setBean(
new MCLKafkaListener(systemOperationContext, consumerGroupId, hooks));
try {
kafkaListenerEndpoint.setMethod(
MCLKafkaListener.class.getMethod("consume", ConsumerRecord.class));
} catch (NoSuchMethodException e) {
throw new RuntimeException(e);
}

return kafkaListenerEndpoint;
}

private String buildConsumerGroupName(@Nonnull String suffix) {
if (suffix.isEmpty()) {
return consumerGroupBase;
} else {
return String.join("-", consumerGroupBase, suffix);
}
}
}
Loading

0 comments on commit 080f2a2

Please sign in to comment.