Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest): switch various sources to auto_stale_entity_removal helper #7158

Merged
merged 3 commits into from
Jan 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@
UpstreamLineageClass,
)
from datahub.utilities.hive_schema_to_avro import get_schema_fields_for_hive_column
from datahub.utilities.source_helpers import (
auto_stale_entity_removal,
auto_status_aspect,
)

logger = logging.getLogger(__name__)

Expand All @@ -105,9 +109,7 @@
VALID_PLATFORMS = [DEFAULT_PLATFORM, "athena"]


class GlueSourceConfig(
AwsSourceConfig, GlueProfilingConfig, StatefulIngestionConfigBase
):
class GlueSourceConfig(AwsSourceConfig, StatefulIngestionConfigBase):
jjoyce0510 marked this conversation as resolved.
Show resolved Hide resolved
extract_owners: Optional[bool] = Field(
default=True,
description="When enabled, extracts ownership from Glue directly and overwrites existing owners. When disabled, ownership is left empty for datasets.",
Expand Down Expand Up @@ -943,6 +945,12 @@ def _get_domain_wu(
yield wu

def get_workunits(self) -> Iterable[MetadataWorkUnit]:
    """Produce this source's workunits.

    Wraps the raw stream from ``get_workunits_internal`` so that every
    entity carries a status aspect, then lets the stale-entity-removal
    helper append soft-delete workunits for entities seen in a previous
    run but absent from this one.
    """
    raw_stream = self.get_workunits_internal()
    with_status = auto_status_aspect(raw_stream)
    return auto_stale_entity_removal(self.stale_entity_removal_handler, with_status)

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
database_seen = set()
databases, tables = self.get_all_tables_and_databases()

Expand Down Expand Up @@ -989,9 +997,6 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
dataset_urn=dataset_urn, db_name=database_name
)

# Add table to the checkpoint state.
self.stale_entity_removal_handler.add_entity_to_state("table", dataset_urn)

mcp = self.get_lineage_if_enabled(mce)
if mcp:
mcp_wu = MetadataWorkUnit(
Expand All @@ -1013,9 +1018,6 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
if self.extract_transforms:
yield from self._transform_extraction()

# Clean up stale entities.
yield from self.stale_entity_removal_handler.gen_removed_entity_workunits()

def _transform_extraction(self) -> Iterable[MetadataWorkUnit]:
dags: Dict[str, Optional[Dict[str, Any]]] = {}
flow_names: Dict[str, str] = {}
Expand Down
22 changes: 10 additions & 12 deletions metadata-ingestion/src/datahub/ingestion/source/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@
SubTypesClass,
)
from datahub.utilities.registries.domain_registry import DomainRegistry
from datahub.utilities.source_helpers import (
auto_stale_entity_removal,
auto_status_aspect,
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -195,6 +199,12 @@ def create(cls, config_dict: Dict, ctx: PipelineContext) -> "KafkaSource":
return cls(config, ctx)

def get_workunits(self) -> Iterable[MetadataWorkUnit]:
    """Emit workunits for all scanned topics.

    Delegates actual extraction to ``get_workunits_internal``; the
    helpers decorate the stream with status aspects and tack on
    stale-entity soft-deletion workunits at the end.
    """
    inner = auto_status_aspect(self.get_workunits_internal())
    return auto_stale_entity_removal(
        self.stale_entity_removal_handler,
        inner,
    )

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
topics = self.consumer.list_topics(
timeout=self.source_config.connection.client_timeout_seconds
).topics
Expand All @@ -204,20 +214,8 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
self.report.report_topic_scanned(t)
if self.source_config.topic_patterns.allowed(t):
yield from self._extract_record(t, t_detail, extra_topic_details.get(t))
# add topic to checkpoint
topic_urn = make_dataset_urn_with_platform_instance(
platform=self.platform,
name=t,
platform_instance=self.source_config.platform_instance,
env=self.source_config.env,
)
self.stale_entity_removal_handler.add_entity_to_state(
type="topic", urn=topic_urn
)
else:
self.report.report_dropped(t)
# Clean up stale entities.
yield from self.stale_entity_removal_handler.gen_removed_entity_workunits()

def _extract_record(
self,
Expand Down
22 changes: 10 additions & 12 deletions metadata-ingestion/src/datahub/ingestion/source/pulsar.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@
DatasetPropertiesClass,
SubTypesClass,
)
from datahub.utilities.source_helpers import (
auto_stale_entity_removal,
auto_status_aspect,
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -234,6 +238,12 @@ def create(cls, config_dict, ctx):
return cls(config, ctx)

def get_workunits(self) -> Iterable[MetadataWorkUnit]:
    """Return the decorated workunit stream for this source.

    ``get_workunits_internal`` yields the raw workunits;
    ``auto_status_aspect`` ensures each entity has a status aspect, and
    ``auto_stale_entity_removal`` appends removal workunits for entities
    that disappeared since the last stateful-ingestion checkpoint.
    """
    decorated = auto_status_aspect(self.get_workunits_internal())
    return auto_stale_entity_removal(self.stale_entity_removal_handler, decorated)

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
"""
Interacts with the Pulsar Admin Api and loops over tenants, namespaces and topics. For every topic
the schema information is retrieved if available.
Expand Down Expand Up @@ -302,24 +312,12 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
self.report.topics_scanned += 1
if self.config.topic_patterns.allowed(topic):
yield from self._extract_record(topic, is_partitioned)
# Add topic to checkpoint if stateful ingestion is enabled
topic_urn = make_dataset_urn_with_platform_instance(
platform=self.platform,
name=topic,
platform_instance=self.config.platform_instance,
env=self.config.env,
)
self.stale_entity_removal_handler.add_entity_to_state(
type="topic", urn=topic_urn
)
else:
self.report.report_topics_dropped(topic)
else:
self.report.report_namespaces_dropped(namespace)
else:
self.report.report_tenants_dropped(tenant)
# Clean up stale entities.
yield from self.stale_entity_removal_handler.gen_removed_entity_workunits()

def _is_token_authentication_configured(self) -> bool:
return self.config.token is not None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,6 @@ def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
if sql_config.include_oauth:
yield from self.loop_oauth(inspector, oauth_schema, sql_config)

# Clean up stale entities.
yield from self.stale_entity_removal_handler.gen_removed_entity_workunits()

def get_database_properties(
self, inspector: Inspector, database: str
) -> Optional[Dict[str, str]]:
Expand Down Expand Up @@ -491,8 +488,6 @@ def _process_projections(
urn=dataset_urn,
aspects=[StatusClass(removed=False)],
)
# Add table to the checkpoint state
self.stale_entity_removal_handler.add_entity_to_state("table", dataset_urn)
description, properties, location_urn = self.get_projection_properties(
inspector, schema, projection
)
Expand Down Expand Up @@ -718,8 +713,6 @@ def _process_models(
urn=dataset_urn,
aspects=[StatusClass(removed=False)],
)
# Add table to the checkpoint state
self.stale_entity_removal_handler.add_entity_to_state("model", dataset_urn)
description, properties, location = self.get_model_properties(
inspector, schema, table
)
Expand Down Expand Up @@ -904,8 +897,6 @@ def _process_oauth(
urn=dataset_urn,
aspects=[StatusClass(removed=False)],
)
# Add table to the checkpoint state
self.stale_entity_removal_handler.add_entity_to_state("oauth", dataset_urn)
description, properties, location_urn = self.get_oauth_properties(
inspector, schema, oauth
)
Expand Down