Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ui) Add upgrade step to enable CLL impact analysis for existing data #6427

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.linkedin.metadata.boot.steps.IngestRolesStep;
import com.linkedin.metadata.boot.steps.IngestRootUserStep;
import com.linkedin.metadata.boot.steps.RemoveClientIdAspectStep;
import com.linkedin.metadata.boot.steps.RestoreColumnLineageIndices;
import com.linkedin.metadata.boot.steps.RestoreDbtSiblingsIndices;
import com.linkedin.metadata.boot.steps.RestoreGlossaryIndices;
import com.linkedin.metadata.boot.steps.UpgradeDefaultBrowsePathsStep;
Expand Down Expand Up @@ -85,10 +86,11 @@ protected BootstrapManager createInstance() {
final RestoreDbtSiblingsIndices restoreDbtSiblingsIndices =
new RestoreDbtSiblingsIndices(_entityService, _entityRegistry);
final RemoveClientIdAspectStep removeClientIdAspectStep = new RemoveClientIdAspectStep(_entityService);
final RestoreColumnLineageIndices restoreColumnLineageIndices = new RestoreColumnLineageIndices(_entityService, _entityRegistry);

final List<BootstrapStep> finalSteps = new ArrayList<>(ImmutableList.of(ingestRootUserStep, ingestPoliciesStep, ingestRolesStep,
ingestDataPlatformsStep, ingestDataPlatformInstancesStep, _ingestRetentionPoliciesStep, restoreGlossaryIndicesStep,
removeClientIdAspectStep, restoreDbtSiblingsIndices, indexDataPlatformsStep));
removeClientIdAspectStep, restoreDbtSiblingsIndices, indexDataPlatformsStep, restoreColumnLineageIndices));

