Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mcp): add kafka batch processing mode option (#4449) #12021

Merged
merged 2 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.linkedin.entity.Entity;
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.entity.client.EntityClientConfig;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.aspect.EnvelopedAspect;
import com.linkedin.metadata.aspect.EnvelopedAspectArray;
Expand Down Expand Up @@ -97,7 +98,7 @@ public class JavaEntityClient implements EntityClient {
private final TimeseriesAspectService timeseriesAspectService;
private final RollbackService rollbackService;
private final EventProducer eventProducer;
private final int batchGetV2Size;
private final EntityClientConfig entityClientConfig;

@Override
@Nullable
Expand Down Expand Up @@ -132,7 +133,7 @@ public Map<Urn, EntityResponse> batchGetV2(

Map<Urn, EntityResponse> responseMap = new HashMap<>();

Iterators.partition(urns.iterator(), Math.max(1, batchGetV2Size))
Iterators.partition(urns.iterator(), Math.max(1, entityClientConfig.getBatchGetV2Size()))
.forEachRemaining(
batch -> {
try {
Expand All @@ -159,7 +160,8 @@ public Map<Urn, EntityResponse> batchGetVersionedV2(

Map<Urn, EntityResponse> responseMap = new HashMap<>();

Iterators.partition(versionedUrns.iterator(), Math.max(1, batchGetV2Size))
Iterators.partition(
versionedUrns.iterator(), Math.max(1, entityClientConfig.getBatchGetV2Size()))
.forEachRemaining(
batch -> {
try {
Expand Down Expand Up @@ -760,48 +762,62 @@ public List<String> batchIngestProposals(
: Constants.UNKNOWN_ACTOR;
final AuditStamp auditStamp = AuditStampUtils.createAuditStamp(actorUrnStr);

AspectsBatch batch =
AspectsBatchImpl.builder()
.mcps(
metadataChangeProposals,
auditStamp,
opContext.getRetrieverContext().get(),
opContext.getValidationContext().isAlternateValidation())
.build();

List<IngestResult> results = entityService.ingestProposal(opContext, batch, async);
entitySearchService.appendRunId(opContext, results);

Map<Pair<Urn, String>, List<IngestResult>> resultMap =
results.stream()
.collect(
Collectors.groupingBy(
result ->
Pair.of(
result.getRequest().getUrn(), result.getRequest().getAspectName())));

// Preserve ordering
return batch.getItems().stream()
.map(
requestItem -> {
// Urns generated
List<Urn> urnsForRequest =
resultMap
.getOrDefault(
Pair.of(requestItem.getUrn(), requestItem.getAspectName()), List.of())
.stream()
.map(IngestResult::getUrn)
.filter(Objects::nonNull)
.distinct()
.collect(Collectors.toList());

// Update runIds
urnsForRequest.forEach(
urn -> tryIndexRunId(opContext, urn, requestItem.getSystemMetadata()));

return urnsForRequest.isEmpty() ? null : urnsForRequest.get(0).toString();
})
.collect(Collectors.toList());
List<String> updatedUrns = new ArrayList<>();
Iterators.partition(
metadataChangeProposals.iterator(), Math.max(1, entityClientConfig.getBatchGetV2Size()))
.forEachRemaining(
batch -> {
AspectsBatch aspectsBatch =
AspectsBatchImpl.builder()
.mcps(
batch,
auditStamp,
opContext.getRetrieverContext().get(),
opContext.getValidationContext().isAlternateValidation())
.build();

List<IngestResult> results =
entityService.ingestProposal(opContext, aspectsBatch, async);
entitySearchService.appendRunId(opContext, results);

Map<Pair<Urn, String>, List<IngestResult>> resultMap =
results.stream()
.collect(
Collectors.groupingBy(
result ->
Pair.of(
result.getRequest().getUrn(),
result.getRequest().getAspectName())));

// Preserve ordering
updatedUrns.addAll(
aspectsBatch.getItems().stream()
.map(
requestItem -> {
// Urns generated
List<Urn> urnsForRequest =
resultMap
.getOrDefault(
Pair.of(requestItem.getUrn(), requestItem.getAspectName()),
List.of())
.stream()
.map(IngestResult::getUrn)
.filter(Objects::nonNull)
.distinct()
.collect(Collectors.toList());

// Update runIds
urnsForRequest.forEach(
urn ->
tryIndexRunId(opContext, urn, requestItem.getSystemMetadata()));

return urnsForRequest.isEmpty()
? null
: urnsForRequest.get(0).toString();
})
.collect(Collectors.toList()));
});
return updatedUrns;
}

@SneakyThrows
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.linkedin.common.urn.Urn;
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.client.EntityClientCache;
import com.linkedin.entity.client.EntityClientConfig;
import com.linkedin.entity.client.SystemEntityClient;
import com.linkedin.metadata.config.cache.client.EntityClientCacheConfig;
import com.linkedin.metadata.entity.DeleteEntityService;
Expand Down Expand Up @@ -43,7 +44,7 @@ public SystemJavaEntityClient(
RollbackService rollbackService,
EventProducer eventProducer,
EntityClientCacheConfig cacheConfig,
int batchGetV2Size) {
EntityClientConfig entityClientConfig) {
super(
entityService,
deleteEntityService,
Expand All @@ -54,7 +55,7 @@ public SystemJavaEntityClient(
timeseriesAspectService,
rollbackService,
eventProducer,
batchGetV2Size);
entityClientConfig);
this.operationContextMap = CacheBuilder.newBuilder().maximumSize(500).build();
this.entityClientCache = buildEntityClientCache(SystemJavaEntityClient.class, cacheConfig);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.template.RequiredFieldNotPresentException;
import com.linkedin.domain.Domains;
import com.linkedin.entity.client.EntityClientConfig;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.aspect.batch.AspectsBatch;
Expand Down Expand Up @@ -90,7 +91,7 @@ private JavaEntityClient getJavaEntityClient() {
_timeseriesAspectService,
rollbackService,
_eventProducer,
1);
EntityClientConfig.builder().batchGetV2Size(1).build());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static org.mockito.Mockito.when;

import com.linkedin.entity.client.EntityClient;
import com.linkedin.entity.client.EntityClientConfig;
import com.linkedin.metadata.client.JavaEntityClient;
import com.linkedin.metadata.config.PreProcessHooks;
import com.linkedin.metadata.config.cache.EntityDocCountCacheConfiguration;
Expand Down Expand Up @@ -330,6 +331,6 @@ private EntityClient entityClientHelper(
null,
null,
null,
1);
EntityClientConfig.builder().batchGetV2Size(1).build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static io.datahubproject.test.search.SearchTestUtils.getGraphQueryConfiguration;

import com.linkedin.entity.client.EntityClient;
import com.linkedin.entity.client.EntityClientConfig;
import com.linkedin.metadata.client.JavaEntityClient;
import com.linkedin.metadata.config.DataHubAppConfiguration;
import com.linkedin.metadata.config.MetadataChangeProposalConfig;
Expand Down Expand Up @@ -276,6 +277,6 @@ protected EntityClient entityClient(
null,
null,
null,
1);
EntityClientConfig.builder().batchGetV2Size(1).build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,21 @@
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.gms.factory.entityclient.RestliEntityClientFactory;
import com.linkedin.metadata.EventUtils;
import com.linkedin.metadata.dao.throttle.ThrottleControl;
import com.linkedin.metadata.dao.throttle.ThrottleSensor;
import com.linkedin.metadata.kafka.config.MetadataChangeProposalProcessorCondition;
import com.linkedin.metadata.kafka.util.KafkaListenerUtil;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import com.linkedin.mxe.FailedMetadataChangeProposal;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.mxe.Topics;
import io.datahubproject.metadata.context.OperationContext;
import java.io.IOException;
import java.util.Optional;
import javax.annotation.Nonnull;
import javax.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.MDC;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
Expand All @@ -43,7 +38,6 @@
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;

@Slf4j
Expand Down Expand Up @@ -80,38 +74,7 @@ public class MetadataChangeProposalsProcessor {

@PostConstruct
public void registerConsumerThrottle() {
if (kafkaThrottle != null
&& provider
.getMetadataChangeProposal()
.getThrottle()
.getComponents()
.getMceConsumer()
.isEnabled()) {
log.info("MCE Consumer Throttle Enabled");
kafkaThrottle.addCallback(
(throttleEvent) -> {
Optional<MessageListenerContainer> container =
Optional.ofNullable(registry.getListenerContainer(mceConsumerGroupId));
if (container.isEmpty()) {
log.warn(
"Expected container was missing: {} throttle is not possible.",
mceConsumerGroupId);
} else {
if (throttleEvent.isThrottled()) {
container.ifPresent(MessageListenerContainer::pause);
return ThrottleControl.builder()
// resume consumer after sleep
.callback(
(resumeEvent) -> container.ifPresent(MessageListenerContainer::resume))
.build();
}
}

return ThrottleControl.NONE;
});
} else {
log.info("MCE Consumer Throttle Disabled");
}
KafkaListenerUtil.registerThrottle(kafkaThrottle, provider, registry, mceConsumerGroupId);
}

@KafkaListener(
Expand All @@ -132,7 +95,9 @@ public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord)
consumerRecord.serializedValueSize(),
consumerRecord.timestamp());

log.debug("Record {}", record);
if (log.isDebugEnabled()) {
log.debug("Record {}", record);
}

MetadataChangeProposal event = new MetadataChangeProposal();
try {
Expand All @@ -148,45 +113,18 @@ public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord)
MDC.put(
MDC_CHANGE_TYPE, Optional.ofNullable(changeType).map(ChangeType::toString).orElse(""));

log.debug("MetadataChangeProposal {}", event);
// TODO: Get this from the event itself.
if (log.isDebugEnabled()) {
log.debug("MetadataChangeProposal {}", event);
}
String urn = entityClient.ingestProposal(systemOperationContext, event, false);
log.info("Successfully processed MCP event urn: {}", urn);
} catch (Throwable throwable) {
log.error("MCP Processor Error", throwable);
log.error("Message: {}", record);
sendFailedMCP(event, throwable);
KafkaListenerUtil.sendFailedMCP(event, throwable, fmcpTopicName, kafkaProducer);
}
} finally {
MDC.clear();
}
}

private void sendFailedMCP(@Nonnull MetadataChangeProposal event, @Nonnull Throwable throwable) {
final FailedMetadataChangeProposal failedMetadataChangeProposal =
createFailedMCPEvent(event, throwable);
try {
final GenericRecord genericFailedMCERecord =
EventUtils.pegasusToAvroFailedMCP(failedMetadataChangeProposal);
log.debug("Sending FailedMessages to topic - {}", fmcpTopicName);
log.info(
"Error while processing FMCP: FailedMetadataChangeProposal - {}",
failedMetadataChangeProposal);
kafkaProducer.send(new ProducerRecord<>(fmcpTopicName, genericFailedMCERecord));
} catch (IOException e) {
log.error(
"Error while sending FailedMetadataChangeProposal: Exception - {}, FailedMetadataChangeProposal - {}",
e.getStackTrace(),
failedMetadataChangeProposal);
}
}

@Nonnull
private FailedMetadataChangeProposal createFailedMCPEvent(
@Nonnull MetadataChangeProposal event, @Nonnull Throwable throwable) {
final FailedMetadataChangeProposal fmcp = new FailedMetadataChangeProposal();
fmcp.setError(ExceptionUtils.getStackTrace(throwable));
fmcp.setMetadataChangeProposal(event);
return fmcp;
}
}
Loading
Loading