Skip to content

Commit

Permalink
feat(run): Create a describe run endpoint for fetching aspects create…
Browse files Browse the repository at this point in the history
…d by the ingestion run (#4964)
  • Loading branch information
Dexter Lee authored May 24, 2022
1 parent bbd0ab8 commit 669160a
Show file tree
Hide file tree
Showing 14 changed files with 327 additions and 165 deletions.
29 changes: 16 additions & 13 deletions metadata-ingestion/src/datahub/cli/cli_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,21 @@ def parse_run_restli_response(response: requests.Response) -> dict:
return summary


def format_aspect_summaries(summaries: list) -> typing.List[typing.List[str]]:
local_timezone = datetime.now().astimezone().tzinfo
return [
[
row.get("urn"),
row.get("aspectName"),
datetime.fromtimestamp(row.get("timestamp") / 1000).strftime(
"%Y-%m-%d %H:%M:%S"
)
+ f" ({local_timezone})",
]
for row in summaries
]


def post_rollback_endpoint(
payload_obj: dict,
path: str,
Expand All @@ -266,19 +281,7 @@ def post_rollback_endpoint(
if len(rows) == 0:
click.secho(f"No entities found. Payload used: {payload}", fg="yellow")

local_timezone = datetime.now().astimezone().tzinfo
structured_rolled_back_results = [
[
row.get("urn"),
row.get("aspectName"),
datetime.fromtimestamp(row.get("timestamp") / 1000).strftime(
"%Y-%m-%d %H:%M:%S"
)
+ f" ({local_timezone})",
]
for row in rolled_back_aspects
]

structured_rolled_back_results = format_aspect_summaries(rolled_back_aspects)
return (
structured_rolled_back_results,
entities_affected,
Expand Down
55 changes: 32 additions & 23 deletions metadata-ingestion/src/datahub/cli/ingest_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from datahub.cli import cli_utils
from datahub.cli.cli_utils import (
CONDENSED_DATAHUB_CONFIG_PATH,
format_aspect_summaries,
get_session_and_host,
post_rollback_endpoint,
)
Expand Down Expand Up @@ -203,36 +204,44 @@ def list_runs(page_offset: int, page_size: int, include_soft_deletes: bool) -> N

@ingest.command()
@click.option("--run-id", required=True, type=str)
@click.option("--start", type=int, default=0)
@click.option("--count", type=int, default=100)
@click.option(
"--include-soft-deletes",
is_flag=True,
default=False,
help="If enabled, will include aspects that have been soft deleted",
)
@click.option("-a", "--show-aspect", required=False, is_flag=True)
@telemetry.with_telemetry
def show(run_id: str) -> None:
def show(
run_id: str, start: int, count: int, include_soft_deletes: bool, show_aspect: bool
) -> None:
"""Describe a provided ingestion run to datahub"""
session, gms_host = get_session_and_host()

payload_obj = {"runId": run_id, "dryRun": True, "hardDelete": True}
(
structured_rows,
entities_affected,
aspects_modified,
aspects_affected,
unsafe_entity_count,
unsafe_entities,
) = post_rollback_endpoint(payload_obj, "/runs?action=rollback")
url = f"{gms_host}/runs?action=describe"

payload_obj = {
"runId": run_id,
"start": start,
"count": count,
"includeSoft": include_soft_deletes,
"includeAspect": show_aspect,
}

payload = json.dumps(payload_obj)

response = session.post(url, data=payload)

if aspects_modified >= ELASTIC_MAX_PAGE_SIZE:
rows = parse_restli_response(response)
if not show_aspect:
click.echo(
f"this run created at least {entities_affected} new entities and updated at least {aspects_modified} aspects"
tabulate(format_aspect_summaries(rows), RUN_TABLE_COLUMNS, tablefmt="grid")
)
else:
click.echo(
f"this run created {entities_affected} new entities and updated {aspects_modified} aspects"
)
click.echo(
"rolling back will delete the entities created and revert the updated aspects"
)
click.echo()
click.echo(
f"showing first {len(structured_rows)} of {aspects_modified} aspects touched by this run"
)
click.echo(tabulate(structured_rows, RUN_TABLE_COLUMNS, tablefmt="grid"))
for row in rows:
click.echo(json.dumps(row, indent=4))


@ingest.command()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,7 @@ public BulkByScrollResponse deleteByUrn(@Nonnull final String urn) {
return null;
}

public BulkByScrollResponse deleteByUrnAspect(
@Nonnull final String urn,
@Nonnull final String aspect
) {
public BulkByScrollResponse deleteByUrnAspect(@Nonnull final String urn, @Nonnull final String aspect) {
BoolQueryBuilder finalQuery = QueryBuilders.boolQuery();
finalQuery.must(QueryBuilders.termQuery("urn", urn));
finalQuery.must(QueryBuilders.termQuery("aspect", aspect));
Expand All @@ -114,7 +111,7 @@ public BulkByScrollResponse deleteByUrnAspect(
return null;
}

public SearchResponse findByParams(Map<String, String> searchParams, boolean includeSoftDeleted) {
public SearchResponse findByParams(Map<String, String> searchParams, boolean includeSoftDeleted, int from, int size) {
SearchRequest searchRequest = new SearchRequest();

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
Expand All @@ -131,8 +128,8 @@ public SearchResponse findByParams(Map<String, String> searchParams, boolean inc

searchSourceBuilder.query(finalQuery);

// this is the max page size elastic will return
searchSourceBuilder.size(10000);
searchSourceBuilder.from(from);
searchSourceBuilder.size(size);

searchRequest.source(searchSourceBuilder);

Expand All @@ -147,15 +144,16 @@ public SearchResponse findByParams(Map<String, String> searchParams, boolean inc
return null;
}

public SearchResponse findByRegistry(String registryName, String registryVersion, boolean includeSoftDeleted) {
public SearchResponse findByRegistry(String registryName, String registryVersion, boolean includeSoftDeleted,
int from, int size) {
Map<String, String> params = new HashMap<>();
params.put("registryName", registryName);
params.put("registryVersion", registryVersion);
return findByParams(params, includeSoftDeleted);
return findByParams(params, includeSoftDeleted, from, size);
}

public SearchResponse findByRunId(String runId, boolean includeSoftDeleted) {
return findByParams(Collections.singletonMap("runId", runId), includeSoftDeleted);
public SearchResponse findByRunId(String runId, boolean includeSoftDeleted, int from, int size) {
return findByParams(Collections.singletonMap("runId", runId), includeSoftDeleted, from, size);
}

public SearchResponse findRuns(Integer pageOffset, Integer pageSize) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.linkedin.metadata.run.AspectRowSummary;
import com.linkedin.metadata.run.IngestionRunSummary;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.ESIndexBuilder;
import com.linkedin.metadata.search.utils.ESUtils;
import com.linkedin.metadata.utils.elasticsearch.IndexConvention;
import com.linkedin.mxe.SystemMetadata;
import java.io.IOException;
Expand Down Expand Up @@ -100,7 +101,8 @@ public void setDocStatus(String urn, boolean removed) {
// searchBy findByParams
// If status.removed -> false (from removed to not removed) --> get soft deleted entities.
// If status.removed -> true (from not removed to removed) --> do not get soft deleted entities.
final List<AspectRowSummary> aspectList = findByParams(ImmutableMap.of("urn", urn), !removed);
final List<AspectRowSummary> aspectList =
findByParams(ImmutableMap.of("urn", urn), !removed, 0, ESUtils.MAX_RESULT_SIZE);
// for each -> toDocId and set removed to true for all
aspectList.forEach(aspect -> {
final String docId = toDocId(aspect.getUrn(), aspect.getAspectName());
Expand All @@ -123,18 +125,19 @@ public void insert(@Nullable SystemMetadata systemMetadata, String urn, String a
}

@Override
public List<AspectRowSummary> findByRunId(String runId, boolean includeSoftDeleted) {
return findByParams(Collections.singletonMap(FIELD_RUNID, runId), includeSoftDeleted);
public List<AspectRowSummary> findByRunId(String runId, boolean includeSoftDeleted, int from, int size) {
return findByParams(Collections.singletonMap(FIELD_RUNID, runId), includeSoftDeleted, from, size);
}

@Override
public List<AspectRowSummary> findByUrn(String urn, boolean includeSoftDeleted) {
return findByParams(Collections.singletonMap(FIELD_URN, urn), includeSoftDeleted);
public List<AspectRowSummary> findByUrn(String urn, boolean includeSoftDeleted, int from, int size) {
return findByParams(Collections.singletonMap(FIELD_URN, urn), includeSoftDeleted, from, size);
}

@Override
public List<AspectRowSummary> findByParams(Map<String, String> systemMetaParams, boolean includeSoftDeleted) {
SearchResponse searchResponse = _esDAO.findByParams(systemMetaParams, includeSoftDeleted);
public List<AspectRowSummary> findByParams(Map<String, String> systemMetaParams, boolean includeSoftDeleted, int from,
int size) {
SearchResponse searchResponse = _esDAO.findByParams(systemMetaParams, includeSoftDeleted, from, size);
if (searchResponse != null) {
SearchHits hits = searchResponse.getHits();
List<AspectRowSummary> summaries = Arrays.stream(hits.getHits()).map(hit -> {
Expand All @@ -159,11 +162,12 @@ public List<AspectRowSummary> findByParams(Map<String, String> systemMetaParams,
}

@Override
public List<AspectRowSummary> findByRegistry(String registryName, String registryVersion, boolean includeSoftDeleted) {
public List<AspectRowSummary> findByRegistry(String registryName, String registryVersion, boolean includeSoftDeleted,
int from, int size) {
Map<String, String> registryParams = new HashMap<>();
registryParams.put(FIELD_REGISTRY_NAME, registryName);
registryParams.put(FIELD_REGISTRY_VERSION, registryVersion);
return findByParams(registryParams, includeSoftDeleted);
return findByParams(registryParams, includeSoftDeleted, from, size);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ public interface SystemMetadataService {

void insert(@Nullable SystemMetadata systemMetadata, String urn, String aspect);

List<AspectRowSummary> findByRunId(String runId, boolean includeSoftDeleted);
List<AspectRowSummary> findByRunId(String runId, boolean includeSoftDeleted, int from, int size);

List<AspectRowSummary> findByUrn(String urn, boolean includeSoftDeleted);
List<AspectRowSummary> findByUrn(String urn, boolean includeSoftDeleted, int from, int size);

List<AspectRowSummary> findByParams(Map<String, String> systemMetaParams, boolean includeSoftDeleted);
List<AspectRowSummary> findByParams(Map<String, String> systemMetaParams, boolean includeSoftDeleted, int from, int size);

List<AspectRowSummary> findByRegistry(String registryName, String registryVersion, boolean includeSoftDeleted);
List<AspectRowSummary> findByRegistry(String registryName, String registryVersion, boolean includeSoftDeleted, int from, int size);

List<IngestionRunSummary> listRuns(Integer pageOffset, Integer pageSize, boolean includeSoftDeleted);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.linkedin.metadata.run.AspectRowSummary;
import com.linkedin.metadata.run.IngestionRunSummary;
import com.linkedin.metadata.search.elasticsearch.ElasticSearchServiceTest;
import com.linkedin.metadata.search.utils.ESUtils;
import com.linkedin.metadata.utils.elasticsearch.IndexConvention;
import com.linkedin.metadata.utils.elasticsearch.IndexConventionImpl;
import com.linkedin.mxe.SystemMetadata;
Expand Down Expand Up @@ -137,7 +138,7 @@ public void testFindByRunId() throws Exception {

syncAfterWrite(_searchClient, _indexName);

List<AspectRowSummary> rows = _client.findByRunId("abc-456", false);
List<AspectRowSummary> rows = _client.findByRunId("abc-456", false, 0, ESUtils.MAX_RESULT_SIZE);

assertEquals(rows.size(), 4);
rows.forEach(row -> assertEquals(row.getRunId(), "abc-456"));
Expand Down Expand Up @@ -169,7 +170,7 @@ public void testDelete() throws Exception {

syncAfterWrite(_searchClient, _indexName);

List<AspectRowSummary> rows = _client.findByRunId("abc-456", false);
List<AspectRowSummary> rows = _client.findByRunId("abc-456", false, 0, ESUtils.MAX_RESULT_SIZE);

assertEquals(rows.size(), 2);
rows.forEach(row -> assertEquals(row.getRunId(), "abc-456"));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
namespace com.linkedin.metadata.run

import com.linkedin.entity.Aspect

record AspectRowSummary {
runId: string
aspectName: string
Expand All @@ -8,4 +10,5 @@ record AspectRowSummary {
metadata: string
version: long
keyAspect: boolean
aspect: optional Aspect
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,27 @@
},
"supports" : [ ],
"actions" : [ {
"name" : "describe",
"parameters" : [ {
"name" : "runId",
"type" : "string"
}, {
"name" : "start",
"type" : "int"
}, {
"name" : "count",
"type" : "int"
}, {
"name" : "includeSoft",
"type" : "boolean",
"optional" : true
}, {
"name" : "includeAspect",
"type" : "boolean",
"optional" : true
} ],
"returns" : "{ \"type\" : \"array\", \"items\" : \"com.linkedin.metadata.run.AspectRowSummary\" }"
}, {
"name" : "list",
"doc" : "Retrieves the value for an entity that is made up of latest versions of specified aspects.",
"parameters" : [ {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -702,8 +702,10 @@
"doc" : "Instance of the data platform (e.g. db instance)",
"optional" : true,
"Searchable" : {
"addToFilters" : true,
"fieldName" : "platformInstance",
"fieldType" : "URN"
"fieldType" : "URN",
"filterNameOverride" : "Platform Instance"
}
} ],
"Aspect" : {
Expand Down Expand Up @@ -804,6 +806,10 @@
}
},
"doc" : "Urn of the applied tag",
"Relationship" : {
"entityTypes" : [ "tag" ],
"name" : "TaggedWith"
},
"Searchable" : {
"addToFilters" : true,
"fieldName" : "tags",
Expand Down Expand Up @@ -881,6 +887,10 @@
}
},
"doc" : "Urn of the applied glossary term",
"Relationship" : {
"entityTypes" : [ "glossaryTerm" ],
"name" : "TermedWith"
},
"Searchable" : {
"addToFilters" : true,
"fieldName" : "glossaryTerms",
Expand Down Expand Up @@ -2483,7 +2493,7 @@
"Relationship" : {
"/tags/*/tag" : {
"entityTypes" : [ "tag" ],
"name" : "FieldTaggedWith"
"name" : "SchemaFieldTaggedWith"
}
},
"Searchable" : {
Expand All @@ -2501,7 +2511,7 @@
"Relationship" : {
"/terms/*/urn" : {
"entityTypes" : [ "glossaryTerm" ],
"name" : "FieldWithGlossaryTerm"
"name" : "SchemaFieldWithGlossaryTerm"
}
},
"Searchable" : {
Expand Down Expand Up @@ -2670,7 +2680,7 @@
"Relationship" : {
"/tags/*/tag" : {
"entityTypes" : [ "tag" ],
"name" : "EditableFieldTaggedWith"
"name" : "EditableSchemaFieldTaggedWith"
}
},
"Searchable" : {
Expand All @@ -2688,7 +2698,7 @@
"Relationship" : {
"/terms/*/urn" : {
"entityTypes" : [ "glossaryTerm" ],
"name" : "EditableFieldWithGlossaryTerm"
"name" : "EditableSchemaFieldWithGlossaryTerm"
}
},
"Searchable" : {
Expand Down
Loading

0 comments on commit 669160a

Please sign in to comment.