Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(ingest): simplify stateful ingestion config #6454

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,25 +65,8 @@ Examples:
3. [BaseSQLAlchemyCheckpointState](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/state/sql_common_state.py#L17)

### 2. Modifying the SourceConfig
The `stateful_ingestion` config param that is mandatory for any source using stateful ingestion needs to be overridden with a custom config that is more specific to the source
and is inherited from `datahub.ingestion.source.state.stale_entity_removal_handler.StatefulStaleMetadataRemovalConfig`. The `StatefulStaleMetadataRemovalConfig` adds the following
additional parameters to the basic stateful ingestion config that is common for all sources. Typical customization involves overriding the `_entity_types` private config member which helps produce
more accurate documentation specific to the source.
```python
import pydantic
from typing import List
from datahub.ingestion.source.state.stateful_ingestion_base import StatefulIngestionConfig
class StatefulStaleMetadataRemovalConfig(StatefulIngestionConfig):
""" Base specialized config for Stateful Ingestion with stale metadata removal capability. """

# Allows for sources to define(via override) the entity types they support.
_entity_types: List[str] = []
# Whether to enable removal of stale metadata.
remove_stale_metadata: bool = pydantic.Field(
default=True,
description=f"Soft-deletes the entities of type {', '.join(_entity_types)} in the last successful run but missing in the current run with stateful_ingestion enabled.",
)
```
The source's config must inherit from `StatefulIngestionConfigBase`, and should declare a field named `stateful_ingestion` of type `Optional[StatefulStaleMetadataRemovalConfig]`.

Examples:
1. The `KafkaSourceConfig`
Expand All @@ -94,17 +77,11 @@ from datahub.ingestion.source.state.stale_entity_removal_handler import Stateful
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionConfigBase,
)
from datahub.configuration.source_common import DatasetSourceConfigBase

class KafkaSourceStatefulIngestionConfig(StatefulStaleMetadataRemovalConfig):
""" Kafka custom stateful ingestion config definition(overrides _entity_types of StatefulStaleMetadataRemovalConfig). """
_entity_types: List[str] = pydantic.Field(default=["topic"])


class KafkaSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigBase):
class KafkaSourceConfig(StatefulIngestionConfigBase):
# ...<other config params>...
""" Override the stateful_ingestion config param with the Kafka custom stateful ingestion config in the KafkaSourceConfig. """
stateful_ingestion: Optional[KafkaSourceStatefulIngestionConfig] = None

stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None
```

2. The [DBTStatefulIngestionConfig](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/dbt.py#L131)
Expand Down
15 changes: 1 addition & 14 deletions metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,16 +108,6 @@
VALID_PLATFORMS = [DEFAULT_PLATFORM, "athena"]


class GlueStatefulIngestionConfig(StatefulStaleMetadataRemovalConfig):
"""
Specialization of StatefulStaleMetadataRemovalConfig to adding custom config.
This will be used to override the stateful_ingestion config param of StatefulIngestionConfigBase
in the GlueSourceConfig.
"""

_entity_types: List[str] = Field(default=["table"])


class GlueSourceConfig(
AwsSourceConfig, GlueProfilingConfig, StatefulIngestionConfigBase
):
Expand Down Expand Up @@ -164,7 +154,7 @@ class GlueSourceConfig(
description="Configs to ingest data profiles from glue table",
)
# Custom Stateful Ingestion settings
stateful_ingestion: Optional[GlueStatefulIngestionConfig] = Field(
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = Field(
default=None, description=""
)

Expand Down Expand Up @@ -1294,6 +1284,3 @@ def get_report(self):

def get_platform_instance_id(self) -> str:
return self.source_config.platform_instance or self.platform

def close(self):
self.prepare_for_commit()
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,3 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:

def get_report(self):
return self.report

def close(self):
pass
Original file line number Diff line number Diff line change
Expand Up @@ -1187,6 +1187,3 @@ def add_config_to_report(self):
def warn(self, log: logging.Logger, key: str, reason: str) -> None:
self.report.report_warning(key, reason)
log.warning(f"{key} => {reason}")

def close(self) -> None:
self.prepare_for_commit()
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,3 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:

def get_report(self):
return self.report

def close(self):
pass
15 changes: 1 addition & 14 deletions metadata-ingestion/src/datahub/ingestion/source/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,16 +131,6 @@
DBT_PLATFORM = "dbt"


class DBTStatefulIngestionConfig(StatefulStaleMetadataRemovalConfig):
"""
Specialization of basic StatefulStaleMetadataRemovalConfig to adding custom config.
This will be used to override the stateful_ingestion config param of StatefulIngestionConfigBase
in the SQLAlchemyConfig.
"""

_entity_types: List[str] = pydantic.Field(default=["assertion", "dataset"])


@dataclass
class DBTSourceReport(StaleEntityRemovalSourceReport):
pass
Expand Down Expand Up @@ -312,7 +302,7 @@ class DBTConfig(StatefulIngestionConfigBase, LineageConfig):
default=False,
description="When enabled, emits lineage as incremental to existing lineage already in DataHub. When disabled, re-states lineage on each run.",
)
stateful_ingestion: Optional[DBTStatefulIngestionConfig] = pydantic.Field(
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = pydantic.Field(
default=None, description="DBT Stateful Ingestion Config."
)

Expand Down Expand Up @@ -1891,6 +1881,3 @@ def get_platform_instance_id(self) -> str:
raise ValueError("DBT project identifier is not found in manifest")

return f"{self.platform}_{project_id}"

def close(self):
self.prepare_for_commit()
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,3 @@ def get_workunits(self) -> Iterable[WorkUnit]:

def get_report(self) -> SourceReport:
return self.report

def close(self):
pass
Original file line number Diff line number Diff line change
Expand Up @@ -482,3 +482,4 @@ def get_report(self):
def close(self):
if self.client:
self.client.close()
super().close()
3 changes: 0 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/feast.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,3 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:

def get_report(self) -> SourceReport:
return self.report

def close(self) -> None:
return
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,3 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:

def get_report(self) -> FeastSourceReport:
return self.report

def close(self):
return
1 change: 1 addition & 0 deletions metadata-ingestion/src/datahub/ingestion/source/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ def get_report(self):
def close(self):
if self.fp:
self.fp.close()
super().close()

def _iterate_file(self, path: str) -> Iterable[Tuple[int, Any]]:
self.report.current_file_name = path
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,9 +298,6 @@ def _get_avro_schema_from_data_type(self, column: NestedField) -> Dict[str, Any]
def get_report(self) -> SourceReport:
return self.report

def close(self) -> None:
pass


def _parse_datatype(type: IcebergTypes.Type, nullable: bool = False) -> Dict[str, Any]:
# Check for complex types: struct, list, map
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -471,9 +471,6 @@ def ingest_ad_users(
def get_report(self) -> SourceReport:
return self.report

def close(self) -> None:
pass

def _get_azure_ad_groups(self) -> Iterable[List]:
yield from self._get_azure_ad_data(kind="/groups")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,9 +418,6 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
def get_report(self):
return self.report

def close(self):
pass

# Instantiates Okta SDK Client.
def _create_okta_client(self):
config = {
Expand Down
15 changes: 2 additions & 13 deletions metadata-ingestion/src/datahub/ingestion/source/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,6 @@
logger = logging.getLogger(__name__)


class KafkaSourceStatefulIngestionConfig(StatefulStaleMetadataRemovalConfig):
"""
Specialization of StatefulStaleMetadataRemovalConfig to adding custom config.
This will be used to override the stateful_ingestion config param of StatefulIngestionConfigBase
in the SQLAlchemyConfig.
"""

_entity_types: List[str] = pydantic.Field(default=["topic"])


class KafkaSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigBase):
env: str = DEFAULT_ENV
# TODO: inline the connection config
Expand All @@ -74,8 +64,7 @@ class KafkaSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigBase):
default={},
description="Provides the mapping for the `key` and the `value` schemas of a topic to the corresponding schema registry subject name. Each entry of this map has the form `<topic_name>-key`:`<schema_registry_subject_name_for_key_schema>` and `<topic_name>-value`:`<schema_registry_subject_name_for_value_schema>` for the key and the value schemas associated with the topic, respectively. This parameter is mandatory when the [RecordNameStrategy](https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#how-the-naming-strategies-work) is used as the subject naming strategy in the kafka schema registry. NOTE: When provided, this overrides the default subject name resolution even when the `TopicNameStrategy` or the `TopicRecordNameStrategy` are used.",
)
# Custom Stateful Ingestion settings
stateful_ingestion: Optional[KafkaSourceStatefulIngestionConfig] = None
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None
schema_registry_class: str = pydantic.Field(
default="datahub.ingestion.source.confluent_schema_registry.ConfluentSchemaRegistry",
description="The fully qualified implementation class(custom) that implements the KafkaSchemaRegistryBase interface.",
Expand Down Expand Up @@ -280,6 +269,6 @@ def get_report(self) -> KafkaSourceReport:
return self.report

def close(self) -> None:
self.prepare_for_commit()
if self.consumer:
self.consumer.close()
super().close()
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/ingestion/source/ldap.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,8 +411,8 @@ def get_report(self) -> LDAPSourceReport:
return self.report

def close(self) -> None:
"""Closes the Source."""
self.ldap_client.unbind()
super().close()


def parse_from_attrs(attrs: Dict[str, Any], filter_key: str) -> List[str]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1690,6 +1690,3 @@ def get_internal_workunits(self) -> Iterable[MetadataWorkUnit]: # noqa: C901

def get_report(self):
return self.reporter

def close(self):
pass
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ def close(self) -> None:
key="metabase-session",
reason=f"Unable to logout for user {self.config.username}",
)
super().close()

def emit_dashboard_mces(self) -> Iterable[MetadataWorkUnit]:
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,3 @@ def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, UsageStatsWorkUnit]]

def get_report(self):
return self.report

def close(self):
pass
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,3 @@ def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, UsageStatsWorkUnit]]

def get_report(self):
return self.report

def close(self):
pass
1 change: 1 addition & 0 deletions metadata-ingestion/src/datahub/ingestion/source/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -433,3 +433,4 @@ def get_report(self) -> MongoDBSourceReport:

def close(self):
self.mongo_client.close()
super().close()
3 changes: 0 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/openapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,9 +326,6 @@ def get_workunits(self) -> Iterable[ApiWorkUnit]: # noqa: C901
def get_report(self):
return self.report

def close(self):
pass


class OpenApiSource(APISource):
def __init__(self, config: OpenApiConfig, ctx: PipelineContext):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,3 @@ def get_user_info(self, report: Any) -> OwnershipData:

def get_report(self) -> SourceReport:
return self.report

def close(self):
pass
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/ingestion/source/pulsar.py
Original file line number Diff line number Diff line change
Expand Up @@ -552,5 +552,5 @@ def get_report(self):
return self.report

def close(self):
self.prepare_for_commit()
super().close()
self.session.close()
3 changes: 0 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/redash.py
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,3 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:

def get_report(self) -> SourceReport:
return self.report

def close(self):
pass
3 changes: 0 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/s3/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,3 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:

def get_report(self):
return self.report

def close(self):
pass
Original file line number Diff line number Diff line change
Expand Up @@ -1014,6 +1014,3 @@ def inspect_session_metadata(self, conn: SnowflakeConnection) -> None:
# Stateful Ingestion Overrides.
def get_platform_instance_id(self) -> str:
return self.config.get_account()

def close(self):
self.prepare_for_commit()
Original file line number Diff line number Diff line change
Expand Up @@ -194,3 +194,4 @@ def gen_schema_containers(
def close(self):
if self.cursor:
self.cursor.close()
super().close()
Original file line number Diff line number Diff line change
Expand Up @@ -790,7 +790,7 @@ def _get_presto_view_column_metadata(
def close(self) -> None:
if self._alchemy_client.connection is not None:
self._alchemy_client.connection.close()
self.prepare_for_commit()
super().close()

def get_schema_fields_for_column(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,18 +221,6 @@ def report_from_query_combiner(
self.query_combiner = query_combiner_report


class SQLAlchemyStatefulIngestionConfig(StatefulStaleMetadataRemovalConfig):
"""
Specialization of StatefulStaleMetadataRemovalConfig to adding custom config.
This will be used to override the stateful_ingestion config param of StatefulIngestionConfigBase
in the SQLAlchemyConfig.
"""

_entity_types: List[str] = pydantic.Field(
default=["assertion", "container", "table", "view"]
)


class SQLAlchemyConfig(StatefulIngestionConfigBase):
options: dict = {}
# Although the 'table_pattern' enables you to skip everything from certain schemas,
Expand Down Expand Up @@ -269,7 +257,7 @@ class SQLAlchemyConfig(StatefulIngestionConfigBase):

profiling: GEProfilingConfig = GEProfilingConfig()
# Custom Stateful Ingestion settings
stateful_ingestion: Optional[SQLAlchemyStatefulIngestionConfig] = None
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None

@pydantic.root_validator(pre=True)
def view_pattern_is_table_pattern_unless_specified(
Expand Down Expand Up @@ -1387,6 +1375,3 @@ def prepare_profiler_args(

def get_report(self):
return self.report

def close(self):
self.prepare_for_commit()
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@ class StatefulStaleMetadataRemovalConfig(StatefulIngestionConfig):
Base specialized config for Stateful Ingestion with stale metadata removal capability.
"""

_entity_types: List[str] = []
remove_stale_metadata: bool = pydantic.Field(
default=True,
description=f"Soft-deletes the entities of type {', '.join(_entity_types)} in the last successful run but missing in the current run with stateful_ingestion enabled.",
description="Soft-deletes the entities present in the last successful run but missing in the current run with stateful_ingestion enabled.",
)
fail_safe_threshold: float = pydantic.Field(
default=20.0,
Expand Down
Loading