Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest/bigquery): clear stateful ingestion correctly #7075

Merged
Merged 3 commits on Jan 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
8 changes: 3 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/api/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,25 +52,23 @@ def report_workunit(self, wu: WorkUnit) -> None:
self.events_produced += 1

if isinstance(wu, MetadataWorkUnit):
urn = wu.get_urn()

# Specialized entity reporting.
if not isinstance(wu.metadata, MetadataChangeEvent):
mcps = [wu.metadata]
else:
mcps = list(mcps_from_mce(wu.metadata))

for mcp in mcps:
urn = mcp.entityUrn
if not urn: # this case is rare
continue

entityType = mcp.entityType
aspectName = mcp.aspectName

if urn not in self._urns_seen:
self._urns_seen.add(urn)
self.entities[entityType].append(urn)

if aspectName: # usually true
if aspectName is not None: # usually true
self.aspects[entityType][aspectName] += 1

def report_warning(self, key: str, reason: str) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@
from datahub.utilities.source_helpers import (
auto_stale_entity_removal,
auto_status_aspect,
auto_workunit_reporter,
)
from datahub.utilities.time import datetime_to_ts_millis

Expand Down Expand Up @@ -462,25 +463,21 @@ def gen_project_id_containers(self, database: str) -> Iterable[MetadataWorkUnit]

database_container_key = self.gen_project_id_key(database)

