diff --git a/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py b/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
index 1036e8747b9a4..62310ceba11ba 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
@@ -97,6 +97,10 @@
     UpstreamLineageClass,
 )
 from datahub.utilities.hive_schema_to_avro import get_schema_fields_for_hive_column
+from datahub.utilities.source_helpers import (
+    auto_stale_entity_removal,
+    auto_status_aspect,
+)
 
 logger = logging.getLogger(__name__)
 
@@ -105,9 +109,7 @@
 VALID_PLATFORMS = [DEFAULT_PLATFORM, "athena"]
 
 
-class GlueSourceConfig(
-    AwsSourceConfig, GlueProfilingConfig, StatefulIngestionConfigBase
-):
+class GlueSourceConfig(AwsSourceConfig, StatefulIngestionConfigBase):
     extract_owners: Optional[bool] = Field(
         default=True,
         description="When enabled, extracts ownership from Glue directly and overwrites existing owners. When disabled, ownership is left empty for datasets.",
@@ -943,6 +945,12 @@ def _get_domain_wu(
             yield wu
 
     def get_workunits(self) -> Iterable[MetadataWorkUnit]:
+        return auto_stale_entity_removal(
+            self.stale_entity_removal_handler,
+            auto_status_aspect(self.get_workunits_internal()),
+        )
+
+    def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
         database_seen = set()
         databases, tables = self.get_all_tables_and_databases()
 
@@ -989,9 +997,6 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
                 dataset_urn=dataset_urn, db_name=database_name
             )
 
-            # Add table to the checkpoint state.
-            self.stale_entity_removal_handler.add_entity_to_state("table", dataset_urn)
-
             mcp = self.get_lineage_if_enabled(mce)
             if mcp:
                 mcp_wu = MetadataWorkUnit(
@@ -1013,9 +1018,6 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
         if self.extract_transforms:
             yield from self._transform_extraction()
 
-        # Clean up stale entities.
-        yield from self.stale_entity_removal_handler.gen_removed_entity_workunits()
-
     def _transform_extraction(self) -> Iterable[MetadataWorkUnit]:
         dags: Dict[str, Optional[Dict[str, Any]]] = {}
         flow_names: Dict[str, str] = {}
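Note: every file below follows the same pattern as glue.py here: `get_workunits` becomes a thin wrapper and the old body moves to `get_workunits_internal`. As a rough illustration of the first wrapper, here is a minimal sketch of what a helper like `auto_status_aspect` could look like; the `WorkUnit` stand-in and the exact bookkeeping are assumptions for illustration, not the actual `datahub.utilities.source_helpers` implementation.

```python
# Sketch only; WorkUnit is a hypothetical stand-in for MetadataWorkUnit.
from typing import Iterable, Iterator, Set


class WorkUnit:
    def __init__(self, urn: str, aspect_name: str) -> None:
        self.urn = urn
        self.aspect_name = aspect_name


def auto_status_aspect(stream: Iterable[WorkUnit]) -> Iterator[WorkUnit]:
    """Pass workunits through unchanged, then emit a `status` aspect
    (removed=False) for every entity that never produced one itself."""
    seen: Set[str] = set()
    has_status: Set[str] = set()
    for wu in stream:
        seen.add(wu.urn)
        if wu.aspect_name == "status":
            has_status.add(wu.urn)
        yield wu
    # Stream exhausted: backfill the status aspect where it is missing.
    for urn in sorted(seen - has_status):
        yield WorkUnit(urn=urn, aspect_name="status")
```

This is why the hand-written `StatusClass(removed=False)` bookkeeping can disappear from individual sources: the wrapper observes the whole stream and fills the gap at the end.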
diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka.py b/metadata-ingestion/src/datahub/ingestion/source/kafka.py
index 13e01e8e03890..f6896136e7c9d 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/kafka.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/kafka.py
@@ -56,6 +56,10 @@
     SubTypesClass,
 )
 from datahub.utilities.registries.domain_registry import DomainRegistry
+from datahub.utilities.source_helpers import (
+    auto_stale_entity_removal,
+    auto_status_aspect,
+)
 
 logger = logging.getLogger(__name__)
 
@@ -195,6 +199,12 @@ def create(cls, config_dict: Dict, ctx: PipelineContext) -> "KafkaSource":
         return cls(config, ctx)
 
     def get_workunits(self) -> Iterable[MetadataWorkUnit]:
+        return auto_stale_entity_removal(
+            self.stale_entity_removal_handler,
+            auto_status_aspect(self.get_workunits_internal()),
+        )
+
+    def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
         topics = self.consumer.list_topics(
             timeout=self.source_config.connection.client_timeout_seconds
         ).topics
@@ -204,20 +214,8 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
             self.report.report_topic_scanned(t)
             if self.source_config.topic_patterns.allowed(t):
                 yield from self._extract_record(t, t_detail, extra_topic_details.get(t))
-                # add topic to checkpoint
-                topic_urn = make_dataset_urn_with_platform_instance(
-                    platform=self.platform,
-                    name=t,
-                    platform_instance=self.source_config.platform_instance,
-                    env=self.source_config.env,
-                )
-                self.stale_entity_removal_handler.add_entity_to_state(
-                    type="topic", urn=topic_urn
-                )
             else:
                 self.report.report_dropped(t)
 
-        # Clean up stale entities.
-        yield from self.stale_entity_removal_handler.gen_removed_entity_workunits()
-
     def _extract_record(
         self,
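The Kafka hunk above shows why the wrapper approach pays off: ten lines of per-topic URN checkpointing collapse into nothing. A plausible sketch of the second wrapper, `auto_stale_entity_removal`, assuming only the handler interface implied by the deleted calls (`add_entity_to_state`, `gen_removed_entity_workunits`); the types are hypothetical, not the real datahub internals.

```python
from typing import Iterable, Iterator, Protocol


class WorkUnit:  # same hypothetical stand-in as in the previous sketch
    def __init__(self, urn: str, aspect_name: str) -> None:
        self.urn = urn
        self.aspect_name = aspect_name


class StaleEntityRemovalHandler(Protocol):
    # Hypothetical protocol; method names mirror the deleted per-source calls.
    def add_entity_to_state(self, type: str, urn: str) -> None: ...
    def gen_removed_entity_workunits(self) -> Iterable[WorkUnit]: ...


def auto_stale_entity_removal(
    handler: StaleEntityRemovalHandler, stream: Iterable[WorkUnit]
) -> Iterator[WorkUnit]:
    for wu in stream:
        # Record every entity observed in this run; the handler diffs this
        # set against the previous run's checkpoint.
        handler.add_entity_to_state(type="", urn=wu.urn)
        yield wu
    # Stream exhausted: soft-delete entities seen last run but not this one.
    yield from handler.gen_removed_entity_workunits()
```

Because the wrapper sees every workunit the source emits, registration can no longer drift out of sync with emission, which was the risk the hand-written calls carried.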
diff --git a/metadata-ingestion/src/datahub/ingestion/source/pulsar.py b/metadata-ingestion/src/datahub/ingestion/source/pulsar.py
index 044a2fb551741..eccf0c55ba62a 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/pulsar.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/pulsar.py
@@ -48,6 +48,10 @@
     DatasetPropertiesClass,
     SubTypesClass,
 )
+from datahub.utilities.source_helpers import (
+    auto_stale_entity_removal,
+    auto_status_aspect,
+)
 
 logger = logging.getLogger(__name__)
 
@@ -234,6 +238,12 @@ def create(cls, config_dict, ctx):
         return cls(config, ctx)
 
     def get_workunits(self) -> Iterable[MetadataWorkUnit]:
+        return auto_stale_entity_removal(
+            self.stale_entity_removal_handler,
+            auto_status_aspect(self.get_workunits_internal()),
+        )
+
+    def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
         """
         Interacts with the Pulsar Admin Api and loops over tenants, namespaces and topics. For every topic
         the schema information is retrieved if available.
@@ -302,24 +312,12 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
                         self.report.topics_scanned += 1
                         if self.config.topic_patterns.allowed(topic):
                             yield from self._extract_record(topic, is_partitioned)
-                            # Add topic to checkpoint if stateful ingestion is enabled
-                            topic_urn = make_dataset_urn_with_platform_instance(
-                                platform=self.platform,
-                                name=topic,
-                                platform_instance=self.config.platform_instance,
-                                env=self.config.env,
-                            )
-                            self.stale_entity_removal_handler.add_entity_to_state(
-                                type="topic", urn=topic_urn
-                            )
                         else:
                             self.report.report_topics_dropped(topic)
                 else:
                     self.report.report_namespaces_dropped(namespace)
             else:
                 self.report.report_tenants_dropped(tenant)
 
-        # Clean up stale entities.
-        yield from self.stale_entity_removal_handler.gen_removed_entity_workunits()
-
     def _is_token_authentication_configured(self) -> bool:
         return self.config.token is not None
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/vertica.py b/metadata-ingestion/src/datahub/ingestion/source/sql/vertica.py
index 4c3620900d2e1..ffbb93b1d3f8e 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/vertica.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/vertica.py
@@ -194,9 +194,6 @@ def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
                 if sql_config.include_oauth:
                     yield from self.loop_oauth(inspector, oauth_schema, sql_config)
 
-        # Clean up stale entities.
-        yield from self.stale_entity_removal_handler.gen_removed_entity_workunits()
-
     def get_database_properties(
         self, inspector: Inspector, database: str
     ) -> Optional[Dict[str, str]]:
@@ -491,8 +488,6 @@ def _process_projections(
                 urn=dataset_urn,
                 aspects=[StatusClass(removed=False)],
             )
-            # Add table to the checkpoint state
-            self.stale_entity_removal_handler.add_entity_to_state("table", dataset_urn)
             description, properties, location_urn = self.get_projection_properties(
                 inspector, schema, projection
             )
@@ -718,8 +713,6 @@ def _process_models(
                 urn=dataset_urn,
                 aspects=[StatusClass(removed=False)],
             )
-            # Add table to the checkpoint state
-            self.stale_entity_removal_handler.add_entity_to_state("model", dataset_urn)
             description, properties, location = self.get_model_properties(
                 inspector, schema, table
             )
@@ -904,8 +897,6 @@ def _process_oauth(
                 urn=dataset_urn,
                 aspects=[StatusClass(removed=False)],
             )
-            # Add table to the checkpoint state
-            self.stale_entity_removal_handler.add_entity_to_state("oauth", dataset_urn)
             description, properties, location_urn = self.get_oauth_properties(
                 inspector, schema, oauth
             )
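Taken together, all four sources converge on the same skeleton. A hypothetical migrated source, mirroring the wrapper composition this diff introduces (`MyStatefulSource` and its body are illustrative only, and `stale_entity_removal_handler` is assumed to be provided by the stateful-ingestion base class as in the real sources):

```python
from typing import Iterable

from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.utilities.source_helpers import (
    auto_stale_entity_removal,
    auto_status_aspect,
)


class MyStatefulSource:  # hypothetical example source
    def get_workunits(self) -> Iterable[MetadataWorkUnit]:
        # Thin wrapper: status aspects and stale-entity cleanup are layered
        # on by the shared helpers instead of hand-written in each source.
        return auto_stale_entity_removal(
            self.stale_entity_removal_handler,
            auto_status_aspect(self.get_workunits_internal()),
        )

    def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
        # Source-specific scan logic only; no checkpoint bookkeeping.
        yield from ()
```

Since both helpers are plain generator transformers, they compose freely and keep the workunit stream lazy end to end.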