Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest): bigquery/snowflake - Store last profile date in state #6832

Merged
merged 10 commits into from
Dec 28, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@
from datahub.ingestion.source.bigquery_v2.lineage import BigqueryLineageExtractor
from datahub.ingestion.source.bigquery_v2.profiler import BigqueryProfiler
from datahub.ingestion.source.bigquery_v2.usage import BigQueryUsageExtractor
from datahub.ingestion.source.state.profiling_state_handler import ProfilingHandler
from datahub.ingestion.source.state.redundant_run_skip_handler import (
RedundantRunSkipHandler,
)
from datahub.ingestion.source.state.sql_common_state import (
BaseSQLAlchemyCheckpointState,
)
Expand Down Expand Up @@ -112,6 +116,7 @@
auto_stale_entity_removal,
auto_status_aspect,
)
from datahub.utilities.time import datetime_to_ts_millis

logger: logging.Logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -206,7 +211,6 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config):
# For database, schema, tables, views, etc
self.lineage_extractor = BigqueryLineageExtractor(config, self.report)
self.usage_extractor = BigQueryUsageExtractor(config, self.report)
self.profiler = BigqueryProfiler(config, self.report)

# Currently caching using instance variables
# TODO - rewrite cache for readability or use out of the box solution
Expand All @@ -231,6 +235,25 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config):
cached_domains=[k for k in self.config.domain], graph=self.ctx.graph
)

self.redundant_run_skip_handler = RedundantRunSkipHandler(
source=self,
config=self.config,
pipeline_name=self.ctx.pipeline_name,
run_id=self.ctx.run_id,
)

self.profiling_state_handler: Optional[ProfilingHandler] = None
if self.config.store_last_profiling_timestamps:
self.profiling_state_handler = ProfilingHandler(
source=self,
config=self.config,
pipeline_name=self.ctx.pipeline_name,
run_id=self.ctx.run_id,
)
self.profiler = BigqueryProfiler(
config, self.report, self.profiling_state_handler
)

atexit.register(cleanup, config)

@classmethod
Expand Down Expand Up @@ -602,9 +625,48 @@ def _process_project(
continue

if self.config.include_table_lineage:
if (
self.config.store_last_lineage_extraction_timestamp
and self.redundant_run_skip_handler.should_skip_this_run(
cur_start_time_millis=datetime_to_ts_millis(self.config.start_time)
)
):
# Skip this run
self.report.report_warning(
"lineage-extraction",
f"Skip this run as there was a run later than the current start time: {self.config.start_time}",
)
return

if self.config.store_last_lineage_extraction_timestamp:
# Update the checkpoint state for this run.
self.redundant_run_skip_handler.update_state(
start_time_millis=datetime_to_ts_millis(self.config.start_time),
end_time_millis=datetime_to_ts_millis(self.config.end_time),
)

yield from self.generate_lineage(project_id)

if self.config.include_usage_statistics:
if (
self.config.store_last_usage_extraction_timestamp
and self.redundant_run_skip_handler.should_skip_this_run(
cur_start_time_millis=datetime_to_ts_millis(self.config.start_time)
)
):
self.report.report_warning(
"usage-extraction",
f"Skip this run as there was a run later than the current start time: {self.config.start_time}",
)
return

if self.config.store_last_usage_extraction_timestamp:
# Update the checkpoint state for this run.
self.redundant_run_skip_handler.update_state(
start_time_millis=datetime_to_ts_millis(self.config.start_time),
end_time_millis=datetime_to_ts_millis(self.config.end_time),
)

yield from self.generate_usage_statistics(project_id)

def generate_lineage(self, project_id: str) -> Iterable[MetadataWorkUnit]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@

from pydantic import Field, PositiveInt, root_validator

from datahub.configuration.common import AllowDenyPattern, LineageConfig
from datahub.configuration.common import AllowDenyPattern
from datahub.ingestion.source.state.stateful_ingestion_base import (
LineageStatefulIngestionConfig,
ProfilingStatefulIngestionConfig,
UsageStatefulIngestionConfig,
)
from datahub.ingestion.source.usage.usage_common import BaseUsageConfig
from datahub.ingestion.source_config.sql.bigquery import BigQueryConfig

Expand All @@ -23,7 +28,12 @@ class BigQueryUsageConfig(BaseUsageConfig):
)


class BigQueryV2Config(BigQueryConfig, LineageConfig):
class BigQueryV2Config(
BigQueryConfig,
LineageStatefulIngestionConfig,
UsageStatefulIngestionConfig,
ProfilingStatefulIngestionConfig,
):
project_id_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description="Regex patterns for project_id to filter in ingestion.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
GenericProfiler,
TableProfilerRequest,
)
from datahub.ingestion.source.state.profiling_state_handler import ProfilingHandler