container_workunits = gen_containers(
yield from gen_containers(
container_key=database_container_key,
name=database,
sub_types=["Project"],
domain_urn=domain_urn,
)

for wu in container_workunits:
self.report.report_workunit(wu)
yield wu

def gen_dataset_containers(
self, dataset: str, project_id: str
) -> Iterable[MetadataWorkUnit]:
schema_container_key = self.gen_dataset_key(project_id, dataset)

database_container_key = self.gen_project_id_key(database=project_id)

container_workunits = gen_containers(
yield from gen_containers(
schema_container_key,
dataset,
["Dataset"],
Expand All @@ -492,21 +489,15 @@ def gen_dataset_containers(
else None,
)

for wu in container_workunits:
self.report.report_workunit(wu)
yield wu

def add_table_to_dataset_container(
self, dataset_urn: str, db_name: str, schema: str
) -> Iterable[MetadataWorkUnit]:
schema_container_key = self.gen_dataset_key(db_name, schema)
container_workunits = add_dataset_to_container(

yield from add_dataset_to_container(
container_key=schema_container_key,
dataset_urn=dataset_urn,
)
for wu in container_workunits:
self.report.report_workunit(wu)
yield wu

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
logger.info("Getting projects")
Expand Down Expand Up @@ -552,13 +543,13 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
logger.info("Starting profiling...")
yield from self.profiler.get_workunits(self.db_tables)

# Clean up stale entities if configured.
yield from self.stale_entity_removal_handler.gen_removed_entity_workunits()

def get_workunits(self) -> Iterable[MetadataWorkUnit]:
return auto_stale_entity_removal(
self.stale_entity_removal_handler,
auto_status_aspect(self.get_workunits_internal()),
auto_workunit_reporter(
self.report,
auto_status_aspect(self.get_workunits_internal()),
),
)

def _process_project(
Expand All @@ -569,11 +560,7 @@ def _process_project(
self.db_tables[project_id] = {}
self.db_views[project_id] = {}

database_workunits = self.gen_project_id_containers(project_id)

for wu in database_workunits:
self.report.report_workunit(wu)
yield wu
yield from self.gen_project_id_containers(project_id)

try:
bigquery_project.datasets = (
Expand Down Expand Up @@ -713,15 +700,12 @@ def _process_schema(
self, conn: bigquery.Client, project_id: str, bigquery_dataset: BigqueryDataset
) -> Iterable[MetadataWorkUnit]:
dataset_name = bigquery_dataset.name
schema_workunits = self.gen_dataset_containers(

yield from self.gen_dataset_containers(
dataset_name,
project_id,
)

for wu in schema_workunits:
self.report.report_workunit(wu)
yield wu

if self.config.include_tables:
bigquery_dataset.tables = self.get_tables_for_dataset(
conn, project_id, dataset_name
Expand Down Expand Up @@ -760,12 +744,7 @@ def _process_table(
f"Table doesn't have any column or unable to get columns for table: {table_identifier}"
)

table_workunits = self.gen_table_dataset_workunits(
table, project_id, schema_name
)
for wu in table_workunits:
self.report.report_workunit(wu)
yield wu
yield from self.gen_table_dataset_workunits(table, project_id, schema_name)

def _process_view(
self,
Expand All @@ -791,10 +770,7 @@ def _process_view(

self.db_views[project_id][dataset_name].append(view)

view_workunits = self.gen_view_dataset_workunits(view, project_id, dataset_name)
for wu in view_workunits:
self.report.report_workunit(wu)
yield wu
yield from self.gen_view_dataset_workunits(view, project_id, dataset_name)

def _get_domain_wu(
self,
Expand All @@ -803,13 +779,10 @@ def _get_domain_wu(
) -> Iterable[MetadataWorkUnit]:
domain_urn = self._gen_domain_urn(dataset_name)
if domain_urn:
wus = add_domain_to_entity_wu(
yield from add_domain_to_entity_wu(
entity_urn=entity_urn,
domain_urn=domain_urn,
)
for wu in wus:
self.report.report_workunit(wu)
yield wu

def gen_table_dataset_workunits(
self,
Expand Down Expand Up @@ -882,14 +855,12 @@ def gen_view_dataset_workunits(
view_properties_aspect = ViewProperties(
materialized=False, viewLanguage="SQL", viewLogic=view_definition_string
)
wu = wrap_aspect_as_workunit(
yield wrap_aspect_as_workunit(
"dataset",
self.gen_dataset_urn(dataset_name, project_id, table.name),
"viewProperties",
view_properties_aspect,
)
yield wu
self.report.report_workunit(wu)

def gen_dataset_workunits(
self,
Expand All @@ -903,9 +874,7 @@ def gen_dataset_workunits(
dataset_urn = self.gen_dataset_urn(dataset_name, project_id, table.name)

status = Status(removed=False)
wu = wrap_aspect_as_workunit("dataset", dataset_urn, "status", status)
yield wu
self.report.report_workunit(wu)
yield wrap_aspect_as_workunit("dataset", dataset_urn, "status", status)

datahub_dataset_name = BigqueryTableIdentifier(
project_id, dataset_name, table.name
Expand Down Expand Up @@ -934,11 +903,9 @@ def gen_dataset_workunits(
if custom_properties:
dataset_properties.customProperties.update(custom_properties)

wu = wrap_aspect_as_workunit(
yield wrap_aspect_as_workunit(
"dataset", dataset_urn, "datasetProperties", dataset_properties
)
yield wu
self.report.report_workunit(wu)

if tags_to_add:
yield self.gen_tags_aspect_workunit(dataset_urn, tags_to_add)
Expand All @@ -950,13 +917,10 @@ def gen_dataset_workunits(
)
dpi_aspect = self.get_dataplatform_instance_aspect(dataset_urn=dataset_urn)
if dpi_aspect:
self.report.report_workunit(dpi_aspect)
yield dpi_aspect

subTypes = SubTypes(typeNames=sub_types)
wu = wrap_aspect_as_workunit("dataset", dataset_urn, "subTypes", subTypes)
yield wu
self.report.report_workunit(wu)
yield wrap_aspect_as_workunit("dataset", dataset_urn, "subTypes", subTypes)

yield from self._get_domain_wu(
dataset_name=str(datahub_dataset_name),
Expand All @@ -980,33 +944,27 @@ def gen_lineage(
for upstream in upstream_lineage.upstreams:
patch_builder.add_upstream_lineage(upstream)

lineage_workunits = [
yield from [
MetadataWorkUnit(
id=f"upstreamLineage-for-{dataset_urn}",
mcp_raw=mcp,
)
for mcp in patch_builder.build()
]
else:
lineage_workunits = [
yield from [
wrap_aspect_as_workunit(
"dataset", dataset_urn, "upstreamLineage", upstream_lineage
)
]

for wu in lineage_workunits:
yield wu
self.report.report_workunit(wu)

def gen_tags_aspect_workunit(
self, dataset_urn: str, tags_to_add: List[str]
) -> MetadataWorkUnit:
tags = GlobalTagsClass(
tags=[TagAssociationClass(tag_to_add) for tag_to_add in tags_to_add]
)
wu = wrap_aspect_as_workunit("dataset", dataset_urn, "globalTags", tags)
self.report.report_workunit(wu)
return wu
return wrap_aspect_as_workunit("dataset", dataset_urn, "globalTags", tags)

def gen_dataset_urn(self, dataset_name: str, project_id: str, table: str) -> str:
datahub_dataset_name = BigqueryTableIdentifier(project_id, dataset_name, table)
Expand Down Expand Up @@ -1083,11 +1041,9 @@ def gen_schema_metadata(
platformSchema=MySqlDDL(tableSchema=""),
fields=self.gen_schema_fields(table.columns),
)
wu = wrap_aspect_as_workunit(
return wrap_aspect_as_workunit(
"dataset", dataset_urn, "schemaMetadata", schema_metadata
)
self.report.report_workunit(wu)
return wu

def get_report(self) -> BigQueryV2Report:
return self.report
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,14 +234,12 @@ def generate_wu_from_profile_requests(
dataset_urn, int(datetime.now().timestamp() * 1000)
)

wu = wrap_aspect_as_workunit(
yield wrap_aspect_as_workunit(
"dataset",
dataset_urn,
"datasetProfile",
profile,
)
self.report.report_workunit(wu)
yield wu

def get_bigquery_profile_request(
self, project: str, dataset: str, table: BigqueryTable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,6 @@ def generate_usage_for_project(
if self.config.usage.include_operational_stats:
operational_wu = self._create_operation_aspect_work_unit(event)
if operational_wu:
self.report.report_workunit(operational_wu)
yield operational_wu
self.report.num_operational_stats_workunits_emitted += 1
if event.read_event:
Expand Down Expand Up @@ -799,9 +798,7 @@ def get_workunits(
self.report.num_usage_workunits_emitted = 0
for time_bucket in aggregated_info.values():
for aggregate in time_bucket.values():
wu = self._make_usage_stat(aggregate)
self.report.report_workunit(wu)
yield wu
yield self._make_usage_stat(aggregate)
self.report.num_usage_workunits_emitted += 1

def _make_usage_stat(self, agg: AggregatedDataset) -> MetadataWorkUnit:
Expand Down