feat(restore-indices): add endpoint for restore indices, add basic check for graph (#5805)
anshbansal authored Sep 2, 2022
1 parent ae4fb7c commit b643b34
Showing 18 changed files with 463 additions and 221 deletions.
@@ -1,24 +1,16 @@
package com.linkedin.datahub.upgrade.restoreindices;

import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.datahub.upgrade.UpgradeContext;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.UpgradeStepResult;
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.EntityUtils;
import com.linkedin.metadata.entity.ebean.EbeanAspectV2;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesResult;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.mxe.SystemMetadata;
import io.ebean.EbeanServer;
import io.ebean.ExpressionList;
import io.ebean.PagedList;

import java.util.ArrayList;
import java.util.List;
@@ -29,11 +21,9 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import static com.linkedin.metadata.Constants.ASPECT_LATEST_VERSION;
import static com.linkedin.metadata.Constants.SYSTEM_ACTOR;


public class SendMAEStep implements UpgradeStep {
@@ -44,131 +34,23 @@ public class SendMAEStep implements UpgradeStep {

private final EbeanServer _server;
private final EntityService _entityService;
private final EntityRegistry _entityRegistry;

public static class KafkaJobResult {
public int ignored = 0;
public int rowsMigrated = 0;
public long timeSqlQueryMs = 0;
public long timeUrnMs = 0;
public long timeEntityRegistryCheckMs = 0;
public long aspectCheckMs = 0;
public long createRecordMs = 0;
public long sendMessageMs = 0;
}

public static void reportStat(UpgradeContext context, String id, long timeMs) {
context.report().addLine(String.format(
"Mins taken for %s so far is %.2f", id, (float) timeMs / 1000 / 60));
}

public class KafkaJob implements Callable<KafkaJobResult> {
public class KafkaJob implements Callable<RestoreIndicesResult> {
UpgradeContext context;
int start;
JobArgs args;
public KafkaJob(UpgradeContext context, int start, JobArgs args) {
RestoreIndicesArgs args;
public KafkaJob(UpgradeContext context, RestoreIndicesArgs args) {
this.context = context;
this.start = start;
this.args = args;
}
@Override
public KafkaJobResult call() {
KafkaJobResult result = new KafkaJobResult();
int ignored = 0;
int rowsMigrated = 0;
context.report().addLine(String.format(
"Reading rows %s through %s from the aspects table started.", start, start + args.batchSize));
long startTime = System.currentTimeMillis();
PagedList<EbeanAspectV2> rows = getPagedAspects(start, args);
result.timeSqlQueryMs = System.currentTimeMillis() - startTime;
context.report().addLine(String.format(
"Reading rows %s through %s from the aspects table completed.", start, start + args.batchSize));

for (EbeanAspectV2 aspect : rows.getList()) {
// 1. Extract an Entity type from the entity Urn
startTime = System.currentTimeMillis();
Urn urn;
try {
urn = Urn.createFromString(aspect.getKey().getUrn());
} catch (Exception e) {
context.report()
.addLine(String.format("Failed to bind Urn with value %s into Urn object: %s. Ignoring row.",
aspect.getKey().getUrn(), e));
ignored = ignored + 1;
continue;
}
result.timeUrnMs += System.currentTimeMillis() - startTime;
startTime = System.currentTimeMillis();

// 2. Verify that the entity associated with the aspect is found in the registry.
final String entityName = urn.getEntityType();
final EntitySpec entitySpec;
try {
entitySpec = _entityRegistry.getEntitySpec(entityName);
} catch (Exception e) {
context.report()
.addLine(String.format("Failed to find entity with name %s in Entity Registry: %s. Ignoring row.",
entityName, e));
ignored = ignored + 1;
continue;
}
result.timeEntityRegistryCheckMs += System.currentTimeMillis() - startTime;
startTime = System.currentTimeMillis();
final String aspectName = aspect.getKey().getAspect();

// 3. Verify that the aspect is a valid aspect associated with the entity
AspectSpec aspectSpec = entitySpec.getAspectSpec(aspectName);
if (aspectSpec == null) {
context.report()
.addLine(String.format("Failed to find aspect with name %s associated with entity named %s", aspectName,
entityName));
ignored = ignored + 1;
continue;
}
result.aspectCheckMs += System.currentTimeMillis() - startTime;
startTime = System.currentTimeMillis();

// 4. Create record from json aspect
final RecordTemplate aspectRecord;
try {
aspectRecord = EntityUtils.toAspectRecord(entityName, aspectName, aspect.getMetadata(), _entityRegistry);
} catch (Exception e) {
context.report()
.addLine(String.format("Failed to deserialize row %s for entity %s, aspect %s: %s. Ignoring row.",
aspect.getMetadata(), entityName, aspectName, e));
ignored = ignored + 1;
continue;
}
result.createRecordMs += System.currentTimeMillis() - startTime;
startTime = System.currentTimeMillis();

SystemMetadata latestSystemMetadata = EntityUtils.parseSystemMetadata(aspect.getSystemMetadata());

// 5. Produce MAE events for the aspect record
_entityService.produceMetadataChangeLog(urn, entityName, aspectName, aspectSpec, null, aspectRecord, null,
latestSystemMetadata,
new AuditStamp().setActor(UrnUtils.getUrn(SYSTEM_ACTOR)).setTime(System.currentTimeMillis()),
ChangeType.RESTATE);
result.sendMessageMs += System.currentTimeMillis() - startTime;

rowsMigrated++;
}

try {
TimeUnit.MILLISECONDS.sleep(args.batchDelayMs);
} catch (InterruptedException e) {
throw new RuntimeException("Thread interrupted while sleeping after successful batch migration.");
}
result.ignored = ignored;
result.rowsMigrated = rowsMigrated;
return result;
public RestoreIndicesResult call() {
return _entityService.restoreIndices(args, context.report()::addLine);
}
}

public SendMAEStep(final EbeanServer server, final EntityService entityService, final EntityRegistry entityRegistry) {
_server = server;
_entityService = entityService;
_entityRegistry = entityRegistry;
}

@Override
@@ -181,73 +63,39 @@ public int retryCount() {
return 0;
}

private List<KafkaJobResult> iterateFutures(List<Future<KafkaJobResult>> futures) {
int beforeSize = futures.size();
int afterSize = futures.size();
List<KafkaJobResult> result = new ArrayList<>();
while (afterSize > 0 && beforeSize == afterSize) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
// suppress
}
for (Future<KafkaJobResult> future: new ArrayList<>(futures)) {
if (future.isDone()) {
try {
result.add(future.get());
futures.remove(future);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
private List<RestoreIndicesResult> iterateFutures(List<Future<RestoreIndicesResult>> futures) {
List<RestoreIndicesResult> result = new ArrayList<>();
for (Future<RestoreIndicesResult> future: new ArrayList<>(futures)) {
if (future.isDone()) {
try {
result.add(future.get());
futures.remove(future);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
afterSize = futures.size();
}
return result;
}

private static class JobArgs {
int batchSize;
int numThreads;
long batchDelayMs;
String aspectName;
String urn;
String urnLike;
}

private JobArgs getArgs(UpgradeContext context) {
JobArgs result = new JobArgs();
private RestoreIndicesArgs getArgs(UpgradeContext context) {
RestoreIndicesArgs result = new RestoreIndicesArgs();
result.batchSize = getBatchSize(context.parsedArgs());
context.report().addLine(String.format("batchSize is %d", result.batchSize));
result.numThreads = getThreadCount(context.parsedArgs());
context.report().addLine(String.format("numThreads is %d", result.numThreads));
result.batchDelayMs = getBatchDelayMs(context.parsedArgs());
context.report().addLine(String.format("batchDelayMs is %d", result.batchDelayMs));
if (containsKey(context.parsedArgs(), RestoreIndices.ASPECT_NAME_ARG_NAME)) {
result.aspectName = context.parsedArgs().get(RestoreIndices.ASPECT_NAME_ARG_NAME).get();
context.report().addLine(String.format("aspect is %s", result.aspectName));
context.report().addLine(String.format("Found aspectName arg as %s", result.aspectName));
} else {
context.report().addLine("No aspectName arg present");
}
if (containsKey(context.parsedArgs(), RestoreIndices.URN_ARG_NAME)) {
result.urn = context.parsedArgs().get(RestoreIndices.URN_ARG_NAME).get();
context.report().addLine(String.format("urn is %s", result.urn));
context.report().addLine(String.format("Found urn arg as %s", result.urn));
} else {
context.report().addLine("No urn arg present");
}
if (containsKey(context.parsedArgs(), RestoreIndices.URN_LIKE_ARG_NAME)) {
result.urnLike = context.parsedArgs().get(RestoreIndices.URN_LIKE_ARG_NAME).get();
context.report().addLine(String.format("urnLike is %s", result.urnLike));
context.report().addLine(String.format("Found urn like arg as %s", result.urnLike));
} else {
context.report().addLine("No urnLike arg present");
}
return result;
}

private int getRowCount(JobArgs args) {
private int getRowCount(RestoreIndicesArgs args) {
ExpressionList<EbeanAspectV2> countExp =
_server.find(EbeanAspectV2.class)
.where()
@@ -267,8 +115,8 @@ private int getRowCount(JobArgs args) {
@Override
public Function<UpgradeContext, UpgradeStepResult> executable() {
return (context) -> {
KafkaJobResult finalJobResult = new KafkaJobResult();
JobArgs args = getArgs(context);
RestoreIndicesResult finalJobResult = new RestoreIndicesResult();
RestoreIndicesArgs args = getArgs(context);
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(args.numThreads);

context.report().addLine("Sending MAE from local DB");
@@ -278,21 +126,17 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
rowCount, (float) (System.currentTimeMillis() - startTime) / 1000 / 60));
int start = 0;

List<Future<KafkaJobResult>> futures = new ArrayList<>();
List<Future<RestoreIndicesResult>> futures = new ArrayList<>();
startTime = System.currentTimeMillis();
while (start < rowCount) {
while (futures.size() < args.numThreads && start < rowCount) {
futures.add(executor.submit(new KafkaJob(context, start, args)));
start = start + args.batchSize;
}
List<KafkaJobResult> tmpResults = iterateFutures(futures);
for (KafkaJobResult tmpResult: tmpResults) {
reportStats(context, finalJobResult, tmpResult, rowCount, startTime);
}
args = args.clone();
args.start = start;
futures.add(executor.submit(new KafkaJob(context, args)));
start = start + args.batchSize;
}
while (futures.size() > 0) {
List<KafkaJobResult> tmpResults = iterateFutures(futures);
for (KafkaJobResult tmpResult: tmpResults) {
List<RestoreIndicesResult> tmpResults = iterateFutures(futures);
for (RestoreIndicesResult tmpResult: tmpResults) {
reportStats(context, finalJobResult, tmpResult, rowCount, startTime);
}
}
@@ -310,22 +154,17 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
};
}

private static void reportStats(UpgradeContext context, KafkaJobResult finalResult, KafkaJobResult tmpResult,
private static void reportStats(UpgradeContext context, RestoreIndicesResult finalResult, RestoreIndicesResult tmpResult,
int rowCount, long startTime) {
finalResult.ignored += tmpResult.ignored;
finalResult.rowsMigrated += tmpResult.rowsMigrated;
finalResult.timeSqlQueryMs += tmpResult.timeSqlQueryMs;
reportStat(context, "sql query", finalResult.timeSqlQueryMs);
finalResult.timeUrnMs += tmpResult.timeUrnMs;
reportStat(context, "timeUrnMs", finalResult.timeUrnMs);
finalResult.timeEntityRegistryCheckMs += tmpResult.timeEntityRegistryCheckMs;
reportStat(context, "timeEntityRegistryCheckMs", finalResult.timeEntityRegistryCheckMs);
finalResult.aspectCheckMs += tmpResult.aspectCheckMs;
reportStat(context, "aspectCheckMs", finalResult.aspectCheckMs);
finalResult.createRecordMs += tmpResult.createRecordMs;
reportStat(context, "createRecordMs", finalResult.createRecordMs);
finalResult.sendMessageMs += tmpResult.sendMessageMs;
reportStat(context, "sendMessageMs", finalResult.sendMessageMs);
context.report().addLine(String.format("metrics so far %s", finalResult));

long currentTime = System.currentTimeMillis();
float timeSoFarMinutes = (float) (currentTime - startTime) / 1000 / 60;
@@ -343,29 +182,6 @@ private static void reportStats(UpgradeContext context, KafkaJobResult finalResu
timeSoFarMinutes, estimatedTimeMinutesComplete, totalTimeComplete));
}

private PagedList<EbeanAspectV2> getPagedAspects(final int start, final JobArgs args) {
ExpressionList<EbeanAspectV2> exp = _server.find(EbeanAspectV2.class)
.select(EbeanAspectV2.ALL_COLUMNS)
.where()
.eq(EbeanAspectV2.VERSION_COLUMN, ASPECT_LATEST_VERSION);
if (args.aspectName != null) {
exp = exp.eq(EbeanAspectV2.ASPECT_COLUMN, args.aspectName);
}
if (args.urn != null) {
exp = exp.eq(EbeanAspectV2.URN_COLUMN, args.urn);
}
if (args.urnLike != null) {
exp = exp.like(EbeanAspectV2.URN_COLUMN, args.urnLike);
}
return exp.orderBy()
.asc(EbeanAspectV2.URN_COLUMN)
.orderBy()
.asc(EbeanAspectV2.ASPECT_COLUMN)
.setFirstRow(start)
.setMaxRows(args.batchSize)
.findPagedList();
}

private int getBatchSize(final Map<String, Optional<String>> parsedArgs) {
return getInt(parsedArgs, DEFAULT_BATCH_SIZE, RestoreIndices.BATCH_SIZE_ARG_NAME);
}
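Note on the refactor above: the step-local JobArgs/KafkaJobResult bookkeeping is replaced by the shared RestoreIndicesArgs/RestoreIndicesResult types from com.linkedin.metadata.entity.restoreindices, and the per-batch work is delegated to EntityService.restoreIndices(args, context.report()::addLine). Below is a minimal sketch of what RestoreIndicesArgs plausibly looks like, inferred only from the fields and the args.clone() call visible in this diff; the real class may have different defaults or additional members.

// Sketch only -- field set inferred from getArgs()/executable() in this diff.
public class RestoreIndicesArgs implements Cloneable {
    public int batchSize;
    public int numThreads;
    public long batchDelayMs;
    public int start;          // row offset a single KafkaJob should read from
    public String aspectName;  // optional filter, from ASPECT_NAME_ARG_NAME
    public String urn;         // optional exact-match filter, from URN_ARG_NAME
    public String urnLike;     // optional SQL LIKE filter, from URN_LIKE_ARG_NAME

    @Override
    public RestoreIndicesArgs clone() {
        try {
            return (RestoreIndicesArgs) super.clone();
        } catch (CloneNotSupportedException e) {
            throw new AssertionError(e); // cannot happen: Cloneable is implemented
        }
    }
}

With this shape, executable() can clone the shared argument object per batch and vary only start, so each KafkaJob submitted to the thread pool reads a disjoint page of the aspects table.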
8 changes: 8 additions & 0 deletions metadata-ingestion/src/datahub/cli/check_cli.py
@@ -4,7 +4,9 @@
import click

from datahub import __package_name__
from datahub.cli.cli_utils import get_url_and_token
from datahub.cli.json_file import check_mce_file
from datahub.graph_consistency import check_data_platform
from datahub.ingestion.run.pipeline import Pipeline
from datahub.ingestion.sink.sink_registry import sink_registry
from datahub.ingestion.source.source_registry import source_registry
@@ -81,3 +83,9 @@ def plugins(verbose: bool) -> None:
click.echo(
f"If a plugin is disabled, try running: pip install '{__package_name__}[<plugin>]'"
)


@check.command
def graph_consistency() -> None:
gms_endpoint, gms_token = get_url_and_token()
check_data_platform.check(gms_endpoint, gms_token)
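
Presumably the new command is exposed under the existing datahub check group (the exact subcommand name depends on Click's default name normalization); it resolves the GMS endpoint and token from the local CLI configuration via get_url_and_token() and runs the data-platform consistency check in datahub.graph_consistency.check_data_platform against that server.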
