commit cad4be1 (parent: 1b0deb7)

feat(mcp): add kafka batch processing mode option (datahub-project#4449) (datahub-project#12021)

Co-authored-by: RyanHolstien <[email protected]>
2 people authored and sleeperdeep committed Dec 17, 2024

Showing 11 changed files with 312 additions and 125 deletions.
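Reviewer's summary of the client-side change: the bare batchGetV2Size int threaded through JavaEntityClient is replaced by an EntityClientConfig object, and batchIngestProposals now chunks the incoming MetadataChangeProposals with Guava's Iterators.partition instead of building a single AspectsBatch. (The Kafka batch-mode listener option named in the commit title lives in changed files that did not render below.) A minimal, self-contained sketch of the chunking pattern; the inChunks helper and processChunk callback are illustrative stand-ins, not DataHub APIs:

import com.google.common.collect.Iterators;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class BatchingSketch {
  // Split items into chunks of at most batchSize, apply processChunk to each,
  // and concatenate the results in input order -- the same shape as the new
  // batchIngestProposals loop in the diff below.
  static <T, R> List<R> inChunks(
      List<T> items, int batchSize, Function<List<T>, List<R>> processChunk) {
    List<R> results = new ArrayList<>();
    // Math.max(1, ...) guards against a non-positive configured size, as in the diff.
    Iterators.partition(items.iterator(), Math.max(1, batchSize))
        .forEachRemaining(chunk -> results.addAll(processChunk.apply(chunk)));
    return results;
  }

  public static void main(String[] args) {
    // Five items in chunks of two -> chunk sizes [2, 2, 1].
    System.out.println(inChunks(List.of(1, 2, 3, 4, 5), 2, chunk -> List.of(chunk.size())));
  }
}

Because each chunk's results are appended before the next chunk is processed, input ordering survives, which is what the "// Preserve ordering" comment in the diff relies on.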

----------------------------------------------------------------

@@ -18,6 +18,7 @@
 import com.linkedin.entity.Entity;
 import com.linkedin.entity.EntityResponse;
 import com.linkedin.entity.client.EntityClient;
+import com.linkedin.entity.client.EntityClientConfig;
 import com.linkedin.metadata.Constants;
 import com.linkedin.metadata.aspect.EnvelopedAspect;
 import com.linkedin.metadata.aspect.EnvelopedAspectArray;
@@ -97,7 +98,7 @@ public class JavaEntityClient implements EntityClient {
   private final TimeseriesAspectService timeseriesAspectService;
   private final RollbackService rollbackService;
   private final EventProducer eventProducer;
-  private final int batchGetV2Size;
+  private final EntityClientConfig entityClientConfig;
 
   @Override
   @Nullable
@@ -132,7 +133,7 @@ public Map<Urn, EntityResponse> batchGetV2(
 
     Map<Urn, EntityResponse> responseMap = new HashMap<>();
 
-    Iterators.partition(urns.iterator(), Math.max(1, batchGetV2Size))
+    Iterators.partition(urns.iterator(), Math.max(1, entityClientConfig.getBatchGetV2Size()))
         .forEachRemaining(
             batch -> {
               try {
@@ -159,7 +160,8 @@ public Map<Urn, EntityResponse> batchGetVersionedV2(
 
     Map<Urn, EntityResponse> responseMap = new HashMap<>();
 
-    Iterators.partition(versionedUrns.iterator(), Math.max(1, batchGetV2Size))
+    Iterators.partition(
+        versionedUrns.iterator(), Math.max(1, entityClientConfig.getBatchGetV2Size()))
         .forEachRemaining(
             batch -> {
               try {
@@ -760,48 +762,62 @@ public List<String> batchIngestProposals(
             : Constants.UNKNOWN_ACTOR;
     final AuditStamp auditStamp = AuditStampUtils.createAuditStamp(actorUrnStr);
 
-    AspectsBatch batch =
-        AspectsBatchImpl.builder()
-            .mcps(
-                metadataChangeProposals,
-                auditStamp,
-                opContext.getRetrieverContext().get(),
-                opContext.getValidationContext().isAlternateValidation())
-            .build();
-
-    List<IngestResult> results = entityService.ingestProposal(opContext, batch, async);
-    entitySearchService.appendRunId(opContext, results);
-
-    Map<Pair<Urn, String>, List<IngestResult>> resultMap =
-        results.stream()
-            .collect(
-                Collectors.groupingBy(
-                    result ->
-                        Pair.of(
-                            result.getRequest().getUrn(), result.getRequest().getAspectName())));
-
-    // Preserve ordering
-    return batch.getItems().stream()
-        .map(
-            requestItem -> {
-              // Urns generated
-              List<Urn> urnsForRequest =
-                  resultMap
-                      .getOrDefault(
-                          Pair.of(requestItem.getUrn(), requestItem.getAspectName()), List.of())
-                      .stream()
-                      .map(IngestResult::getUrn)
-                      .filter(Objects::nonNull)
-                      .distinct()
-                      .collect(Collectors.toList());
-
-              // Update runIds
-              urnsForRequest.forEach(
-                  urn -> tryIndexRunId(opContext, urn, requestItem.getSystemMetadata()));
-
-              return urnsForRequest.isEmpty() ? null : urnsForRequest.get(0).toString();
-            })
-        .collect(Collectors.toList());
+    List<String> updatedUrns = new ArrayList<>();
+    Iterators.partition(
+            metadataChangeProposals.iterator(), Math.max(1, entityClientConfig.getBatchGetV2Size()))
+        .forEachRemaining(
+            batch -> {
+              AspectsBatch aspectsBatch =
+                  AspectsBatchImpl.builder()
+                      .mcps(
+                          batch,
+                          auditStamp,
+                          opContext.getRetrieverContext().get(),
+                          opContext.getValidationContext().isAlternateValidation())
+                      .build();
+
+              List<IngestResult> results =
+                  entityService.ingestProposal(opContext, aspectsBatch, async);
+              entitySearchService.appendRunId(opContext, results);
+
+              Map<Pair<Urn, String>, List<IngestResult>> resultMap =
+                  results.stream()
+                      .collect(
+                          Collectors.groupingBy(
+                              result ->
+                                  Pair.of(
+                                      result.getRequest().getUrn(),
+                                      result.getRequest().getAspectName())));
+
+              // Preserve ordering
+              updatedUrns.addAll(
+                  aspectsBatch.getItems().stream()
+                      .map(
+                          requestItem -> {
+                            // Urns generated
+                            List<Urn> urnsForRequest =
+                                resultMap
+                                    .getOrDefault(
+                                        Pair.of(requestItem.getUrn(), requestItem.getAspectName()),
+                                        List.of())
+                                    .stream()
+                                    .map(IngestResult::getUrn)
+                                    .filter(Objects::nonNull)
+                                    .distinct()
+                                    .collect(Collectors.toList());
+
+                            // Update runIds
+                            urnsForRequest.forEach(
+                                urn ->
+                                    tryIndexRunId(opContext, urn, requestItem.getSystemMetadata()));
+
+                            return urnsForRequest.isEmpty()
+                                ? null
+                                : urnsForRequest.get(0).toString();
+                          })
+                      .collect(Collectors.toList()));
+            });
+    return updatedUrns;
   }
 
   @SneakyThrows
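Note the constructor change here and in SystemJavaEntityClient below: the client now receives the whole EntityClientConfig rather than a single int, and batchIngestProposals above reuses getBatchGetV2Size() as its ingest chunk size instead of a dedicated setting. A sketch of the minimal shape EntityClientConfig needs to satisfy the calls in this diff (builder(), batchGetV2Size(int), getBatchGetV2Size()); the real class in the commit may well carry more fields:

import lombok.Builder;
import lombok.Value;

// Hypothetical minimal form, inferred only from usage in this diff.
@Value
@Builder
public class EntityClientConfig {
  // Chunk size for batchGetV2/batchGetVersionedV2 (and, per this commit,
  // for batchIngestProposals); clamped to >= 1 at the call sites.
  @Builder.Default int batchGetV2Size = 1;
}

The tests below build it as EntityClientConfig.builder().batchGetV2Size(1).build(), forcing one-item chunks so the partitioning path is exercised on every call.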

----------------------------------------------------------------

@@ -5,6 +5,7 @@
 import com.linkedin.common.urn.Urn;
 import com.linkedin.entity.EntityResponse;
 import com.linkedin.entity.client.EntityClientCache;
+import com.linkedin.entity.client.EntityClientConfig;
 import com.linkedin.entity.client.SystemEntityClient;
 import com.linkedin.metadata.config.cache.client.EntityClientCacheConfig;
 import com.linkedin.metadata.entity.DeleteEntityService;
@@ -43,7 +44,7 @@ public SystemJavaEntityClient(
       RollbackService rollbackService,
       EventProducer eventProducer,
       EntityClientCacheConfig cacheConfig,
-      int batchGetV2Size) {
+      EntityClientConfig entityClientConfig) {
     super(
         entityService,
         deleteEntityService,
@@ -54,7 +55,7 @@
         timeseriesAspectService,
         rollbackService,
         eventProducer,
-        batchGetV2Size);
+        entityClientConfig);
     this.operationContextMap = CacheBuilder.newBuilder().maximumSize(500).build();
     this.entityClientCache = buildEntityClientCache(SystemJavaEntityClient.class, cacheConfig);
   }

----------------------------------------------------------------

@@ -12,6 +12,7 @@
 import com.linkedin.common.urn.UrnUtils;
 import com.linkedin.data.template.RequiredFieldNotPresentException;
 import com.linkedin.domain.Domains;
+import com.linkedin.entity.client.EntityClientConfig;
 import com.linkedin.events.metadata.ChangeType;
 import com.linkedin.metadata.Constants;
 import com.linkedin.metadata.aspect.batch.AspectsBatch;
@@ -90,7 +91,7 @@ private JavaEntityClient getJavaEntityClient() {
         _timeseriesAspectService,
         rollbackService,
         _eventProducer,
-        1);
+        EntityClientConfig.builder().batchGetV2Size(1).build());
   }
 
   @Test

----------------------------------------------------------------

@@ -7,6 +7,7 @@
 import static org.mockito.Mockito.when;
 
 import com.linkedin.entity.client.EntityClient;
+import com.linkedin.entity.client.EntityClientConfig;
 import com.linkedin.metadata.client.JavaEntityClient;
 import com.linkedin.metadata.config.PreProcessHooks;
 import com.linkedin.metadata.config.cache.EntityDocCountCacheConfiguration;
@@ -330,6 +331,6 @@ private EntityClient entityClientHelper(
         null,
         null,
         null,
-        1);
+        EntityClientConfig.builder().batchGetV2Size(1).build());
   }
 }

----------------------------------------------------------------

@@ -4,6 +4,7 @@
 import static io.datahubproject.test.search.SearchTestUtils.getGraphQueryConfiguration;
 
 import com.linkedin.entity.client.EntityClient;
+import com.linkedin.entity.client.EntityClientConfig;
 import com.linkedin.metadata.client.JavaEntityClient;
 import com.linkedin.metadata.config.DataHubAppConfiguration;
 import com.linkedin.metadata.config.MetadataChangeProposalConfig;
@@ -276,6 +277,6 @@ protected EntityClient entityClient(
         null,
         null,
         null,
-        1);
+        EntityClientConfig.builder().batchGetV2Size(1).build());
   }
 }

----------------------------------------------------------------

@@ -15,26 +15,21 @@
 import com.linkedin.gms.factory.config.ConfigurationProvider;
 import com.linkedin.gms.factory.entityclient.RestliEntityClientFactory;
 import com.linkedin.metadata.EventUtils;
-import com.linkedin.metadata.dao.throttle.ThrottleControl;
 import com.linkedin.metadata.dao.throttle.ThrottleSensor;
 import com.linkedin.metadata.kafka.config.MetadataChangeProposalProcessorCondition;
+import com.linkedin.metadata.kafka.util.KafkaListenerUtil;
 import com.linkedin.metadata.utils.metrics.MetricUtils;
-import com.linkedin.mxe.FailedMetadataChangeProposal;
 import com.linkedin.mxe.MetadataChangeProposal;
 import com.linkedin.mxe.Topics;
 import io.datahubproject.metadata.context.OperationContext;
-import java.io.IOException;
 import java.util.Optional;
-import javax.annotation.Nonnull;
 import javax.annotation.PostConstruct;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
-import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
 import org.slf4j.MDC;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.beans.factory.annotation.Value;
@@ -43,7 +38,6 @@
 import org.springframework.kafka.annotation.EnableKafka;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
-import org.springframework.kafka.listener.MessageListenerContainer;
 import org.springframework.stereotype.Component;
 
 @Slf4j
@@ -80,38 +74,7 @@ public class MetadataChangeProposalsProcessor {
 
   @PostConstruct
   public void registerConsumerThrottle() {
-    if (kafkaThrottle != null
-        && provider
-            .getMetadataChangeProposal()
-            .getThrottle()
-            .getComponents()
-            .getMceConsumer()
-            .isEnabled()) {
-      log.info("MCE Consumer Throttle Enabled");
-      kafkaThrottle.addCallback(
-          (throttleEvent) -> {
-            Optional<MessageListenerContainer> container =
-                Optional.ofNullable(registry.getListenerContainer(mceConsumerGroupId));
-            if (container.isEmpty()) {
-              log.warn(
-                  "Expected container was missing: {} throttle is not possible.",
-                  mceConsumerGroupId);
-            } else {
-              if (throttleEvent.isThrottled()) {
-                container.ifPresent(MessageListenerContainer::pause);
-                return ThrottleControl.builder()
-                    // resume consumer after sleep
-                    .callback(
-                        (resumeEvent) -> container.ifPresent(MessageListenerContainer::resume))
-                    .build();
-              }
-            }
-
-            return ThrottleControl.NONE;
-          });
-    } else {
-      log.info("MCE Consumer Throttle Disabled");
-    }
+    KafkaListenerUtil.registerThrottle(kafkaThrottle, provider, registry, mceConsumerGroupId);
   }
 
   @KafkaListener(
@@ -132,7 +95,9 @@ public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord)
         consumerRecord.serializedValueSize(),
         consumerRecord.timestamp());
 
-      log.debug("Record {}", record);
+      if (log.isDebugEnabled()) {
+        log.debug("Record {}", record);
+      }
 
       MetadataChangeProposal event = new MetadataChangeProposal();
       try {
@@ -148,45 +113,18 @@
         MDC.put(
             MDC_CHANGE_TYPE, Optional.ofNullable(changeType).map(ChangeType::toString).orElse(""));
 
-        log.debug("MetadataChangeProposal {}", event);
-        // TODO: Get this from the event itself.
+        if (log.isDebugEnabled()) {
+          log.debug("MetadataChangeProposal {}", event);
+        }
         String urn = entityClient.ingestProposal(systemOperationContext, event, false);
         log.info("Successfully processed MCP event urn: {}", urn);
       } catch (Throwable throwable) {
         log.error("MCP Processor Error", throwable);
         log.error("Message: {}", record);
-        sendFailedMCP(event, throwable);
+        KafkaListenerUtil.sendFailedMCP(event, throwable, fmcpTopicName, kafkaProducer);
       }
     } finally {
       MDC.clear();
     }
   }
-
-  private void sendFailedMCP(@Nonnull MetadataChangeProposal event, @Nonnull Throwable throwable) {
-    final FailedMetadataChangeProposal failedMetadataChangeProposal =
-        createFailedMCPEvent(event, throwable);
-    try {
-      final GenericRecord genericFailedMCERecord =
-          EventUtils.pegasusToAvroFailedMCP(failedMetadataChangeProposal);
-      log.debug("Sending FailedMessages to topic - {}", fmcpTopicName);
-      log.info(
-          "Error while processing FMCP: FailedMetadataChangeProposal - {}",
-          failedMetadataChangeProposal);
-      kafkaProducer.send(new ProducerRecord<>(fmcpTopicName, genericFailedMCERecord));
-    } catch (IOException e) {
-      log.error(
-          "Error while sending FailedMetadataChangeProposal: Exception - {}, FailedMetadataChangeProposal - {}",
-          e.getStackTrace(),
-          failedMetadataChangeProposal);
-    }
-  }
-
-  @Nonnull
-  private FailedMetadataChangeProposal createFailedMCPEvent(
-      @Nonnull MetadataChangeProposal event, @Nonnull Throwable throwable) {
-    final FailedMetadataChangeProposal fmcp = new FailedMetadataChangeProposal();
-    fmcp.setError(ExceptionUtils.getStackTrace(throwable));
-    fmcp.setMetadataChangeProposal(event);
-    return fmcp;
-  }
 }
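The two private helpers deleted here, together with the inlined throttle registration, move behind the new shared KafkaListenerUtil imported above. A plausible reconstruction, assuming the helper simply relocates the deleted bodies; parameter types are inferred from the call sites, and the producer's generic types are a guess:

// Assumed to mirror the code removed from MetadataChangeProposalsProcessor;
// the actual KafkaListenerUtil in the commit may differ in details.
package com.linkedin.metadata.kafka.util;

import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.metadata.EventUtils;
import com.linkedin.metadata.dao.throttle.ThrottleControl;
import com.linkedin.metadata.dao.throttle.ThrottleSensor;
import com.linkedin.mxe.FailedMetadataChangeProposal;
import com.linkedin.mxe.MetadataChangeProposal;
import java.io.IOException;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;

@Slf4j
public class KafkaListenerUtil {

  private KafkaListenerUtil() {}

  public static void registerThrottle(
      ThrottleSensor kafkaThrottle,
      ConfigurationProvider provider,
      KafkaListenerEndpointRegistry registry,
      String mceConsumerGroupId) {
    if (kafkaThrottle != null
        && provider
            .getMetadataChangeProposal()
            .getThrottle()
            .getComponents()
            .getMceConsumer()
            .isEnabled()) {
      log.info("MCE Consumer Throttle Enabled");
      kafkaThrottle.addCallback(
          (throttleEvent) -> {
            Optional<MessageListenerContainer> container =
                Optional.ofNullable(registry.getListenerContainer(mceConsumerGroupId));
            if (container.isEmpty()) {
              log.warn(
                  "Expected container was missing: {} throttle is not possible.",
                  mceConsumerGroupId);
            } else if (throttleEvent.isThrottled()) {
              // Pause the listener container; resume it from the throttle callback.
              container.ifPresent(MessageListenerContainer::pause);
              return ThrottleControl.builder()
                  .callback((resumeEvent) -> container.ifPresent(MessageListenerContainer::resume))
                  .build();
            }
            return ThrottleControl.NONE;
          });
    } else {
      log.info("MCE Consumer Throttle Disabled");
    }
  }

  public static void sendFailedMCP(
      MetadataChangeProposal event,
      Throwable throwable,
      String fmcpTopicName,
      Producer<String, IndexedRecord> kafkaProducer) {
    // Wrap the failed event plus its stack trace and publish to the FMCP topic.
    final FailedMetadataChangeProposal fmcp = new FailedMetadataChangeProposal();
    fmcp.setError(ExceptionUtils.getStackTrace(throwable));
    fmcp.setMetadataChangeProposal(event);
    try {
      final GenericRecord genericFailedMCERecord = EventUtils.pegasusToAvroFailedMCP(fmcp);
      log.debug("Sending FailedMessages to topic - {}", fmcpTopicName);
      kafkaProducer.send(new ProducerRecord<>(fmcpTopicName, genericFailedMCERecord));
    } catch (IOException e) {
      log.error("Error while sending FailedMetadataChangeProposal: {}", fmcp, e);
    }
  }
}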
(Only 6 of the 11 changed files rendered above; the rest of the diff is truncated.)
