refactor(ingest): simplify stateful ingestion config (datahub-project#6454)

Co-authored-by: Tamas Nemeth <[email protected]>
2 people authored and cccs-Dustin committed Feb 1, 2023
1 parent f5bd975 commit d2abadb
Showing 40 changed files with 35 additions and 195 deletions.
@@ -65,25 +65,8 @@ Examples:
3. [BaseSQLAlchemyCheckpointState](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/state/sql_common_state.py#L17)

### 2. Modifying the SourceConfig
The `stateful_ingestion` config param, which is mandatory for any source using stateful ingestion, needs to be overridden with a custom config that is more specific to the source
and inherits from `datahub.ingestion.source.state.stale_entity_removal_handler.StatefulStaleMetadataRemovalConfig`. `StatefulStaleMetadataRemovalConfig` adds the following
parameters to the basic stateful ingestion config that is common to all sources. Typical customization involves overriding the `_entity_types` private config member, which helps produce
more accurate, source-specific documentation.
```python
import pydantic
from typing import List
from datahub.ingestion.source.state.stateful_ingestion_base import StatefulIngestionConfig


class StatefulStaleMetadataRemovalConfig(StatefulIngestionConfig):
    """ Base specialized config for Stateful Ingestion with stale metadata removal capability. """

    # Allows for sources to define(via override) the entity types they support.
    _entity_types: List[str] = []
    # Whether to enable removal of stale metadata.
    remove_stale_metadata: bool = pydantic.Field(
        default=True,
        description=f"Soft-deletes the entities of type {', '.join(_entity_types)} in the last successful run but missing in the current run with stateful_ingestion enabled.",
    )
```
The source's config must inherit from `StatefulIngestionConfigBase`, and should declare a field named `stateful_ingestion` of type `Optional[StatefulStaleMetadataRemovalConfig]`.
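
A minimal sketch of this new pattern is shown below; the `MySourceConfig` class name is hypothetical and stands in for any concrete source config, while the imported classes are the DataHub ones referenced above.

```python
from typing import Optional

from datahub.ingestion.source.state.stale_entity_removal_handler import (
    StatefulStaleMetadataRemovalConfig,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
    StatefulIngestionConfigBase,
)


class MySourceConfig(StatefulIngestionConfigBase):
    # ...other source-specific config params...

    # The shared StatefulStaleMetadataRemovalConfig is used directly;
    # no per-source subclass or _entity_types override is needed anymore.
    stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None
```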

Examples:
1. The `KafkaSourceConfig`
@@ -94,17 +77,11 @@ from datahub.ingestion.source.state.stale_entity_removal_handler import Stateful
from datahub.ingestion.source.state.stateful_ingestion_base import (
    StatefulIngestionConfigBase,
)
from datahub.configuration.source_common import DatasetSourceConfigBase

class KafkaSourceStatefulIngestionConfig(StatefulStaleMetadataRemovalConfig):
    """ Kafka custom stateful ingestion config definition(overrides _entity_types of StatefulStaleMetadataRemovalConfig). """
    _entity_types: List[str] = pydantic.Field(default=["topic"])


class KafkaSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigBase):
class KafkaSourceConfig(StatefulIngestionConfigBase):
    # ...<other config params>...
    """ Override the stateful_ingestion config param with the Kafka custom stateful ingestion config in the KafkaSourceConfig. """
    stateful_ingestion: Optional[KafkaSourceStatefulIngestionConfig] = None

    stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None
```

2. The [DBTStatefulIngestionConfig](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/dbt.py#L131)
15 changes: 1 addition & 14 deletions metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
@@ -108,16 +108,6 @@
VALID_PLATFORMS = [DEFAULT_PLATFORM, "athena"]


class GlueStatefulIngestionConfig(StatefulStaleMetadataRemovalConfig):
    """
    Specialization of StatefulStaleMetadataRemovalConfig to adding custom config.
    This will be used to override the stateful_ingestion config param of StatefulIngestionConfigBase
    in the GlueSourceConfig.
    """

    _entity_types: List[str] = Field(default=["table"])


class GlueSourceConfig(
    AwsSourceConfig, GlueProfilingConfig, StatefulIngestionConfigBase
):
@@ -164,7 +154,7 @@ class GlueSourceConfig(
description="Configs to ingest data profiles from glue table",
)
# Custom Stateful Ingestion settings
stateful_ingestion: Optional[GlueStatefulIngestionConfig] = Field(
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = Field(
default=None, description=""
)

@@ -1294,6 +1284,3 @@ def get_report(self):

    def get_platform_instance_id(self) -> str:
        return self.source_config.platform_instance or self.platform

    def close(self):
        self.prepare_for_commit()
@@ -107,6 +107,3 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:

    def get_report(self):
        return self.report

    def close(self):
        pass
@@ -1187,6 +1187,3 @@ def add_config_to_report(self):
    def warn(self, log: logging.Logger, key: str, reason: str) -> None:
        self.report.report_warning(key, reason)
        log.warning(f"{key} => {reason}")

    def close(self) -> None:
        self.prepare_for_commit()
@@ -611,6 +611,3 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:

    def get_report(self):
        return self.report

    def close(self):
        pass
15 changes: 1 addition & 14 deletions metadata-ingestion/src/datahub/ingestion/source/dbt.py
@@ -131,16 +131,6 @@
DBT_PLATFORM = "dbt"


class DBTStatefulIngestionConfig(StatefulStaleMetadataRemovalConfig):
    """
    Specialization of basic StatefulStaleMetadataRemovalConfig to adding custom config.
    This will be used to override the stateful_ingestion config param of StatefulIngestionConfigBase
    in the SQLAlchemyConfig.
    """

    _entity_types: List[str] = pydantic.Field(default=["assertion", "dataset"])


@dataclass
class DBTSourceReport(StaleEntityRemovalSourceReport):
    pass
@@ -312,7 +302,7 @@ class DBTConfig(StatefulIngestionConfigBase, LineageConfig):
        default=False,
        description="When enabled, emits lineage as incremental to existing lineage already in DataHub. When disabled, re-states lineage on each run.",
    )
    stateful_ingestion: Optional[DBTStatefulIngestionConfig] = pydantic.Field(
    stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = pydantic.Field(
        default=None, description="DBT Stateful Ingestion Config."
    )

@@ -1891,6 +1881,3 @@ def get_platform_instance_id(self) -> str:
            raise ValueError("DBT project identifier is not found in manifest")

        return f"{self.platform}_{project_id}"

    def close(self):
        self.prepare_for_commit()
@@ -331,6 +331,3 @@ def get_workunits(self) -> Iterable[WorkUnit]:

    def get_report(self) -> SourceReport:
        return self.report

    def close(self):
        pass
@@ -482,3 +482,4 @@ def get_report(self):
    def close(self):
        if self.client:
            self.client.close()
        super().close()
3 changes: 0 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/feast.py
@@ -410,6 +410,3 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:

    def get_report(self) -> SourceReport:
        return self.report

    def close(self) -> None:
        return
@@ -332,6 +332,3 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:

    def get_report(self) -> FeastSourceReport:
        return self.report

    def close(self):
        return
1 change: 1 addition & 0 deletions metadata-ingestion/src/datahub/ingestion/source/file.py
@@ -223,6 +223,7 @@ def get_report(self):
    def close(self):
        if self.fp:
            self.fp.close()
        super().close()

    def _iterate_file(self, path: str) -> Iterable[Tuple[int, Any]]:
        self.report.current_file_name = path
@@ -333,9 +333,6 @@ def get_platform_instance_id(self) -> str:
    def get_report(self) -> SourceReport:
        return self.report

    def close(self) -> None:
        self.prepare_for_commit()


def _parse_datatype(type: IcebergTypes.Type, nullable: bool = False) -> Dict[str, Any]:
    # Check for complex types: struct, list, map
@@ -471,9 +471,6 @@ def ingest_ad_users(
    def get_report(self) -> SourceReport:
        return self.report

    def close(self) -> None:
        pass

    def _get_azure_ad_groups(self) -> Iterable[List]:
        yield from self._get_azure_ad_data(kind="/groups")

@@ -418,9 +418,6 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
    def get_report(self):
        return self.report

    def close(self):
        pass

    # Instantiates Okta SDK Client.
    def _create_okta_client(self):
        config = {
15 changes: 2 additions & 13 deletions metadata-ingestion/src/datahub/ingestion/source/kafka.py
@@ -51,16 +51,6 @@
logger = logging.getLogger(__name__)


class KafkaSourceStatefulIngestionConfig(StatefulStaleMetadataRemovalConfig):
    """
    Specialization of StatefulStaleMetadataRemovalConfig to adding custom config.
    This will be used to override the stateful_ingestion config param of StatefulIngestionConfigBase
    in the SQLAlchemyConfig.
    """

    _entity_types: List[str] = pydantic.Field(default=["topic"])


class KafkaSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigBase):
    env: str = DEFAULT_ENV
    # TODO: inline the connection config
@@ -74,8 +64,7 @@ class KafkaSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigBase):
        default={},
        description="Provides the mapping for the `key` and the `value` schemas of a topic to the corresponding schema registry subject name. Each entry of this map has the form `<topic_name>-key`:`<schema_registry_subject_name_for_key_schema>` and `<topic_name>-value`:`<schema_registry_subject_name_for_value_schema>` for the key and the value schemas associated with the topic, respectively. This parameter is mandatory when the [RecordNameStrategy](https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#how-the-naming-strategies-work) is used as the subject naming strategy in the kafka schema registry. NOTE: When provided, this overrides the default subject name resolution even when the `TopicNameStrategy` or the `TopicRecordNameStrategy` are used.",
    )
    # Custom Stateful Ingestion settings
    stateful_ingestion: Optional[KafkaSourceStatefulIngestionConfig] = None
    stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None
    schema_registry_class: str = pydantic.Field(
        default="datahub.ingestion.source.confluent_schema_registry.ConfluentSchemaRegistry",
        description="The fully qualified implementation class(custom) that implements the KafkaSchemaRegistryBase interface.",
@@ -280,6 +269,6 @@ def get_report(self) -> KafkaSourceReport:
        return self.report

    def close(self) -> None:
        self.prepare_for_commit()
        if self.consumer:
            self.consumer.close()
        super().close()
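
The `close()` changes in this and the following files share one shape: sources stop calling `self.prepare_for_commit()` themselves and instead finish with `super().close()`, on the assumption that the base class now performs the commit preparation. A minimal sketch of that shape, using hypothetical `BaseSource`/`MySource` classes rather than the real DataHub ones:

```python
class BaseSource:
    """Stand-in for the framework base class; assumed to own commit handling."""

    def prepare_for_commit(self) -> None:
        # Checkpoint/commit bookkeeping is centralized here.
        print("preparing state for commit")

    def close(self) -> None:
        # The base close() calls prepare_for_commit() once, so subclasses
        # no longer need to remember to do it themselves.
        self.prepare_for_commit()


class MySource(BaseSource):
    def __init__(self) -> None:
        self.consumer_open = True

    def close(self) -> None:
        # Release source-specific resources first...
        self.consumer_open = False
        # ...then delegate to the base class (the pattern in this commit).
        super().close()


if __name__ == "__main__":
    MySource().close()
```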
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/ingestion/source/ldap.py
@@ -411,8 +411,8 @@ def get_report(self) -> LDAPSourceReport:
        return self.report

    def close(self) -> None:
        """Closes the Source."""
        self.ldap_client.unbind()
        super().close()


def parse_from_attrs(attrs: Dict[str, Any], filter_key: str) -> List[str]:
@@ -1690,6 +1690,3 @@ def get_internal_workunits(self) -> Iterable[MetadataWorkUnit]:  # noqa: C901

    def get_report(self):
        return self.reporter

    def close(self):
        pass
@@ -169,6 +169,7 @@ def close(self) -> None:
key="metabase-session",
reason=f"Unable to logout for user {self.config.username}",
)
super().close()

def emit_dashboard_mces(self) -> Iterable[MetadataWorkUnit]:
try:
@@ -403,6 +403,3 @@ def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, UsageStatsWorkUnit]]

    def get_report(self):
        return self.report

    def close(self):
        pass
@@ -204,6 +204,3 @@ def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, UsageStatsWorkUnit]]

    def get_report(self):
        return self.report

    def close(self):
        pass
1 change: 1 addition & 0 deletions metadata-ingestion/src/datahub/ingestion/source/mongodb.py
@@ -433,3 +433,4 @@ def get_report(self) -> MongoDBSourceReport:

    def close(self):
        self.mongo_client.close()
        super().close()
3 changes: 0 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/openapi.py
@@ -326,9 +326,6 @@ def get_workunits(self) -> Iterable[ApiWorkUnit]:  # noqa: C901
    def get_report(self):
        return self.report

    def close(self):
        pass


class OpenApiSource(APISource):
    def __init__(self, config: OpenApiConfig, ctx: PipelineContext):
@@ -570,6 +570,3 @@ def get_user_info(self, report: Any) -> OwnershipData:

    def get_report(self) -> SourceReport:
        return self.report

    def close(self):
        pass
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/ingestion/source/pulsar.py
@@ -552,5 +552,5 @@ def get_report(self):
        return self.report

    def close(self):
        self.prepare_for_commit()
        super().close()
        self.session.close()
3 changes: 0 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/redash.py
@@ -796,6 +796,3 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:

    def get_report(self) -> SourceReport:
        return self.report

    def close(self):
        pass
3 changes: 0 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/s3/source.py
@@ -805,6 +805,3 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:

    def get_report(self):
        return self.report

    def close(self):
        pass
@@ -1014,6 +1014,3 @@ def inspect_session_metadata(self, conn: SnowflakeConnection) -> None:
    # Stateful Ingestion Overrides.
    def get_platform_instance_id(self) -> str:
        return self.config.get_account()

    def close(self):
        self.prepare_for_commit()
@@ -194,3 +194,4 @@ def gen_schema_containers(
    def close(self):
        if self.cursor:
            self.cursor.close()
        super().close()
@@ -790,7 +790,7 @@ def _get_presto_view_column_metadata(
    def close(self) -> None:
        if self._alchemy_client.connection is not None:
            self._alchemy_client.connection.close()
        self.prepare_for_commit()
        super().close()

    def get_schema_fields_for_column(
        self,
@@ -221,18 +221,6 @@ def report_from_query_combiner(
        self.query_combiner = query_combiner_report


class SQLAlchemyStatefulIngestionConfig(StatefulStaleMetadataRemovalConfig):
    """
    Specialization of StatefulStaleMetadataRemovalConfig to adding custom config.
    This will be used to override the stateful_ingestion config param of StatefulIngestionConfigBase
    in the SQLAlchemyConfig.
    """

    _entity_types: List[str] = pydantic.Field(
        default=["assertion", "container", "table", "view"]
    )


class SQLAlchemyConfig(StatefulIngestionConfigBase):
    options: dict = {}
    # Although the 'table_pattern' enables you to skip everything from certain schemas,
@@ -269,7 +257,7 @@ class SQLAlchemyConfig(StatefulIngestionConfigBase):

    profiling: GEProfilingConfig = GEProfilingConfig()
    # Custom Stateful Ingestion settings
    stateful_ingestion: Optional[SQLAlchemyStatefulIngestionConfig] = None
    stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None

    @pydantic.root_validator(pre=True)
    def view_pattern_is_table_pattern_unless_specified(
@@ -1387,6 +1375,3 @@ def prepare_profiler_args(

    def get_report(self):
        return self.report

    def close(self):
        self.prepare_for_commit()
@@ -29,10 +29,9 @@ class StatefulStaleMetadataRemovalConfig(StatefulIngestionConfig):
    Base specialized config for Stateful Ingestion with stale metadata removal capability.
    """

    _entity_types: List[str] = []
    remove_stale_metadata: bool = pydantic.Field(
        default=True,
        description=f"Soft-deletes the entities of type {', '.join(_entity_types)} in the last successful run but missing in the current run with stateful_ingestion enabled.",
        description="Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled.",
    )
    fail_safe_threshold: float = pydantic.Field(
        default=20.0,
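
As a usage illustration of the simplified config above, the sketch below parses the two fields visible in this hunk with pydantic. This is an assumed usage pattern for illustration only, not code from the commit.

```python
from datahub.ingestion.source.state.stale_entity_removal_handler import (
    StatefulStaleMetadataRemovalConfig,
)

# Hypothetical usage: pydantic validates the stateful_ingestion section of a recipe.
config = StatefulStaleMetadataRemovalConfig.parse_obj(
    {
        "remove_stale_metadata": True,  # field shown in the diff above
        "fail_safe_threshold": 20.0,    # field shown in the diff above
    }
)
print(config.remove_stale_metadata, config.fail_safe_threshold)
```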