if (_upgradeDefaultBrowsePathsEnabled) {
finalSteps.add(new UpgradeDefaultBrowsePathsStep(_entityService));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package com.linkedin.metadata.boot.steps;

import com.linkedin.common.AuditStamp;
import com.linkedin.common.InputFields;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.dataset.UpstreamLineage;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.boot.UpgradeStep;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.ListResult;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.query.ExtraInfo;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.Nonnull;
import java.util.Objects;

@Slf4j
public class RestoreColumnLineageIndices extends UpgradeStep {
private static final String VERSION = "1";
private static final String UPGRADE_ID = "restore-column-lineage-indices";
private static final Integer BATCH_SIZE = 1000;

private final EntityRegistry _entityRegistry;

public RestoreColumnLineageIndices(@Nonnull final EntityService entityService, @Nonnull final EntityRegistry entityRegistry) {
super(entityService, VERSION, UPGRADE_ID);
_entityRegistry = Objects.requireNonNull(entityRegistry, "entityRegistry must not be null");
}

@Override
public void upgrade() throws Exception {
final AuditStamp auditStamp =
new AuditStamp().setActor(Urn.createFromString(Constants.SYSTEM_ACTOR)).setTime(System.currentTimeMillis());

final int totalUpstreamLineageCount = getAndRestoreUpstreamLineageIndices(0, auditStamp);
int upstreamLineageCount = BATCH_SIZE;
while (upstreamLineageCount < totalUpstreamLineageCount) {
getAndRestoreUpstreamLineageIndices(upstreamLineageCount, auditStamp);
upstreamLineageCount += BATCH_SIZE;
}

final int totalChartInputFieldsCount = getAndRestoreInputFieldsIndices(Constants.CHART_ENTITY_NAME, 0, auditStamp);
int chartInputFieldsCount = BATCH_SIZE;
while (chartInputFieldsCount < totalChartInputFieldsCount) {
getAndRestoreInputFieldsIndices(Constants.CHART_ENTITY_NAME, chartInputFieldsCount, auditStamp);
chartInputFieldsCount += BATCH_SIZE;
}

final int totalDashboardInputFieldsCount = getAndRestoreInputFieldsIndices(Constants.DASHBOARD_ENTITY_NAME, 0, auditStamp);
int dashboardInputFieldsCount = BATCH_SIZE;
while (dashboardInputFieldsCount < totalDashboardInputFieldsCount) {
getAndRestoreInputFieldsIndices(Constants.DASHBOARD_ENTITY_NAME, dashboardInputFieldsCount, auditStamp);
dashboardInputFieldsCount += BATCH_SIZE;
}
}

@Nonnull
@Override
public ExecutionMode getExecutionMode() {
return ExecutionMode.ASYNC;
}

private int getAndRestoreUpstreamLineageIndices(int start, AuditStamp auditStamp) {
final AspectSpec upstreamLineageAspectSpec = _entityRegistry.getEntitySpec(Constants.DATASET_ENTITY_NAME)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this AspectSpec? Is there any other method on entityService to produce MCL that doesn't require the AspectSpec?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm all the methods I'm seeing from entityService that produce an MCL use an aspect spec. is there something specific you had in mind/know about?

.getAspectSpec(Constants.UPSTREAM_LINEAGE_ASPECT_NAME);

final ListResult<RecordTemplate> latestAspects = _entityService.listLatestAspects(
Constants.DATASET_ENTITY_NAME,
Constants.UPSTREAM_LINEAGE_ASPECT_NAME,
start,
BATCH_SIZE);

if (latestAspects.getTotalCount() == 0 || latestAspects.getValues() == null || latestAspects.getMetadata() == null) {
log.debug("Found 0 upstreamLineage aspects for datasets. Skipping migration.");
return 0;
}

if (latestAspects.getValues().size() != latestAspects.getMetadata().getExtraInfos().size()) {
// Bad result -- we should log that we cannot migrate this batch of upstreamLineages.
log.warn("Failed to match upstreamLineage aspects with corresponding urns. Found mismatched length between aspects ({})"
+ "and metadata ({}) for metadata {}",
latestAspects.getValues().size(),
latestAspects.getMetadata().getExtraInfos().size(),
latestAspects.getMetadata());
return latestAspects.getTotalCount();
}

for (int i = 0; i < latestAspects.getValues().size(); i++) {
ExtraInfo info = latestAspects.getMetadata().getExtraInfos().get(i);
RecordTemplate upstreamLineageRecord = latestAspects.getValues().get(i);
Urn urn = info.getUrn();
UpstreamLineage upstreamLineage = (UpstreamLineage) upstreamLineageRecord;
if (upstreamLineage == null) {
log.warn("Received null upstreamLineage for urn {}", urn);
continue;
}

_entityService.produceMetadataChangeLog(
urn,
Constants.DATASET_ENTITY_NAME,
Constants.UPSTREAM_LINEAGE_ASPECT_NAME,
upstreamLineageAspectSpec,
null,
upstreamLineage,
null,
null,
auditStamp,
ChangeType.RESTATE);
}

return latestAspects.getTotalCount();
}

private int getAndRestoreInputFieldsIndices(String entityName, int start, AuditStamp auditStamp) throws Exception {
final AspectSpec inputFieldsAspectSpec = _entityRegistry.getEntitySpec(entityName)
.getAspectSpec(Constants.INPUT_FIELDS_ASPECT_NAME);

final ListResult<RecordTemplate> latestAspects = _entityService.listLatestAspects(
entityName,
Constants.INPUT_FIELDS_ASPECT_NAME,
start,
BATCH_SIZE);

if (latestAspects.getTotalCount() == 0 || latestAspects.getValues() == null || latestAspects.getMetadata() == null) {
log.debug("Found 0 inputFields aspects. Skipping migration.");
return 0;
}

if (latestAspects.getValues().size() != latestAspects.getMetadata().getExtraInfos().size()) {
// Bad result -- we should log that we cannot migrate this batch of inputFields.
log.warn("Failed to match inputFields aspects with corresponding urns. Found mismatched length between aspects ({})"
+ "and metadata ({}) for metadata {}",
latestAspects.getValues().size(),
latestAspects.getMetadata().getExtraInfos().size(),
latestAspects.getMetadata());
return latestAspects.getTotalCount();
}

for (int i = 0; i < latestAspects.getValues().size(); i++) {
ExtraInfo info = latestAspects.getMetadata().getExtraInfos().get(i);
RecordTemplate inputFieldsRecord = latestAspects.getValues().get(i);
Urn urn = info.getUrn();
InputFields inputFields = (InputFields) inputFieldsRecord;
if (inputFields == null) {
log.warn("Received null inputFields for urn {}", urn);
continue;
}

_entityService.produceMetadataChangeLog(
urn,
entityName,
Constants.INPUT_FIELDS_ASPECT_NAME,
inputFieldsAspectSpec,
null,
inputFields,
null,
null,
auditStamp,
ChangeType.RESTATE);
}

return latestAspects.getTotalCount();
}
}
Loading