diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py index 6e2b2af350eab5..0b7f2a2c2d66ff 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py @@ -62,6 +62,10 @@ from datahub.ingestion.source.bigquery_v2.lineage import BigqueryLineageExtractor from datahub.ingestion.source.bigquery_v2.profiler import BigqueryProfiler from datahub.ingestion.source.bigquery_v2.usage import BigQueryUsageExtractor +from datahub.ingestion.source.state.profiling_state_handler import ProfilingHandler +from datahub.ingestion.source.state.redundant_run_skip_handler import ( + RedundantRunSkipHandler, +) from datahub.ingestion.source.state.sql_common_state import ( BaseSQLAlchemyCheckpointState, ) @@ -112,6 +116,7 @@ auto_stale_entity_removal, auto_status_aspect, ) +from datahub.utilities.time import datetime_to_ts_millis logger: logging.Logger = logging.getLogger(__name__) @@ -206,7 +211,6 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config): # For database, schema, tables, views, etc self.lineage_extractor = BigqueryLineageExtractor(config, self.report) self.usage_extractor = BigQueryUsageExtractor(config, self.report) - self.profiler = BigqueryProfiler(config, self.report) # Currently caching using instance variables # TODO - rewrite cache for readability or use out of the box solution @@ -231,6 +235,25 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config): cached_domains=[k for k in self.config.domain], graph=self.ctx.graph ) + self.redundant_run_skip_handler = RedundantRunSkipHandler( + source=self, + config=self.config, + pipeline_name=self.ctx.pipeline_name, + run_id=self.ctx.run_id, + ) + + self.profiling_state_handler: Optional[ProfilingHandler] = None + if self.config.store_last_profiling_timestamps: + self.profiling_state_handler = ProfilingHandler( + source=self, + config=self.config, + pipeline_name=self.ctx.pipeline_name, + run_id=self.ctx.run_id, + ) + self.profiler = BigqueryProfiler( + config, self.report, self.profiling_state_handler + ) + atexit.register(cleanup, config) @classmethod @@ -602,9 +625,48 @@ def _process_project( continue if self.config.include_table_lineage: + if ( + self.config.store_last_lineage_extraction_timestamp + and self.redundant_run_skip_handler.should_skip_this_run( + cur_start_time_millis=datetime_to_ts_millis(self.config.start_time) + ) + ): + # Skip this run + self.report.report_warning( + "lineage-extraction", + f"Skip this run as there was a run later than the current start time: {self.config.start_time}", + ) + return + + if self.config.store_last_lineage_extraction_timestamp: + # Update the checkpoint state for this run. + self.redundant_run_skip_handler.update_state( + start_time_millis=datetime_to_ts_millis(self.config.start_time), + end_time_millis=datetime_to_ts_millis(self.config.end_time), + ) + yield from self.generate_lineage(project_id) if self.config.include_usage_statistics: + if ( + self.config.store_last_usage_extraction_timestamp + and self.redundant_run_skip_handler.should_skip_this_run( + cur_start_time_millis=datetime_to_ts_millis(self.config.start_time) + ) + ): + self.report.report_warning( + "usage-extraction", + f"Skip this run as there was a run later than the current start time: {self.config.start_time}", + ) + return + + if self.config.store_last_usage_extraction_timestamp: + # Update the checkpoint state for this run. + self.redundant_run_skip_handler.update_state( + start_time_millis=datetime_to_ts_millis(self.config.start_time), + end_time_millis=datetime_to_ts_millis(self.config.end_time), + ) + yield from self.generate_usage_statistics(project_id) def generate_lineage(self, project_id: str) -> Iterable[MetadataWorkUnit]: diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py index 6319a19bb895b2..92451761bb844f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py @@ -4,7 +4,12 @@ from pydantic import Field, PositiveInt, root_validator -from datahub.configuration.common import AllowDenyPattern, LineageConfig +from datahub.configuration.common import AllowDenyPattern +from datahub.ingestion.source.state.stateful_ingestion_base import ( + LineageStatefulIngestionConfig, + ProfilingStatefulIngestionConfig, + UsageStatefulIngestionConfig, +) from datahub.ingestion.source.usage.usage_common import BaseUsageConfig from datahub.ingestion.source_config.sql.bigquery import BigQueryConfig @@ -23,7 +28,12 @@ class BigQueryUsageConfig(BaseUsageConfig): ) -class BigQueryV2Config(BigQueryConfig, LineageConfig): +class BigQueryV2Config( + BigQueryConfig, + LineageStatefulIngestionConfig, + UsageStatefulIngestionConfig, + ProfilingStatefulIngestionConfig, +): project_id_pattern: AllowDenyPattern = Field( default=AllowDenyPattern.allow_all(), description="Regex patterns for project_id to filter in ingestion.", diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py index f0b66c49e2cc09..ea756fbb8c646b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py @@ -20,6 +20,7 @@ GenericProfiler, TableProfilerRequest, ) +from datahub.ingestion.source.state.profiling_state_handler import ProfilingHandler logger = logging.getLogger(__name__) @@ -34,8 +35,13 @@ class BigqueryProfiler(GenericProfiler): config: BigQueryV2Config report: BigQueryV2Report - def __init__(self, config: BigQueryV2Config, report: BigQueryV2Report) -> None: - super().__init__(config, report, "bigquery") + def __init__( + self, + config: BigQueryV2Config, + report: BigQueryV2Report, + state_handler: Optional[ProfilingHandler] = None, + ) -> None: + super().__init__(config, report, "bigquery", state_handler) self.config = config self.report = report @@ -164,6 +170,9 @@ def get_workunits( continue for table in tables[project][dataset]: + normalized_table_name = BigqueryTableIdentifier( + project_id=project, dataset=dataset, table=table.name + ).get_table_name() for column in table.columns: # Profiler has issues with complex types (array, struct, geography, json), so we deny those types from profiling # We also filter columns without data type as it means that column is part of a complex type. @@ -171,10 +180,6 @@ def get_workunits( word in column.data_type.lower() for word in ["array", "struct", "geography", "json"] ): - normalized_table_name = BigqueryTableIdentifier( - project_id=project, dataset=dataset, table=table.name - ).get_table_name() - self.config.profile_pattern.deny.append( f"^{normalized_table_name}.{column.field_path}$" ) @@ -188,40 +193,51 @@ def get_workunits( if len(profile_requests) == 0: continue - table_profile_requests = cast(List[TableProfilerRequest], profile_requests) - for request, profile in self.generate_profiles( - table_profile_requests, - self.config.profiling.max_workers, - platform=self.platform, - profiler_args=self.get_profile_args(), - ): - if request is None or profile is None: - continue + yield from self.generate_wu_from_profile_requests(profile_requests) + + def generate_wu_from_profile_requests( + self, profile_requests: List[BigqueryProfilerRequest] + ) -> Iterable[MetadataWorkUnit]: + table_profile_requests = cast(List[TableProfilerRequest], profile_requests) + for request, profile in self.generate_profiles( + table_profile_requests, + self.config.profiling.max_workers, + platform=self.platform, + profiler_args=self.get_profile_args(), + ): + if request is None or profile is None: + continue - request = cast(BigqueryProfilerRequest, request) - profile.sizeInBytes = request.table.size_in_bytes - # If table is partitioned we profile only one partition (if nothing set then the last one) - # but for table level we can use the rows_count from the table metadata - # This way even though column statistics only reflects one partition data but the rows count - # shows the proper count. - if profile.partitionSpec and profile.partitionSpec.partition: - profile.rowCount = request.table.rows_count + request = cast(BigqueryProfilerRequest, request) + profile.sizeInBytes = request.table.size_in_bytes + # If table is partitioned we profile only one partition (if nothing set then the last one) + # but for table level we can use the rows_count from the table metadata + # This way even though column statistics only reflects one partition data but the rows count + # shows the proper count. + if profile.partitionSpec and profile.partitionSpec.partition: + profile.rowCount = request.table.rows_count - dataset_name = request.pretty_name - dataset_urn = make_dataset_urn_with_platform_instance( - self.platform, - dataset_name, - self.config.platform_instance, - self.config.env, - ) - wu = wrap_aspect_as_workunit( - "dataset", - dataset_urn, - "datasetProfile", - profile, + dataset_name = request.pretty_name + dataset_urn = make_dataset_urn_with_platform_instance( + self.platform, + dataset_name, + self.config.platform_instance, + self.config.env, + ) + # We don't add to the profiler state if we only do table level profiling as it always happens + if self.state_handler and not request.profile_table_level_only: + self.state_handler.add_to_state( + dataset_urn, int(datetime.datetime.utcnow().timestamp() * 1000) ) - self.report.report_workunit(wu) - yield wu + + wu = wrap_aspect_as_workunit( + "dataset", + dataset_urn, + "datasetProfile", + profile, + ) + self.report.report_workunit(wu) + yield wu def get_bigquery_profile_request( self, project: str, dataset: str, table: BigqueryTable diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py index 36d89e3fb7e573..8b2c722254b812 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py @@ -5,6 +5,10 @@ from datahub.configuration.common import AllowDenyPattern from datahub.ingestion.glossary.classifier import ClassificationConfig +from datahub.ingestion.source.state.stateful_ingestion_base import ( + ProfilingStatefulIngestionConfig, + UsageStatefulIngestionConfig, +) from datahub.ingestion.source_config.sql.snowflake import ( BaseSnowflakeConfig, SnowflakeConfig, @@ -15,7 +19,12 @@ logger = logging.Logger(__name__) -class SnowflakeV2Config(SnowflakeConfig, SnowflakeUsageConfig): +class SnowflakeV2Config( + SnowflakeConfig, + SnowflakeUsageConfig, + UsageStatefulIngestionConfig, + ProfilingStatefulIngestionConfig, +): convert_urns_to_lowercase: bool = Field( default=True, ) @@ -54,7 +63,7 @@ class SnowflakeV2Config(SnowflakeConfig, SnowflakeUsageConfig): description="Whether to populate Snowsight url for Snowflake Objects", ) - match_fully_qualified_names = bool = Field( + match_fully_qualified_names: bool = Field( default=False, description="Whether `schema_pattern` is matched against fully qualified schema name `.`.", ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_profiler.py index 7886c22f061f47..cbc0e009d00b6e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_profiler.py @@ -1,5 +1,6 @@ import dataclasses import logging +from datetime import datetime from typing import Callable, Iterable, List, Optional, cast from snowflake.sqlalchemy import snowdialect @@ -28,6 +29,7 @@ GenericProfiler, TableProfilerRequest, ) +from datahub.ingestion.source.state.profiling_state_handler import ProfilingHandler snowdialect.ischema_names["GEOGRAPHY"] = sqltypes.NullType @@ -44,8 +46,13 @@ class SnowflakeProfiler(SnowflakeCommonMixin, GenericProfiler, SnowflakeCommonPr config: SnowflakeV2Config report: SnowflakeV2Report - def __init__(self, config: SnowflakeV2Config, report: SnowflakeV2Report) -> None: - super().__init__(config, report, self.platform) + def __init__( + self, + config: SnowflakeV2Config, + report: SnowflakeV2Report, + state_handler: Optional[ProfilingHandler] = None, + ) -> None: + super().__init__(config, report, self.platform, state_handler) self.config = config self.report = report self.logger = logger @@ -102,6 +109,13 @@ def get_workunits(self, databases: List[SnowflakeDatabase]) -> Iterable[WorkUnit self.config.platform_instance, self.config.env, ) + + # We don't add to the profiler state if we only do table level profiling as it always happens + if self.state_handler: + self.state_handler.add_to_state( + dataset_urn, int(datetime.utcnow().timestamp() * 1000) + ) + yield self.wrap_aspect_as_workunit( "dataset", dataset_urn, diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py index 2f1c7d9b3f2180..7ed8456415af03 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py @@ -68,6 +68,7 @@ SnowflakeQueryMixin, ) from datahub.ingestion.source.sql.sql_common import SqlContainerSubTypes +from datahub.ingestion.source.state.profiling_state_handler import ProfilingHandler from datahub.ingestion.source.state.redundant_run_skip_handler import ( RedundantRunSkipHandler, ) @@ -225,9 +226,20 @@ def __init__(self, ctx: PipelineContext, config: SnowflakeV2Config): # For usage stats self.usage_extractor = SnowflakeUsageExtractor(config, self.report) + self.profiling_state_handler: Optional[ProfilingHandler] = None + if self.config.store_last_profiling_timestamps: + self.profiling_state_handler = ProfilingHandler( + source=self, + config=self.config, + pipeline_name=self.ctx.pipeline_name, + run_id=self.ctx.run_id, + ) + if config.profiling.enabled: # For profiling - self.profiler = SnowflakeProfiler(config, self.report) + self.profiler = SnowflakeProfiler( + config, self.report, self.profiling_state_handler + ) if self.is_classification_enabled(): self.classifiers = self.get_classifiers() @@ -455,17 +467,25 @@ def get_workunits(self) -> Iterable[WorkUnit]: yield from self.profiler.get_workunits(databases) if self.config.include_usage_stats or self.config.include_operational_stats: - if self.redundant_run_skip_handler.should_skip_this_run( - cur_start_time_millis=datetime_to_ts_millis(self.config.start_time) + if ( + self.config.store_last_usage_extraction_timestamp + and self.redundant_run_skip_handler.should_skip_this_run( + cur_start_time_millis=datetime_to_ts_millis(self.config.start_time) + ) ): # Skip this run + self.report.report_warning( + "usage-extraction", + f"Skip this run as there was a run later than the current start time: {self.config.start_time}", + ) return - # Update the checkpoint state for this run. - self.redundant_run_skip_handler.update_state( - start_time_millis=datetime_to_ts_millis(self.config.start_time), - end_time_millis=datetime_to_ts_millis(self.config.end_time), - ) + if self.config.store_last_usage_extraction_timestamp: + # Update the checkpoint state for this run. + self.redundant_run_skip_handler.update_state( + start_time_millis=datetime_to_ts_millis(self.config.start_time), + end_time_millis=datetime_to_ts_millis(self.config.end_time), + ) discovered_datasets: List[str] = [ self.get_dataset_identifier(table.name, schema.name, db.name) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py index 81c96ab70b8368..782e71ae9a1ae0 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_generic_profiler.py @@ -6,12 +6,14 @@ from sqlalchemy import create_engine, inspect from sqlalchemy.engine.reflection import Inspector +from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance from datahub.ingestion.source.ge_data_profiler import ( DatahubGEProfiler, GEProfilerRequest, ) from datahub.ingestion.source.sql.sql_common import SQLAlchemyConfig, SQLSourceReport from datahub.ingestion.source.sql.sql_generic import BaseTable, BaseView +from datahub.ingestion.source.state.profiling_state_handler import ProfilingHandler from datahub.metadata.com.linkedin.pegasus2avro.dataset import DatasetProfile from datahub.metadata.schema_classes import DatasetProfileClass from datahub.utilities.stats_collections import TopKDict @@ -41,11 +43,16 @@ class TableProfilerRequest(GEProfilerRequest): class GenericProfiler: def __init__( - self, config: SQLAlchemyConfig, report: ProfilingSqlReport, platform: str + self, + config: SQLAlchemyConfig, + report: ProfilingSqlReport, + platform: str, + state_handler: Optional[ProfilingHandler] = None, ) -> None: self.config = config self.report = report self.platform = platform + self.state_handler = state_handler def generate_profiles( self, @@ -118,15 +125,34 @@ def is_dataset_eligible_for_profiling( size_in_bytes: Optional[int], rows_count: Optional[int], ) -> bool: - threshold_time: Optional[datetime] = None + dataset_urn = make_dataset_urn_with_platform_instance( + self.platform, + dataset_name, + self.config.platform_instance, + self.config.env, + ) + + if not self.config.table_pattern.allowed(dataset_name): + return False + + last_profiled: Optional[int] = None + if self.state_handler: + last_profiled = self.state_handler.get_last_profiled(dataset_urn) + if last_profiled: + # If profiling state exists we have to carry over to the new state + self.state_handler.add_to_state(dataset_urn, last_profiled) + + threshold_time: Optional[datetime] = ( + datetime.fromtimestamp(last_profiled / 1000, timezone.utc) + if last_profiled + else None + ) if self.config.profiling.profile_if_updated_since_days is not None: threshold_time = datetime.now(timezone.utc) - timedelta( self.config.profiling.profile_if_updated_since_days ) - if not self.config.table_pattern.allowed( - dataset_name - ) or not self.config.profile_pattern.allowed(dataset_name): + if not self.config.profile_pattern.allowed(dataset_name): return False schema_name = dataset_name.rsplit(".", 1)[0] diff --git a/metadata-ingestion/src/datahub/ingestion/source/state/profiling_state.py b/metadata-ingestion/src/datahub/ingestion/source/state/profiling_state.py new file mode 100644 index 00000000000000..9eef2c61002827 --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/state/profiling_state.py @@ -0,0 +1,16 @@ +from typing import Dict + +import pydantic + +from datahub.ingestion.source.state.checkpoint import CheckpointStateBase + + +class ProfilingCheckpointState(CheckpointStateBase): + """ + Base class for representing the checkpoint state for all profiling based sources. + Stores the last successful profiling time per urn. + Subclasses can define additional state as appropriate. + """ + + # Last profiled stores urn, last_profiled timestamp millis in a dict + last_profiled: Dict[str, pydantic.PositiveInt] diff --git a/metadata-ingestion/src/datahub/ingestion/source/state/profiling_state_handler.py b/metadata-ingestion/src/datahub/ingestion/source/state/profiling_state_handler.py new file mode 100644 index 00000000000000..853abe51dfd275 --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/state/profiling_state_handler.py @@ -0,0 +1,124 @@ +import logging +from collections import defaultdict +from typing import Optional, cast + +import pydantic + +from datahub.ingestion.api.ingestion_job_checkpointing_provider_base import JobId +from datahub.ingestion.source.state.checkpoint import Checkpoint +from datahub.ingestion.source.state.profiling_state import ProfilingCheckpointState +from datahub.ingestion.source.state.stateful_ingestion_base import ( + StatefulIngestionConfig, + StatefulIngestionConfigBase, + StatefulIngestionSourceBase, +) +from datahub.ingestion.source.state.use_case_handler import ( + StatefulIngestionUsecaseHandlerBase, +) + +logger: logging.Logger = logging.getLogger(__name__) + + +class ProfilingStatefulIngestionConfig(StatefulIngestionConfig): + """ + Base specialized config of Stateful Profiling. + """ + + +class ProfilingHandler(StatefulIngestionUsecaseHandlerBase[ProfilingCheckpointState]): + """ + The stateful ingestion helper class that handles skipping redundant runs. + This contains the generic logic for all sources that need to support skipping redundant runs. + """ + + INVALID_TIMESTAMP_VALUE: pydantic.PositiveInt = 1 + + def __init__( + self, + source: StatefulIngestionSourceBase, + config: StatefulIngestionConfigBase[ProfilingStatefulIngestionConfig], + pipeline_name: Optional[str], + run_id: str, + ): + self.source = source + self.stateful_ingestion_config: Optional[ + ProfilingStatefulIngestionConfig + ] = config.stateful_ingestion + self.pipeline_name = pipeline_name + self.run_id = run_id + self.checkpointing_enabled: bool = source.is_stateful_ingestion_configured() + self._job_id = self._init_job_id() + self.source.register_stateful_ingestion_usecase_handler(self) + + def _ignore_old_state(self) -> bool: + if ( + self.stateful_ingestion_config is not None + and self.stateful_ingestion_config.ignore_old_state + ): + return True + return False + + def _ignore_new_state(self) -> bool: + if ( + self.stateful_ingestion_config is not None + and self.stateful_ingestion_config.ignore_new_state + ): + return True + return False + + def _init_job_id(self) -> JobId: + return JobId("profiling") + + @property + def job_id(self) -> JobId: + return self._job_id + + def is_checkpointing_enabled(self) -> bool: + return self.checkpointing_enabled + + def create_checkpoint(self) -> Optional[Checkpoint[ProfilingCheckpointState]]: + if not self.is_checkpointing_enabled() or self._ignore_new_state(): + return None + + assert self.pipeline_name is not None + return Checkpoint( + job_name=self.job_id, + pipeline_name=self.pipeline_name, + run_id=self.run_id, + state=ProfilingCheckpointState(last_profiled=defaultdict()), + ) + + def get_current_state(self) -> Optional[ProfilingCheckpointState]: + if not self.is_checkpointing_enabled() or self._ignore_new_state(): + return None + cur_checkpoint = self.source.get_current_checkpoint(self.job_id) + assert cur_checkpoint is not None + cur_state = cast(ProfilingCheckpointState, cur_checkpoint.state) + return cur_state + + def add_to_state( + self, + urn: str, + profile_time_millis: pydantic.PositiveInt, + ) -> None: + cur_state = self.get_current_state() + if cur_state: + cur_state.last_profiled[urn] = profile_time_millis + + def get_last_state(self) -> Optional[ProfilingCheckpointState]: + if not self.is_checkpointing_enabled() or self._ignore_old_state(): + return None + last_checkpoint = self.source.get_last_checkpoint( + self.job_id, ProfilingCheckpointState + ) + if last_checkpoint and last_checkpoint.state: + return cast(ProfilingCheckpointState, last_checkpoint.state) + + return None + + def get_last_profiled(self, urn: str) -> Optional[pydantic.PositiveInt]: + state = self.get_last_state() + if state: + return state.last_profiled.get(urn) + + return None diff --git a/metadata-ingestion/src/datahub/ingestion/source/state/redundant_run_skip_handler.py b/metadata-ingestion/src/datahub/ingestion/source/state/redundant_run_skip_handler.py index 98da2a4ff56ef2..69daaafdb33dfb 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/state/redundant_run_skip_handler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/state/redundant_run_skip_handler.py @@ -142,6 +142,5 @@ def should_skip_this_run(self, cur_start_time_millis: int) -> bool: f" is later than the current start_time: {get_datetime_from_ts_millis_in_utc(cur_start_time_millis)}" ) logger.warning(warn_msg) - self.source.get_report().report_warning("skip-run", warn_msg) return True return False diff --git a/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py b/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py index 7e8860ab627f30..a2e3ba46e5b224 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py +++ b/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py @@ -3,6 +3,7 @@ from typing import Any, Dict, Generic, Optional, Type, TypeVar, cast import pydantic +from pydantic import root_validator from pydantic.fields import Field from pydantic.generics import GenericModel @@ -10,6 +11,7 @@ ConfigModel, ConfigurationError, DynamicTypedConfig, + LineageConfig, ) from datahub.configuration.source_common import DatasetSourceConfigBase from datahub.ingestion.api.common import PipelineContext @@ -100,6 +102,61 @@ class StatefulIngestionConfigBase( ) +class LineageStatefulIngestionConfig(StatefulIngestionConfigBase, LineageConfig): + store_last_lineage_extraction_timestamp: bool = Field( + default=False, + description="Enable checking last lineage extraction date in store.", + ) + + @root_validator(pre=False) + def lineage_stateful_option_validator(cls, values: Dict) -> Dict: + sti = values.get("stateful_ingestion") + if not sti or not sti.enabled: + if values.get("store_last_lineage_extraction_timestamp"): + logger.warning( + "Stateful ingestion is disabled, disabling store_last_lineage_extraction_timestamp config option as well" + ) + values["store_last_lineage_extraction_timestamp"] = False + + return values + + +class ProfilingStatefulIngestionConfig(StatefulIngestionConfigBase): + store_last_profiling_timestamps: bool = Field( + default=False, + description="Enable storing last profile timestamp in store.", + ) + + @root_validator(pre=False) + def profiling_stateful_option_validator(cls, values: Dict) -> Dict: + sti = values.get("stateful_ingestion") + if not sti or not sti.enabled: + if values.get("store_last_profiling_timestamps"): + logger.warning( + "Stateful ingestion is disabled, disabling store_last_profiling_timestamps config option as well" + ) + values["store_last_profiling_timestamps"] = False + return values + + +class UsageStatefulIngestionConfig(StatefulIngestionConfigBase): + store_last_usage_extraction_timestamp: bool = Field( + default=True, + description="Enable checking last usage timestamp in store.", + ) + + @root_validator(pre=False) + def last_usage_extraction_stateful_option_validator(cls, values: Dict) -> Dict: + sti = values.get("stateful_ingestion") + if not sti or not sti.enabled: + if values.get("store_last_usage_extraction_timestamp"): + logger.warning( + "Stateful ingestion is disabled, disabling store_last_usage_extraction_timestamp config option as well" + ) + values["store_last_usage_extraction_timestamp"] = False + return values + + @dataclass class StatefulIngestionReport(SourceReport): pass