Skip to content

Commit

Permalink
fix(ingest): switch various sources to auto_stale_entity_removal he…
Browse files Browse the repository at this point in the history
…lper (#7158)
  • Loading branch information
hsheth2 authored Jan 30, 2023
1 parent 727050a commit 7ace79c
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 42 deletions.
20 changes: 11 additions & 9 deletions metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@
UpstreamLineageClass,
)
from datahub.utilities.hive_schema_to_avro import get_schema_fields_for_hive_column
from datahub.utilities.source_helpers import (
auto_stale_entity_removal,
auto_status_aspect,
)

logger = logging.getLogger(__name__)

Expand All @@ -105,9 +109,7 @@
VALID_PLATFORMS = [DEFAULT_PLATFORM, "athena"]


class GlueSourceConfig(
AwsSourceConfig, GlueProfilingConfig, StatefulIngestionConfigBase
):
class GlueSourceConfig(AwsSourceConfig, StatefulIngestionConfigBase):
extract_owners: Optional[bool] = Field(
default=True,
description="When enabled, extracts ownership from Glue directly and overwrites existing owners. When disabled, ownership is left empty for datasets.",
Expand Down Expand Up @@ -943,6 +945,12 @@ def _get_domain_wu(
yield wu

def get_workunits(self) -> Iterable[MetadataWorkUnit]:
return auto_stale_entity_removal(
self.stale_entity_removal_handler,
auto_status_aspect(self.get_workunits_internal()),
)

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
database_seen = set()
databases, tables = self.get_all_tables_and_databases()

Expand Down Expand Up @@ -989,9 +997,6 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
dataset_urn=dataset_urn, db_name=database_name
)

# Add table to the checkpoint state.
self.stale_entity_removal_handler.add_entity_to_state("table", dataset_urn)

mcp = self.get_lineage_if_enabled(mce)
if mcp:
mcp_wu = MetadataWorkUnit(
Expand All @@ -1013,9 +1018,6 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
if self.extract_transforms:
yield from self._transform_extraction()

# Clean up stale entities.
yield from self.stale_entity_removal_handler.gen_removed_entity_workunits()

def _transform_extraction(self) -> Iterable[MetadataWorkUnit]:
dags: Dict[str, Optional[Dict[str, Any]]] = {}
flow_names: Dict[str, str] = {}
Expand Down
22 changes: 10 additions & 12 deletions metadata-ingestion/src/datahub/ingestion/source/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@
SubTypesClass,
)
from datahub.utilities.registries.domain_registry import DomainRegistry
from datahub.utilities.source_helpers import (
auto_stale_entity_removal,
auto_status_aspect,
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -195,6 +199,12 @@ def create(cls, config_dict: Dict, ctx: PipelineContext) -> "KafkaSource":
return cls(config, ctx)

def get_workunits(self) -> Iterable[MetadataWorkUnit]:
return auto_stale_entity_removal(
self.stale_entity_removal_handler,
auto_status_aspect(self.get_workunits_internal()),
)

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
topics = self.consumer.list_topics(
timeout=self.source_config.connection.client_timeout_seconds
).topics
Expand All @@ -204,20 +214,8 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
self.report.report_topic_scanned(t)
if self.source_config.topic_patterns.allowed(t):
yield from self._extract_record(t, t_detail, extra_topic_details.get(t))
# add topic to checkpoint
topic_urn = make_dataset_urn_with_platform_instance(
platform=self.platform,
name=t,
platform_instance=self.source_config.platform_instance,
env=self.source_config.env,
)
self.stale_entity_removal_handler.add_entity_to_state(
type="topic", urn=topic_urn
)
else:
self.report.report_dropped(t)
# Clean up stale entities.
yield from self.stale_entity_removal_handler.gen_removed_entity_workunits()

def _extract_record(
self,
Expand Down
22 changes: 10 additions & 12 deletions metadata-ingestion/src/datahub/ingestion/source/pulsar.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@
DatasetPropertiesClass,
SubTypesClass,
)
from datahub.utilities.source_helpers import (
auto_stale_entity_removal,
auto_status_aspect,
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -234,6 +238,12 @@ def create(cls, config_dict, ctx):
return cls(config, ctx)

def get_workunits(self) -> Iterable[MetadataWorkUnit]:
return auto_stale_entity_removal(
self.stale_entity_removal_handler,
auto_status_aspect(self.get_workunits_internal()),
)

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
"""
Interacts with the Pulsar Admin Api and loops over tenants, namespaces and topics. For every topic
the schema information is retrieved if available.
Expand Down Expand Up @@ -302,24 +312,12 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
self.report.topics_scanned += 1
if self.config.topic_patterns.allowed(topic):
yield from self._extract_record(topic, is_partitioned)
# Add topic to checkpoint if stateful ingestion is enabled
topic_urn = make_dataset_urn_with_platform_instance(
platform=self.platform,
name=topic,
platform_instance=self.config.platform_instance,
env=self.config.env,
)
self.stale_entity_removal_handler.add_entity_to_state(
type="topic", urn=topic_urn
)
else:
self.report.report_topics_dropped(topic)
else:
self.report.report_namespaces_dropped(namespace)
else:
self.report.report_tenants_dropped(tenant)
# Clean up stale entities.
yield from self.stale_entity_removal_handler.gen_removed_entity_workunits()

def _is_token_authentication_configured(self) -> bool:
return self.config.token is not None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,6 @@ def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
if sql_config.include_oauth:
yield from self.loop_oauth(inspector, oauth_schema, sql_config)

# Clean up stale entities.
yield from self.stale_entity_removal_handler.gen_removed_entity_workunits()

def get_database_properties(
self, inspector: Inspector, database: str
) -> Optional[Dict[str, str]]:
Expand Down Expand Up @@ -491,8 +488,6 @@ def _process_projections(
urn=dataset_urn,
aspects=[StatusClass(removed=False)],
)
# Add table to the checkpoint state
self.stale_entity_removal_handler.add_entity_to_state("table", dataset_urn)
description, properties, location_urn = self.get_projection_properties(
inspector, schema, projection
)
Expand Down Expand Up @@ -718,8 +713,6 @@ def _process_models(
urn=dataset_urn,
aspects=[StatusClass(removed=False)],
)
# Add table to the checkpoint state
self.stale_entity_removal_handler.add_entity_to_state("model", dataset_urn)
description, properties, location = self.get_model_properties(
inspector, schema, table
)
Expand Down Expand Up @@ -904,8 +897,6 @@ def _process_oauth(
urn=dataset_urn,
aspects=[StatusClass(removed=False)],
)
# Add table to the checkpoint state
self.stale_entity_removal_handler.add_entity_to_state("oauth", dataset_urn)
description, properties, location_urn = self.get_oauth_properties(
inspector, schema, oauth
)
Expand Down

0 comments on commit 7ace79c

Please sign in to comment.