logger = logging.getLogger(__name__)

Expand All @@ -34,8 +35,13 @@ class BigqueryProfiler(GenericProfiler):
config: BigQueryV2Config
report: BigQueryV2Report

def __init__(self, config: BigQueryV2Config, report: BigQueryV2Report) -> None:
super().__init__(config, report, "bigquery")
def __init__(
self,
config: BigQueryV2Config,
report: BigQueryV2Report,
state_handler: Optional[ProfilingHandler] = None,
) -> None:
super().__init__(config, report, "bigquery", state_handler)
self.config = config
self.report = report

Expand Down Expand Up @@ -164,17 +170,16 @@ def get_workunits(
continue

for table in tables[project][dataset]:
normalized_table_name = BigqueryTableIdentifier(
project_id=project, dataset=dataset, table=table.name
).get_table_name()
for column in table.columns:
# Profiler has issues with complex types (array, struct, geography, json), so we deny those types from profiling
# We also filter columns without data type as it means that column is part of a complex type.
if not column.data_type or any(
word in column.data_type.lower()
for word in ["array", "struct", "geography", "json"]
):
normalized_table_name = BigqueryTableIdentifier(
project_id=project, dataset=dataset, table=table.name
).get_table_name()

self.config.profile_pattern.deny.append(
f"^{normalized_table_name}.{column.field_path}$"
)
Expand All @@ -188,40 +193,51 @@ def get_workunits(

if len(profile_requests) == 0:
continue
table_profile_requests = cast(List[TableProfilerRequest], profile_requests)
for request, profile in self.generate_profiles(
table_profile_requests,
self.config.profiling.max_workers,
platform=self.platform,
profiler_args=self.get_profile_args(),
):
if request is None or profile is None:
continue
yield from self.generate_wu_from_profile_requests(profile_requests)

def generate_wu_from_profile_requests(
self, profile_requests: List[BigqueryProfilerRequest]
) -> Iterable[MetadataWorkUnit]:
table_profile_requests = cast(List[TableProfilerRequest], profile_requests)
for request, profile in self.generate_profiles(
table_profile_requests,
self.config.profiling.max_workers,
platform=self.platform,
profiler_args=self.get_profile_args(),
):
if request is None or profile is None:
continue

request = cast(BigqueryProfilerRequest, request)
profile.sizeInBytes = request.table.size_in_bytes
# If table is partitioned we profile only one partition (if nothing set then the last one)
# but for table level we can use the rows_count from the table metadata
# This way even though column statistics only reflects one partition data but the rows count
# shows the proper count.
if profile.partitionSpec and profile.partitionSpec.partition:
profile.rowCount = request.table.rows_count
request = cast(BigqueryProfilerRequest, request)
profile.sizeInBytes = request.table.size_in_bytes
# If table is partitioned we profile only one partition (if nothing set then the last one)
# but for table level we can use the rows_count from the table metadata
# This way even though column statistics only reflects one partition data but the rows count
# shows the proper count.
if profile.partitionSpec and profile.partitionSpec.partition:
profile.rowCount = request.table.rows_count

dataset_name = request.pretty_name
dataset_urn = make_dataset_urn_with_platform_instance(
self.platform,
dataset_name,
self.config.platform_instance,
self.config.env,
)
wu = wrap_aspect_as_workunit(
"dataset",
dataset_urn,
"datasetProfile",
profile,
dataset_name = request.pretty_name
dataset_urn = make_dataset_urn_with_platform_instance(
self.platform,
dataset_name,
self.config.platform_instance,
self.config.env,
)
# We don't add to the profiler state if we only do table level profiling as it always happens
if self.state_handler and not request.profile_table_level_only:
self.state_handler.add_to_state(
dataset_urn, int(datetime.datetime.utcnow().timestamp() * 1000)
)
self.report.report_workunit(wu)
yield wu

wu = wrap_aspect_as_workunit(
"dataset",
dataset_urn,
"datasetProfile",
profile,
)
self.report.report_workunit(wu)
yield wu

def get_bigquery_profile_request(
self, project: str, dataset: str, table: BigqueryTable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@

from datahub.configuration.common import AllowDenyPattern
from datahub.ingestion.glossary.classifier import ClassificationConfig
from datahub.ingestion.source.state.stateful_ingestion_base import (
ProfilingStatefulIngestionConfig,
UsageStatefulIngestionConfig,
)
from datahub.ingestion.source_config.sql.snowflake import (
BaseSnowflakeConfig,
SnowflakeConfig,
Expand All @@ -15,7 +19,12 @@
logger = logging.Logger(__name__)


class SnowflakeV2Config(SnowflakeConfig, SnowflakeUsageConfig):
class SnowflakeV2Config(
SnowflakeConfig,
SnowflakeUsageConfig,
UsageStatefulIngestionConfig,
ProfilingStatefulIngestionConfig,
):
convert_urns_to_lowercase: bool = Field(
default=True,
)
Expand Down Expand Up @@ -54,7 +63,7 @@ class SnowflakeV2Config(SnowflakeConfig, SnowflakeUsageConfig):
description="Whether to populate Snowsight url for Snowflake Objects",
)

match_fully_qualified_names = bool = Field(
match_fully_qualified_names: bool = Field(
default=False,
description="Whether `schema_pattern` is matched against fully qualified schema name `<catalog>.<schema>`.",
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import dataclasses
import logging
from datetime import datetime
from typing import Callable, Iterable, List, Optional, cast

from snowflake.sqlalchemy import snowdialect
Expand Down Expand Up @@ -28,6 +29,7 @@
GenericProfiler,
TableProfilerRequest,
)
from datahub.ingestion.source.state.profiling_state_handler import ProfilingHandler

snowdialect.ischema_names["GEOGRAPHY"] = sqltypes.NullType

Expand All @@ -44,8 +46,13 @@ class SnowflakeProfiler(SnowflakeCommonMixin, GenericProfiler, SnowflakeCommonPr
config: SnowflakeV2Config
report: SnowflakeV2Report

def __init__(self, config: SnowflakeV2Config, report: SnowflakeV2Report) -> None:
super().__init__(config, report, self.platform)
def __init__(
self,
config: SnowflakeV2Config,
report: SnowflakeV2Report,
state_handler: Optional[ProfilingHandler] = None,
) -> None:
super().__init__(config, report, self.platform, state_handler)
self.config = config
self.report = report
self.logger = logger
Expand Down Expand Up @@ -102,6 +109,13 @@ def get_workunits(self, databases: List[SnowflakeDatabase]) -> Iterable[WorkUnit
self.config.platform_instance,
self.config.env,
)

# We don't add to the profiler state if we only do table level profiling as it always happens
if self.state_handler:
self.state_handler.add_to_state(
dataset_urn, int(datetime.utcnow().timestamp() * 1000)
)

yield self.wrap_aspect_as_workunit(
"dataset",
dataset_urn,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
SnowflakeQueryMixin,
)
from datahub.ingestion.source.sql.sql_common import SqlContainerSubTypes
from datahub.ingestion.source.state.profiling_state_handler import ProfilingHandler
from datahub.ingestion.source.state.redundant_run_skip_handler import (
RedundantRunSkipHandler,
)
Expand Down Expand Up @@ -225,9 +226,20 @@ def __init__(self, ctx: PipelineContext, config: SnowflakeV2Config):
# For usage stats
self.usage_extractor = SnowflakeUsageExtractor(config, self.report)

self.profiling_state_handler: Optional[ProfilingHandler] = None
if self.config.store_last_profiling_timestamps:
self.profiling_state_handler = ProfilingHandler(
source=self,
config=self.config,
pipeline_name=self.ctx.pipeline_name,
run_id=self.ctx.run_id,
)

if config.profiling.enabled:
# For profiling
self.profiler = SnowflakeProfiler(config, self.report)
self.profiler = SnowflakeProfiler(
config, self.report, self.profiling_state_handler
)

if self.is_classification_enabled():
self.classifiers = self.get_classifiers()
Expand Down Expand Up @@ -455,17 +467,25 @@ def get_workunits(self) -> Iterable[WorkUnit]:
yield from self.profiler.get_workunits(databases)

if self.config.include_usage_stats or self.config.include_operational_stats:
if self.redundant_run_skip_handler.should_skip_this_run(
cur_start_time_millis=datetime_to_ts_millis(self.config.start_time)
if (
self.config.store_last_usage_extraction_timestamp
and self.redundant_run_skip_handler.should_skip_this_run(
cur_start_time_millis=datetime_to_ts_millis(self.config.start_time)
)
):
# Skip this run
self.report.report_warning(
"usage-extraction",
f"Skip this run as there was a run later than the current start time: {self.config.start_time}",
)
return

# Update the checkpoint state for this run.
self.redundant_run_skip_handler.update_state(
start_time_millis=datetime_to_ts_millis(self.config.start_time),
end_time_millis=datetime_to_ts_millis(self.config.end_time),
)
if self.config.store_last_usage_extraction_timestamp:
# Update the checkpoint state for this run.
self.redundant_run_skip_handler.update_state(
start_time_millis=datetime_to_ts_millis(self.config.start_time),
end_time_millis=datetime_to_ts_millis(self.config.end_time),
)

discovered_datasets: List[str] = [
self.get_dataset_identifier(table.name, schema.name, db.name)
Expand Down
Loading