
Commit

fix(ingest/bigquery): clear stateful ingestion correctly (#7075)
Co-authored-by: Tamas Nemeth <[email protected]>
hsheth2 and treff7es authored Jan 19, 2023
1 parent d6f3feb commit c4f946c
Showing 4 changed files with 28 additions and 79 deletions.
8 changes: 3 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/api/source.py
@@ -52,25 +52,23 @@ def report_workunit(self, wu: WorkUnit) -> None:
self.events_produced += 1

if isinstance(wu, MetadataWorkUnit):
urn = wu.get_urn()

# Specialized entity reporting.
if not isinstance(wu.metadata, MetadataChangeEvent):
mcps = [wu.metadata]
else:
mcps = list(mcps_from_mce(wu.metadata))

for mcp in mcps:
urn = mcp.entityUrn
if not urn: # this case is rare
continue

entityType = mcp.entityType
aspectName = mcp.aspectName

if urn not in self._urns_seen:
self._urns_seen.add(urn)
self.entities[entityType].append(urn)

if aspectName: # usually true
if aspectName is not None: # usually true
self.aspects[entityType][aspectName] += 1

def report_warning(self, key: str, reason: str) -> None:
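
Two fixes hide in the hunk above: the URN is now taken from each individual MCP (mcp.entityUrn) rather than from the workunit as a whole, with URN-less MCPs skipped, and the aspect check is tightened from truthiness to an explicit "is not None". For orientation, here is a minimal sketch of the counters this method fills; the names mirror the diff, but the container types are assumptions (the real SourceReport may use bounded collections):

from collections import defaultdict
from typing import DefaultDict, List, Set

_urns_seen: Set[str] = set()  # urns already attributed to an entity type
entities: DefaultDict[str, List[str]] = defaultdict(list)  # entityType -> urns
aspects: DefaultDict[str, DefaultDict[str, int]] = defaultdict(
    lambda: defaultdict(int)
)  # entityType -> aspectName -> number of times emitted

The hunks that follow come from the BigQuery v2 source, profiler, and usage modules.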
@@ -115,6 +115,7 @@
from datahub.utilities.source_helpers import (
auto_stale_entity_removal,
auto_status_aspect,
auto_workunit_reporter,
)
from datahub.utilities.time import datetime_to_ts_millis

@@ -462,25 +463,21 @@ def gen_project_id_containers(self, database: str) -> Iterable[MetadataWorkUnit]

database_container_key = self.gen_project_id_key(database)

container_workunits = gen_containers(
yield from gen_containers(
container_key=database_container_key,
name=database,
sub_types=["Project"],
domain_urn=domain_urn,
)

for wu in container_workunits:
self.report.report_workunit(wu)
yield wu
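
This hunk shows the first instance of a rewrite repeated throughout the file: the old code materialized the container workunits, reported each one to self.report, and re-yielded it; since reporting is now applied once over the whole stream in get_workunits (see below), a bare yield from suffices. Schematically, using the call from this hunk:

# Before: every producer reported its own workunits by hand.
container_workunits = gen_containers(
    container_key=database_container_key,
    name=database,
    sub_types=["Project"],
    domain_urn=domain_urn,
)
for wu in container_workunits:
    self.report.report_workunit(wu)
    yield wu

# After: producers just yield; reporting happens centrally, once.
yield from gen_containers(
    container_key=database_container_key,
    name=database,
    sub_types=["Project"],
    domain_urn=domain_urn,
)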

def gen_dataset_containers(
self, dataset: str, project_id: str
) -> Iterable[MetadataWorkUnit]:
schema_container_key = self.gen_dataset_key(project_id, dataset)

database_container_key = self.gen_project_id_key(database=project_id)

container_workunits = gen_containers(
yield from gen_containers(
schema_container_key,
dataset,
["Dataset"],
@@ -492,21 +489,15 @@ def gen_dataset_containers(
else None,
)

for wu in container_workunits:
self.report.report_workunit(wu)
yield wu

def add_table_to_dataset_container(
self, dataset_urn: str, db_name: str, schema: str
) -> Iterable[MetadataWorkUnit]:
schema_container_key = self.gen_dataset_key(db_name, schema)
container_workunits = add_dataset_to_container(

yield from add_dataset_to_container(
container_key=schema_container_key,
dataset_urn=dataset_urn,
)
for wu in container_workunits:
self.report.report_workunit(wu)
yield wu

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
logger.info("Getting projects")
@@ -552,13 +543,13 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
logger.info("Starting profiling...")
yield from self.profiler.get_workunits(self.db_tables)

# Clean up stale entities if configured.
yield from self.stale_entity_removal_handler.gen_removed_entity_workunits()

def get_workunits(self) -> Iterable[MetadataWorkUnit]:
return auto_stale_entity_removal(
self.stale_entity_removal_handler,
auto_status_aspect(self.get_workunits_internal()),
auto_workunit_reporter(
self.report,
auto_status_aspect(self.get_workunits_internal()),
),
)
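
The two wrappers take over the bookkeeping that the deleted inline code used to do: auto_workunit_reporter reports every workunit exactly once, and auto_stale_entity_removal records each emitted entity in the checkpoint state before emitting soft-delete workunits for entities that the previous run produced but this one did not, which is the behavior the commit title refers to. A rough sketch of their assumed shapes (not the verbatim implementations in datahub.utilities.source_helpers; add_entity_to_state is an assumed handler method):

def auto_workunit_reporter(report, stream):
    # Report each workunit exactly once as it flows past.
    for wu in stream:
        report.report_workunit(wu)
        yield wu

def auto_stale_entity_removal(handler, stream):
    # Record every emitted entity in the checkpoint state, then emit
    # soft-deletes for anything missing relative to the last run.
    for wu in stream:
        handler.add_entity_to_state(type="", urn=wu.get_urn())  # assumed signature
        yield wu
    yield from handler.gen_removed_entity_workunits()

Because the wrappers compose around the full stream, no individual producer can forget to report a workunit or skip stale-entity accounting.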

def _process_project(
@@ -569,11 +560,7 @@ def _process_project(
self.db_tables[project_id] = {}
self.db_views[project_id] = {}

database_workunits = self.gen_project_id_containers(project_id)

for wu in database_workunits:
self.report.report_workunit(wu)
yield wu
yield from self.gen_project_id_containers(project_id)

try:
bigquery_project.datasets = (
@@ -713,15 +700,12 @@ def _process_schema(
self, conn: bigquery.Client, project_id: str, bigquery_dataset: BigqueryDataset
) -> Iterable[MetadataWorkUnit]:
dataset_name = bigquery_dataset.name
schema_workunits = self.gen_dataset_containers(

yield from self.gen_dataset_containers(
dataset_name,
project_id,
)

for wu in schema_workunits:
self.report.report_workunit(wu)
yield wu

if self.config.include_tables:
bigquery_dataset.tables = self.get_tables_for_dataset(
conn, project_id, dataset_name
@@ -760,12 +744,7 @@ def _process_table(
f"Table doesn't have any column or unable to get columns for table: {table_identifier}"
)

table_workunits = self.gen_table_dataset_workunits(
table, project_id, schema_name
)
for wu in table_workunits:
self.report.report_workunit(wu)
yield wu
yield from self.gen_table_dataset_workunits(table, project_id, schema_name)

def _process_view(
self,
@@ -791,10 +770,7 @@ def _process_view(

self.db_views[project_id][dataset_name].append(view)

view_workunits = self.gen_view_dataset_workunits(view, project_id, dataset_name)
for wu in view_workunits:
self.report.report_workunit(wu)
yield wu
yield from self.gen_view_dataset_workunits(view, project_id, dataset_name)

def _get_domain_wu(
self,
@@ -803,13 +779,10 @@
) -> Iterable[MetadataWorkUnit]:
domain_urn = self._gen_domain_urn(dataset_name)
if domain_urn:
wus = add_domain_to_entity_wu(
yield from add_domain_to_entity_wu(
entity_urn=entity_urn,
domain_urn=domain_urn,
)
for wu in wus:
self.report.report_workunit(wu)
yield wu

def gen_table_dataset_workunits(
self,
@@ -882,14 +855,12 @@ def gen_view_dataset_workunits(
view_properties_aspect = ViewProperties(
materialized=False, viewLanguage="SQL", viewLogic=view_definition_string
)
wu = wrap_aspect_as_workunit(
yield wrap_aspect_as_workunit(
"dataset",
self.gen_dataset_urn(dataset_name, project_id, table.name),
"viewProperties",
view_properties_aspect,
)
yield wu
self.report.report_workunit(wu)
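
wrap_aspect_as_workunit, used here and in the hunks below, packages a single aspect as an MCP-backed workunit. An approximate sketch under assumed imports and field names; the real helper may differ in details:

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit

def wrap_aspect_as_workunit(entity_name, entity_urn, aspect_name, aspect):
    # Key the workunit id by aspect + urn so ids stay unique per entity.
    return MetadataWorkUnit(
        id=f"{aspect_name}-for-{entity_urn}",
        mcp=MetadataChangeProposalWrapper(
            entityType=entity_name,
            entityUrn=entity_urn,
            aspectName=aspect_name,
            aspect=aspect,
        ),
    )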

def gen_dataset_workunits(
self,
@@ -903,9 +874,7 @@
dataset_urn = self.gen_dataset_urn(dataset_name, project_id, table.name)

status = Status(removed=False)
wu = wrap_aspect_as_workunit("dataset", dataset_urn, "status", status)
yield wu
self.report.report_workunit(wu)
yield wrap_aspect_as_workunit("dataset", dataset_urn, "status", status)

datahub_dataset_name = BigqueryTableIdentifier(
project_id, dataset_name, table.name
@@ -934,11 +903,9 @@ def gen_dataset_workunits(
if custom_properties:
dataset_properties.customProperties.update(custom_properties)

wu = wrap_aspect_as_workunit(
yield wrap_aspect_as_workunit(
"dataset", dataset_urn, "datasetProperties", dataset_properties
)
yield wu
self.report.report_workunit(wu)

if tags_to_add:
yield self.gen_tags_aspect_workunit(dataset_urn, tags_to_add)
@@ -950,13 +917,10 @@
)
dpi_aspect = self.get_dataplatform_instance_aspect(dataset_urn=dataset_urn)
if dpi_aspect:
self.report.report_workunit(dpi_aspect)
yield dpi_aspect

subTypes = SubTypes(typeNames=sub_types)
wu = wrap_aspect_as_workunit("dataset", dataset_urn, "subTypes", subTypes)
yield wu
self.report.report_workunit(wu)
yield wrap_aspect_as_workunit("dataset", dataset_urn, "subTypes", subTypes)

yield from self._get_domain_wu(
dataset_name=str(datahub_dataset_name),
@@ -980,33 +944,27 @@ def gen_lineage(
for upstream in upstream_lineage.upstreams:
patch_builder.add_upstream_lineage(upstream)

lineage_workunits = [
yield from [
MetadataWorkUnit(
id=f"upstreamLineage-for-{dataset_urn}",
mcp_raw=mcp,
)
for mcp in patch_builder.build()
]
else:
lineage_workunits = [
yield from [
wrap_aspect_as_workunit(
"dataset", dataset_urn, "upstreamLineage", upstream_lineage
)
]

for wu in lineage_workunits:
yield wu
self.report.report_workunit(wu)
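
In the first branch above, upstream edges go out as patch-style MCPs that append to the existing upstreamLineage aspect rather than overwriting it; the else branch falls back to writing the whole aspect. A hypothetical sketch of the patch path, assuming the builder behind patch_builder is DatasetPatchBuilder (its construction sits outside this hunk):

from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.specific.dataset import DatasetPatchBuilder  # assumed import

def gen_lineage_patches(dataset_urn, upstream_lineage):
    patch_builder = DatasetPatchBuilder(dataset_urn)
    for upstream in upstream_lineage.upstreams:
        patch_builder.add_upstream_lineage(upstream)
    for mcp in patch_builder.build():
        # mcp_raw carries a pre-built proposal straight through to emission.
        yield MetadataWorkUnit(id=f"upstreamLineage-for-{dataset_urn}", mcp_raw=mcp)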

def gen_tags_aspect_workunit(
self, dataset_urn: str, tags_to_add: List[str]
) -> MetadataWorkUnit:
tags = GlobalTagsClass(
tags=[TagAssociationClass(tag_to_add) for tag_to_add in tags_to_add]
)
wu = wrap_aspect_as_workunit("dataset", dataset_urn, "globalTags", tags)
self.report.report_workunit(wu)
return wu
return wrap_aspect_as_workunit("dataset", dataset_urn, "globalTags", tags)

def gen_dataset_urn(self, dataset_name: str, project_id: str, table: str) -> str:
datahub_dataset_name = BigqueryTableIdentifier(project_id, dataset_name, table)
@@ -1083,11 +1041,9 @@ def gen_schema_metadata(
platformSchema=MySqlDDL(tableSchema=""),
fields=self.gen_schema_fields(table.columns),
)
wu = wrap_aspect_as_workunit(
return wrap_aspect_as_workunit(
"dataset", dataset_urn, "schemaMetadata", schema_metadata
)
self.report.report_workunit(wu)
return wu

def get_report(self) -> BigQueryV2Report:
return self.report
@@ -234,14 +234,12 @@ def generate_wu_from_profile_requests(
dataset_urn, int(datetime.now().timestamp() * 1000)
)

wu = wrap_aspect_as_workunit(
yield wrap_aspect_as_workunit(
"dataset",
dataset_urn,
"datasetProfile",
profile,
)
self.report.report_workunit(wu)
yield wu

def get_bigquery_profile_request(
self, project: str, dataset: str, table: BigqueryTable
@@ -218,7 +218,6 @@ def generate_usage_for_project(
if self.config.usage.include_operational_stats:
operational_wu = self._create_operation_aspect_work_unit(event)
if operational_wu:
self.report.report_workunit(operational_wu)
yield operational_wu
self.report.num_operational_stats_workunits_emitted += 1
if event.read_event:
@@ -799,9 +798,7 @@ def get_workunits(
self.report.num_usage_workunits_emitted = 0
for time_bucket in aggregated_info.values():
for aggregate in time_bucket.values():
wu = self._make_usage_stat(aggregate)
self.report.report_workunit(wu)
yield wu
yield self._make_usage_stat(aggregate)
self.report.num_usage_workunits_emitted += 1

def _make_usage_stat(self, agg: AggregatedDataset) -> MetadataWorkUnit:
