feat(restore-indices): add endpoint for restore indices, add basic check for graph #5805

Merged
Changes from 7 commits

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restoreindices/SendMAEStep.java
@@ -1,24 +1,16 @@
package com.linkedin.datahub.upgrade.restoreindices;

import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.datahub.upgrade.UpgradeContext;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.UpgradeStepResult;
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.EntityUtils;
import com.linkedin.metadata.entity.ebean.EbeanAspectV2;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesResult;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.mxe.SystemMetadata;
import io.ebean.EbeanServer;
import io.ebean.ExpressionList;
import io.ebean.PagedList;

import java.util.ArrayList;
import java.util.List;
@@ -29,11 +21,9 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import static com.linkedin.metadata.Constants.ASPECT_LATEST_VERSION;
import static com.linkedin.metadata.Constants.SYSTEM_ACTOR;


public class SendMAEStep implements UpgradeStep {
@@ -44,131 +34,23 @@ public class SendMAEStep implements UpgradeStep {

private final EbeanServer _server;
private final EntityService _entityService;
private final EntityRegistry _entityRegistry;

public static class KafkaJobResult {
public int ignored = 0;
public int rowsMigrated = 0;
public long timeSqlQueryMs = 0;
public long timeUrnMs = 0;
public long timeEntityRegistryCheckMs = 0;
public long aspectCheckMs = 0;
public long createRecordMs = 0;
public long sendMessageMs = 0;
}

public static void reportStat(UpgradeContext context, String id, long timeMs) {
context.report().addLine(String.format(
"Mins taken for %s so far is %.2f", id, (float) timeMs / 1000 / 60));
}

public class KafkaJob implements Callable<KafkaJobResult> {
public class KafkaJob implements Callable<RestoreIndicesResult> {
UpgradeContext context;
int start;
JobArgs args;
public KafkaJob(UpgradeContext context, int start, JobArgs args) {
RestoreIndicesArgs args;
public KafkaJob(UpgradeContext context, RestoreIndicesArgs args) {
this.context = context;
this.start = start;
this.args = args;
}
@Override
public KafkaJobResult call() {
KafkaJobResult result = new KafkaJobResult();
int ignored = 0;
int rowsMigrated = 0;
context.report().addLine(String.format(
"Reading rows %s through %s from the aspects table started.", start, start + args.batchSize));
long startTime = System.currentTimeMillis();
PagedList<EbeanAspectV2> rows = getPagedAspects(start, args);
result.timeSqlQueryMs = System.currentTimeMillis() - startTime;
context.report().addLine(String.format(
"Reading rows %s through %s from the aspects table completed.", start, start + args.batchSize));

for (EbeanAspectV2 aspect : rows.getList()) {
// 1. Extract an Entity type from the entity Urn
startTime = System.currentTimeMillis();
Urn urn;
try {
urn = Urn.createFromString(aspect.getKey().getUrn());
} catch (Exception e) {
context.report()
.addLine(String.format("Failed to bind Urn with value %s into Urn object: %s. Ignoring row.",
aspect.getKey().getUrn(), e));
ignored = ignored + 1;
continue;
}
result.timeUrnMs += System.currentTimeMillis() - startTime;
startTime = System.currentTimeMillis();

// 2. Verify that the entity associated with the aspect is found in the registry.
final String entityName = urn.getEntityType();
final EntitySpec entitySpec;
try {
entitySpec = _entityRegistry.getEntitySpec(entityName);
} catch (Exception e) {
context.report()
.addLine(String.format("Failed to find entity with name %s in Entity Registry: %s. Ignoring row.",
entityName, e));
ignored = ignored + 1;
continue;
}
result.timeEntityRegistryCheckMs += System.currentTimeMillis() - startTime;
startTime = System.currentTimeMillis();
final String aspectName = aspect.getKey().getAspect();

// 3. Verify that the aspect is a valid aspect associated with the entity
AspectSpec aspectSpec = entitySpec.getAspectSpec(aspectName);
if (aspectSpec == null) {
context.report()
.addLine(String.format("Failed to find aspect with name %s associated with entity named %s", aspectName,
entityName));
ignored = ignored + 1;
continue;
}
result.aspectCheckMs += System.currentTimeMillis() - startTime;
startTime = System.currentTimeMillis();

// 4. Create record from json aspect
final RecordTemplate aspectRecord;
try {
aspectRecord = EntityUtils.toAspectRecord(entityName, aspectName, aspect.getMetadata(), _entityRegistry);
} catch (Exception e) {
context.report()
.addLine(String.format("Failed to deserialize row %s for entity %s, aspect %s: %s. Ignoring row.",
aspect.getMetadata(), entityName, aspectName, e));
ignored = ignored + 1;
continue;
}
result.createRecordMs += System.currentTimeMillis() - startTime;
startTime = System.currentTimeMillis();

SystemMetadata latestSystemMetadata = EntityUtils.parseSystemMetadata(aspect.getSystemMetadata());

// 5. Produce MAE events for the aspect record
_entityService.produceMetadataChangeLog(urn, entityName, aspectName, aspectSpec, null, aspectRecord, null,
latestSystemMetadata,
new AuditStamp().setActor(UrnUtils.getUrn(SYSTEM_ACTOR)).setTime(System.currentTimeMillis()),
ChangeType.RESTATE);
result.sendMessageMs += System.currentTimeMillis() - startTime;

rowsMigrated++;
}

try {
TimeUnit.MILLISECONDS.sleep(args.batchDelayMs);
} catch (InterruptedException e) {
throw new RuntimeException("Thread interrupted while sleeping after successful batch migration.");
}
result.ignored = ignored;
result.rowsMigrated = rowsMigrated;
return result;
public RestoreIndicesResult call() {
return _entityService.restoreIndices(args, context.report()::addLine);
}
}

public SendMAEStep(final EbeanServer server, final EntityService entityService, final EntityRegistry entityRegistry) {
_server = server;
_entityService = entityService;
_entityRegistry = entityRegistry;
}

@Override
@@ -181,73 +63,39 @@ public int retryCount() {
return 0;
}

private List<KafkaJobResult> iterateFutures(List<Future<KafkaJobResult>> futures) {
int beforeSize = futures.size();
int afterSize = futures.size();
List<KafkaJobResult> result = new ArrayList<>();
while (afterSize > 0 && beforeSize == afterSize) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
// suppress
}
for (Future<KafkaJobResult> future: new ArrayList<>(futures)) {
if (future.isDone()) {
try {
result.add(future.get());
futures.remove(future);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
private List<RestoreIndicesResult> iterateFutures(List<Future<RestoreIndicesResult>> futures) {
List<RestoreIndicesResult> result = new ArrayList<>();
for (Future<RestoreIndicesResult> future: new ArrayList<>(futures)) {
if (future.isDone()) {
try {
result.add(future.get());
futures.remove(future);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
afterSize = futures.size();
}
return result;
}

private static class JobArgs {
int batchSize;
int numThreads;
long batchDelayMs;
String aspectName;
String urn;
String urnLike;
}

private JobArgs getArgs(UpgradeContext context) {
JobArgs result = new JobArgs();
private RestoreIndicesArgs getArgs(UpgradeContext context) {
RestoreIndicesArgs result = new RestoreIndicesArgs();
result.batchSize = getBatchSize(context.parsedArgs());
context.report().addLine(String.format("batchSize is %d", result.batchSize));
result.numThreads = getThreadCount(context.parsedArgs());
context.report().addLine(String.format("numThreads is %d", result.numThreads));
result.batchDelayMs = getBatchDelayMs(context.parsedArgs());
context.report().addLine(String.format("batchDelayMs is %d", result.batchDelayMs));
if (containsKey(context.parsedArgs(), RestoreIndices.ASPECT_NAME_ARG_NAME)) {
result.aspectName = context.parsedArgs().get(RestoreIndices.ASPECT_NAME_ARG_NAME).get();
context.report().addLine(String.format("aspect is %s", result.aspectName));
context.report().addLine(String.format("Found aspectName arg as %s", result.aspectName));
} else {
context.report().addLine("No aspectName arg present");
}
if (containsKey(context.parsedArgs(), RestoreIndices.URN_ARG_NAME)) {
result.urn = context.parsedArgs().get(RestoreIndices.URN_ARG_NAME).get();
context.report().addLine(String.format("urn is %s", result.urn));
context.report().addLine(String.format("Found urn arg as %s", result.urn));
} else {
context.report().addLine("No urn arg present");
}
if (containsKey(context.parsedArgs(), RestoreIndices.URN_LIKE_ARG_NAME)) {
result.urnLike = context.parsedArgs().get(RestoreIndices.URN_LIKE_ARG_NAME).get();
context.report().addLine(String.format("urnLike is %s", result.urnLike));
context.report().addLine(String.format("Found urn like arg as %s", result.urnLike));
} else {
context.report().addLine("No urnLike arg present");
}
return result;
}

private int getRowCount(JobArgs args) {
private int getRowCount(RestoreIndicesArgs args) {
ExpressionList<EbeanAspectV2> countExp =
_server.find(EbeanAspectV2.class)
.where()
@@ -267,8 +115,8 @@ private int getRowCount(JobArgs args) {
@Override
public Function<UpgradeContext, UpgradeStepResult> executable() {
return (context) -> {
KafkaJobResult finalJobResult = new KafkaJobResult();
JobArgs args = getArgs(context);
RestoreIndicesResult finalJobResult = new RestoreIndicesResult();
RestoreIndicesArgs args = getArgs(context);
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(args.numThreads);

context.report().addLine("Sending MAE from local DB");
@@ -278,21 +126,17 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
rowCount, (float) (System.currentTimeMillis() - startTime) / 1000 / 60));
int start = 0;

List<Future<KafkaJobResult>> futures = new ArrayList<>();
List<Future<RestoreIndicesResult>> futures = new ArrayList<>();
startTime = System.currentTimeMillis();
while (start < rowCount) {
while (futures.size() < args.numThreads && start < rowCount) {
futures.add(executor.submit(new KafkaJob(context, start, args)));
start = start + args.batchSize;
}
List<KafkaJobResult> tmpResults = iterateFutures(futures);
for (KafkaJobResult tmpResult: tmpResults) {
reportStats(context, finalJobResult, tmpResult, rowCount, startTime);
}
args = args.clone();
args.start = start;
futures.add(executor.submit(new KafkaJob(context, args)));
start = start + args.batchSize;
}
while (futures.size() > 0) {
List<KafkaJobResult> tmpResults = iterateFutures(futures);
for (KafkaJobResult tmpResult: tmpResults) {
List<RestoreIndicesResult> tmpResults = iterateFutures(futures);
for (RestoreIndicesResult tmpResult: tmpResults) {
reportStats(context, finalJobResult, tmpResult, rowCount, startTime);
}
}
@@ -310,22 +154,17 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
};
}

private static void reportStats(UpgradeContext context, KafkaJobResult finalResult, KafkaJobResult tmpResult,
private static void reportStats(UpgradeContext context, RestoreIndicesResult finalResult, RestoreIndicesResult tmpResult,
int rowCount, long startTime) {
finalResult.ignored += tmpResult.ignored;
finalResult.rowsMigrated += tmpResult.rowsMigrated;
finalResult.timeSqlQueryMs += tmpResult.timeSqlQueryMs;
reportStat(context, "sql query", finalResult.timeSqlQueryMs);
finalResult.timeUrnMs += tmpResult.timeUrnMs;
reportStat(context, "timeUrnMs", finalResult.timeUrnMs);
finalResult.timeEntityRegistryCheckMs += tmpResult.timeEntityRegistryCheckMs;
reportStat(context, "timeEntityRegistryCheckMs", finalResult.timeEntityRegistryCheckMs);
finalResult.aspectCheckMs += tmpResult.aspectCheckMs;
reportStat(context, "aspectCheckMs", finalResult.aspectCheckMs);
finalResult.createRecordMs += tmpResult.createRecordMs;
reportStat(context, "createRecordMs", finalResult.createRecordMs);
finalResult.sendMessageMs += tmpResult.sendMessageMs;
reportStat(context, "sendMessageMs", finalResult.sendMessageMs);
context.report().addLine(String.format("metrics so far %s", finalResult));

long currentTime = System.currentTimeMillis();
float timeSoFarMinutes = (float) (currentTime - startTime) / 1000 / 60;
@@ -343,29 +182,6 @@ private static void reportStats(UpgradeContext context, KafkaJobResult finalResult,
timeSoFarMinutes, estimatedTimeMinutesComplete, totalTimeComplete));
}

private PagedList<EbeanAspectV2> getPagedAspects(final int start, final JobArgs args) {
ExpressionList<EbeanAspectV2> exp = _server.find(EbeanAspectV2.class)
.select(EbeanAspectV2.ALL_COLUMNS)
.where()
.eq(EbeanAspectV2.VERSION_COLUMN, ASPECT_LATEST_VERSION);
if (args.aspectName != null) {
exp = exp.eq(EbeanAspectV2.ASPECT_COLUMN, args.aspectName);
}
if (args.urn != null) {
exp = exp.eq(EbeanAspectV2.URN_COLUMN, args.urn);
}
if (args.urnLike != null) {
exp = exp.like(EbeanAspectV2.URN_COLUMN, args.urnLike);
}
return exp.orderBy()
.asc(EbeanAspectV2.URN_COLUMN)
.orderBy()
.asc(EbeanAspectV2.ASPECT_COLUMN)
.setFirstRow(start)
.setMaxRows(args.batchSize)
.findPagedList();
}

private int getBatchSize(final Map<String, Optional<String>> parsedArgs) {
return getInt(parsedArgs, DEFAULT_BATCH_SIZE, RestoreIndices.BATCH_SIZE_ARG_NAME);
}
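The refactor above delegates the per-batch work to EntityService.restoreIndices and replaces the step-local JobArgs/KafkaJobResult holders with RestoreIndicesArgs and RestoreIndicesResult, whose definitions live outside this file. Based only on what the diff references (batchSize, numThreads, batchDelayMs, aspectName, urn, urnLike, start, clone(), the ignored/rowsMigrated counters, and the timing fields carried over from KafkaJobResult), a rough sketch of their shape might look like the following; default values and anything not referenced above are assumptions, not the actual metadata-service classes.

// Illustrative sketch only; the real classes ship with metadata-service and may differ.
// RestoreIndicesArgs.java
public class RestoreIndicesArgs implements Cloneable {
  public int start = 0;              // first row of the batch; set per KafkaJob above
  public int batchSize = 1000;       // defaults here are assumptions
  public int numThreads = 1;
  public long batchDelayMs = 100;
  public String aspectName;          // optional filters; null means "no filter"
  public String urn;
  public String urnLike;

  @Override
  public RestoreIndicesArgs clone() {
    try {
      // Shallow copy is sufficient: all fields are primitives or immutable Strings.
      return (RestoreIndicesArgs) super.clone();
    } catch (CloneNotSupportedException e) {
      throw new AssertionError(e);
    }
  }
}

// RestoreIndicesResult.java
public class RestoreIndicesResult {
  public int ignored = 0;
  public int rowsMigrated = 0;
  public long timeSqlQueryMs = 0;
  public long timeUrnMs = 0;
  public long timeEntityRegistryCheckMs = 0;
  public long aspectCheckMs = 0;
  public long createRecordMs = 0;
  public long sendMessageMs = 0;

  @Override
  public String toString() {
    // Backs the "metrics so far %s" report line in reportStats above.
    return String.format("ignored=%d rowsMigrated=%d timeSqlQueryMs=%d timeUrnMs=%d "
        + "timeEntityRegistryCheckMs=%d aspectCheckMs=%d createRecordMs=%d sendMessageMs=%d",
        ignored, rowsMigrated, timeSqlQueryMs, timeUrnMs, timeEntityRegistryCheckMs,
        aspectCheckMs, createRecordMs, sendMessageMs);
  }
}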
8 changes: 8 additions & 0 deletions metadata-ingestion/src/datahub/cli/check_cli.py
@@ -1,7 +1,9 @@
import click

from datahub import __package_name__
from datahub.cli.cli_utils import get_url_and_token
from datahub.cli.json_file import check_mce_file
from datahub.graph_consistency import check_data_platform
from datahub.ingestion.sink.sink_registry import sink_registry
from datahub.ingestion.source.source_registry import source_registry
from datahub.ingestion.transformer.transform_registry import transform_registry
@@ -48,3 +50,9 @@ def plugins(verbose: bool) -> None:
click.echo(
f"If a plugin is disabled, try running: pip install '{__package_name__}[<plugin>]'"
)


@check.command
def graph_consistency() -> None:
gms_endpoint, gms_token = get_url_and_token()
check_data_platform.check(gms_endpoint, gms_token)
Empty file.
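The new graph_consistency command wires the CLI to datahub.graph_consistency.check_data_platform, whose implementation is not shown in this diff. As a quick way to exercise the subcommand without a console entry point, Click's test runner can invoke the check group directly; the snippet below is a usage sketch, not part of the PR, and the registered command name may be graph-consistency or graph_consistency depending on the installed Click version's default name handling.

# Usage sketch: drive the new subcommand through Click's test runner.
# Requires GMS connection details available to the CLI (the same
# configuration that get_url_and_token reads).
from click.testing import CliRunner

from datahub.cli.check_cli import check

runner = CliRunner()
# Click >= 7 converts function-name underscores to dashes by default;
# on older versions fall back to "graph_consistency".
result = runner.invoke(check, ["graph-consistency"])
print(result.exit_code)
print(result.output)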