feat(ingest): bigquery/snowflake - Store last profile date in state #6832

Merged · 10 commits · Dec 28, 2022
File: BigQuery v2 source module (datahub.ingestion.source.bigquery_v2)
@@ -62,6 +62,10 @@
from datahub.ingestion.source.bigquery_v2.lineage import BigqueryLineageExtractor
from datahub.ingestion.source.bigquery_v2.profiler import BigqueryProfiler
from datahub.ingestion.source.bigquery_v2.usage import BigQueryUsageExtractor
from datahub.ingestion.source.state.profiling_state_handler import ProfilingHandler
from datahub.ingestion.source.state.redundant_run_skip_handler import (
RedundantRunSkipHandler,
)
from datahub.ingestion.source.state.sql_common_state import (
BaseSQLAlchemyCheckpointState,
)
@@ -112,6 +116,7 @@
auto_stale_entity_removal,
auto_status_aspect,
)
from datahub.utilities.time import datetime_to_ts_millis

logger: logging.Logger = logging.getLogger(__name__)

@@ -206,7 +211,6 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config):
# For database, schema, tables, views, etc
self.lineage_extractor = BigqueryLineageExtractor(config, self.report)
self.usage_extractor = BigQueryUsageExtractor(config, self.report)
self.profiler = BigqueryProfiler(config, self.report)

# Currently caching using instance variables
# TODO - rewrite cache for readability or use out of the box solution
@@ -231,6 +235,25 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config):
cached_domains=[k for k in self.config.domain], graph=self.ctx.graph
)

self.redundant_run_skip_handler = RedundantRunSkipHandler(
source=self,
config=self.config,
pipeline_name=self.ctx.pipeline_name,
run_id=self.ctx.run_id,
)

self.profiling_state_handler: Optional[ProfilingHandler] = None
if self.config.enable_profiling_state:
self.profiling_state_handler = ProfilingHandler(
source=self,
config=self.config,
pipeline_name=self.ctx.pipeline_name,
run_id=self.ctx.run_id,
)
self.profiler = BigqueryProfiler(
config, self.report, self.profiling_state_handler
)

atexit.register(cleanup, config)

@classmethod
@@ -602,9 +625,48 @@ def _process_project(
continue

if self.config.include_table_lineage:
if (
self.config.enable_lineage_lastrun_state
and self.redundant_run_skip_handler.should_skip_this_run(
cur_start_time_millis=datetime_to_ts_millis(self.config.start_time)
)
):
# Skip this run
self.report.report_warning(
"lineage-extraction",
f"Skip this run as there was a run later than the current start time: {self.config.start_time}",
)
return

if self.config.enable_lineage_lastrun_state:
# Update the checkpoint state for this run.
self.redundant_run_skip_handler.update_state(
start_time_millis=datetime_to_ts_millis(self.config.start_time),
end_time_millis=datetime_to_ts_millis(self.config.end_time),
)

yield from self.generate_lineage(project_id)

if self.config.include_usage_statistics:
if (
self.config.enable_usage_lastrun_state
and self.redundant_run_skip_handler.should_skip_this_run(
cur_start_time_millis=datetime_to_ts_millis(self.config.start_time)
)
):
self.report.report_warning(
"usage-extraction",
f"Skip this run as there was a run later than the current start time: {self.config.start_time}",
)
return

if self.config.enable_usage_lastrun_state:
# Update the checkpoint state for this run.
self.redundant_run_skip_handler.update_state(
start_time_millis=datetime_to_ts_millis(self.config.start_time),
end_time_millis=datetime_to_ts_millis(self.config.end_time),
)

yield from self.generate_usage_statistics(project_id)
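The lineage and usage branches above share the same pattern: skip the whole step when the stored state already records a run at or after the configured start_time, otherwise persist the new time window before extracting. A minimal self-contained sketch of that pattern; the LastRunState class and the millisecond helper below are stand-ins for illustration, not the actual RedundantRunSkipHandler API, and the comparison rule is one plausible reading of the warning message above:

```python
from datetime import datetime, timezone
from typing import Optional


def datetime_to_ts_millis(dt: datetime) -> int:
    # Epoch milliseconds, mirroring the helper imported in this PR.
    return int(dt.timestamp() * 1000)


class LastRunState:
    """Toy stand-in for the checkpoint kept by a redundant-run skip handler."""

    def __init__(self) -> None:
        self.last_start_millis: Optional[int] = None
        self.last_end_millis: Optional[int] = None

    def should_skip_this_run(self, cur_start_time_millis: int) -> bool:
        # Skip when state already records a run at or after this start time
        # (one plausible rule; the real handler's comparison may differ).
        return (
            self.last_start_millis is not None
            and self.last_start_millis >= cur_start_time_millis
        )

    def update_state(self, start_time_millis: int, end_time_millis: int) -> None:
        self.last_start_millis = start_time_millis
        self.last_end_millis = end_time_millis


state = LastRunState()
start = datetime(2022, 12, 1, tzinfo=timezone.utc)
end = datetime(2022, 12, 2, tzinfo=timezone.utc)

if not state.should_skip_this_run(datetime_to_ts_millis(start)):
    state.update_state(datetime_to_ts_millis(start), datetime_to_ts_millis(end))
    # ... run lineage/usage extraction for this window ...
```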

def generate_lineage(self, project_id: str) -> Iterable[MetadataWorkUnit]:
File: BigQuery v2 config (BigQueryV2Config)
@@ -108,6 +108,36 @@ class BigQueryV2Config(BigQueryConfig, LineageConfig):
description="Use the legacy sharded table urn suffix added.",
)

enable_profiling_state: bool = Field(
default=True,
description="Enable storing last profile date in store.",
)

enable_lineage_lastrun_state: bool = Field(
default=True,
description="Enable checking last lineage date in store.",
)

enable_usage_lastrun_state: bool = Field(
default=True,
description="Enable checking last usage date in store.",
)
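Taken together with the validator below, these three flags only have any effect when stateful ingestion itself is enabled. A hypothetical config fragment showing how they relate; connection settings are omitted and the dict form is only for illustration:

```python
# Hypothetical shape of the relevant BigQuery source options in an ingestion recipe
# (only the flags added in this PR plus stateful_ingestion are shown).
bigquery_source_config = {
    "stateful_ingestion": {"enabled": True},
    "enable_profiling_state": True,
    "enable_lineage_lastrun_state": True,
    "enable_usage_lastrun_state": True,
}
```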

@root_validator(pre=False)
def stateful_option_validator(cls, values: Dict) -> Dict:
# Extra default SQLAlchemy option for better connection pooling and threading.
# https://docs.sqlalchemy.org/en/14/core/pooling.html#sqlalchemy.pool.QueuePool.params.max_overflow
sti = values.get("stateful_ingestion")
if not sti or not sti.enabled:
logger.warning(
"Stateful ingestion is disabled, disabling related config options as well"
)
values["enable_profiling_state"] = False
values["enable_lineage_lastrun_state"] = False
values["enable_usage_lastrun_state"] = False

return values

@root_validator(pre=False)
def profile_default_settings(cls, values: Dict) -> Dict:
# Extra default SQLAlchemy option for better connection pooling and threading.
File: BigQuery profiler (BigqueryProfiler)
@@ -20,6 +20,7 @@
GenericProfiler,
TableProfilerRequest,
)
from datahub.ingestion.source.state.profiling_state_handler import ProfilingHandler

logger = logging.getLogger(__name__)

@@ -34,8 +35,13 @@ class BigqueryProfiler(GenericProfiler):
config: BigQueryV2Config
report: BigQueryV2Report

def __init__(self, config: BigQueryV2Config, report: BigQueryV2Report) -> None:
super().__init__(config, report, "bigquery")
def __init__(
self,
config: BigQueryV2Config,
report: BigQueryV2Report,
state_handler: Optional[ProfilingHandler] = None,
) -> None:
super().__init__(config, report, "bigquery", state_handler)
self.config = config
self.report = report

@@ -164,17 +170,16 @@ def get_workunits(
continue

for table in tables[project][dataset]:
normalized_table_name = BigqueryTableIdentifier(
project_id=project, dataset=dataset, table=table.name
).get_table_name()
for column in table.columns:
# Profiler has issues with complex types (array, struct, geography, json), so we deny those types from profiling
# We also filter columns without data type as it means that column is part of a complex type.
if not column.data_type or any(
word in column.data_type.lower()
for word in ["array", "struct", "geography", "json"]
):
normalized_table_name = BigqueryTableIdentifier(
project_id=project, dataset=dataset, table=table.name
).get_table_name()

self.config.profile_pattern.deny.append(
f"^{normalized_table_name}.{column.field_path}$"
)
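For illustration, a complex-typed column ends up appended to profile_pattern.deny as a regex of the form below; the project, dataset, table, and field names here are made up:

```python
import re

# Hypothetical fully qualified name of a STRUCT column that profiling should skip.
normalized_table_name = "my-project.my_dataset.events"
field_path = "payload.context"

deny_regex = f"^{normalized_table_name}.{field_path}$"
print(deny_regex)  # ^my-project.my_dataset.events.payload.context$

# The deny entries are treated as regexes, so this column is excluded from profiling.
assert re.match(deny_regex, f"{normalized_table_name}.{field_path}")
```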
@@ -188,40 +193,51 @@

if len(profile_requests) == 0:
continue
table_profile_requests = cast(List[TableProfilerRequest], profile_requests)
for request, profile in self.generate_profiles(
table_profile_requests,
self.config.profiling.max_workers,
platform=self.platform,
profiler_args=self.get_profile_args(),
):
if request is None or profile is None:
continue
yield from self.generate_wu_from_profile_requests(profile_requests)

def generate_wu_from_profile_requests(
self, profile_requests: List[BigqueryProfilerRequest]
) -> Iterable[MetadataWorkUnit]:
table_profile_requests = cast(List[TableProfilerRequest], profile_requests)
for request, profile in self.generate_profiles(
table_profile_requests,
self.config.profiling.max_workers,
platform=self.platform,
profiler_args=self.get_profile_args(),
):
if request is None or profile is None:
continue

request = cast(BigqueryProfilerRequest, request)
profile.sizeInBytes = request.table.size_in_bytes
# If table is partitioned we profile only one partition (if nothing set then the last one)
# but for table level we can use the rows_count from the table metadata
# This way even though column statistics only reflects one partition data but the rows count
# shows the proper count.
if profile.partitionSpec and profile.partitionSpec.partition:
profile.rowCount = request.table.rows_count
request = cast(BigqueryProfilerRequest, request)
profile.sizeInBytes = request.table.size_in_bytes
# If table is partitioned we profile only one partition (if nothing set then the last one)
# but for table level we can use the rows_count from the table metadata
# This way even though column statistics only reflects one partition data but the rows count
# shows the proper count.
if profile.partitionSpec and profile.partitionSpec.partition:
profile.rowCount = request.table.rows_count

dataset_name = request.pretty_name
dataset_urn = make_dataset_urn_with_platform_instance(
self.platform,
dataset_name,
self.config.platform_instance,
self.config.env,
)
wu = wrap_aspect_as_workunit(
"dataset",
dataset_urn,
"datasetProfile",
profile,
dataset_name = request.pretty_name
dataset_urn = make_dataset_urn_with_platform_instance(
self.platform,
dataset_name,
self.config.platform_instance,
self.config.env,
)
# We don't add to the profiler state if we only do table level profiling as it always happens
if self.state_handler and not request.profile_table_level_only:
self.state_handler.add_to_state(
dataset_urn, int(datetime.datetime.utcnow().timestamp() * 1000)
)
self.report.report_workunit(wu)
yield wu

wu = wrap_aspect_as_workunit(
"dataset",
dataset_urn,
"datasetProfile",
profile,
)
self.report.report_workunit(wu)
yield wu
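The add_to_state call above is what actually persists the "last profile date" this PR is named after, keyed by dataset URN. A toy stand-in for how such state can be kept and later consulted; the real ProfilingHandler checkpoints through DataHub's stateful-ingestion machinery, and should_profile_again below is a hypothetical helper, not its API:

```python
import time
from typing import Dict


class ToyProfilingState:
    """Stand-in keyed by dataset URN; the real handler checkpoints via stateful ingestion."""

    def __init__(self) -> None:
        self.last_profiled_millis: Dict[str, int] = {}

    def add_to_state(self, dataset_urn: str, profile_time_millis: int) -> None:
        self.last_profiled_millis[dataset_urn] = profile_time_millis

    def should_profile_again(self, dataset_urn: str, last_altered_millis: int) -> bool:
        # Hypothetical consumer: re-profile only if the table changed after the last profile.
        last = self.last_profiled_millis.get(dataset_urn)
        return last is None or last_altered_millis > last


state = ToyProfilingState()
urn = "urn:li:dataset:(urn:li:dataPlatform:bigquery,my-project.my_dataset.events,PROD)"
state.add_to_state(urn, int(time.time() * 1000))
```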

def get_bigquery_profile_request(
self, project: str, dataset: str, table: BigqueryTable
File: Snowflake v2 config (SnowflakeV2Config)
@@ -54,11 +54,21 @@ class SnowflakeV2Config(SnowflakeConfig, SnowflakeUsageConfig):
description="Whether to populate Snowsight url for Snowflake Objects",
)

match_fully_qualified_names = bool = Field(
enable_profiling_state: bool = Field(
default=True,
description="Enable storing last profile date in store.",
)

Review comment (Collaborator, on `enable_profiling_state`): This is effectively "store new state?", right? I feel like the config name isn't super clear, but I'm not sure what would be better.

match_fully_qualified_names: bool = Field(
default=False,
description="Whether `schema_pattern` is matched against fully qualified schema name `<catalog>.<schema>`.",
)

enable_usage_lastrun_state: bool = Field(
default=True,
description="Enable checking last usage date in store.",
)

Review comment (Collaborator, on `enable_usage_lastrun_state`): does this config feel more clear?
Suggested change: `enable_usage_lastrun_state: bool = Field(` → `profiling_skip_table_if_not_changed: bool = Field(`

@validator("include_column_lineage")
def validate_include_column_lineage(cls, v, values):
if not values.get("include_table_lineage") and v:
@@ -140,3 +150,17 @@ def get_sql_alchemy_url(
return BaseSnowflakeConfig.get_sql_alchemy_url(
self, database=database, username=username, password=password, role=role
)

@root_validator(pre=False)
def stateful_option_validator(cls, values: Dict) -> Dict:
# Extra default SQLAlchemy option for better connection pooling and threading.
# https://docs.sqlalchemy.org/en/14/core/pooling.html#sqlalchemy.pool.QueuePool.params.max_overflow
sti = values.get("stateful_ingestion")
if not sti or not sti.enabled:
logger.warning(
"Stateful ingestion is disabled, disabling related config options as well"
)
values["enable_profiling_state"] = False
values["enable_usage_lastrun_state"] = False

return values

Review comment (Collaborator, on `def stateful_option_validator`): given that these options were basically copy-pasted between the two source configs, it might be worthwhile to add a mixin class for the config.

Review comment (Collaborator, on the SQLAlchemy pooling comments): These comments don't seem relevant.

Review comment (Collaborator, on the warning): let's only print the warning if (1) they actually set the config (not just the default kicking in) and (2) the current value is true and we're changing it.

Review comment (Collaborator): we may need to change it to a pre=True validator to do that, but not 100% sure.
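A rough sketch of the shared mixin the reviewer suggests, folding in the follow-up points about only warning when the user explicitly set a flag and using a pre=True validator. It covers the two flags common to both source configs; the class, validator, and helper names here are hypothetical and not part of this PR:

```python
import logging
from typing import Any, Dict

from pydantic import BaseModel, Field, root_validator

logger = logging.getLogger(__name__)


class StatefulProfilingStateConfigMixin(BaseModel):
    """Hypothetical mixin for the options duplicated across the two source configs."""

    enable_profiling_state: bool = Field(
        default=True,
        description="Enable storing last profile date in store.",
    )
    enable_usage_lastrun_state: bool = Field(
        default=True,
        description="Enable checking last usage date in store.",
    )

    @root_validator(pre=True)
    def _disable_state_options_without_stateful_ingestion(
        cls, values: Dict[str, Any]
    ) -> Dict[str, Any]:
        # pre=True sees the raw user input, so we can tell an explicit True apart
        # from the default kicking in.
        sti = values.get("stateful_ingestion") or {}
        if isinstance(sti, dict):
            stateful_enabled = bool(sti.get("enabled"))
        else:
            stateful_enabled = bool(getattr(sti, "enabled", False))
        if stateful_enabled:
            return values
        for flag in ("enable_profiling_state", "enable_usage_lastrun_state"):
            # Warn only if the user explicitly set the flag and we are changing it.
            if values.get(flag) is True:
                logger.warning(
                    f"Stateful ingestion is disabled, so {flag} will be ignored"
                )
            values[flag] = False
        return values
```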
File: Snowflake profiler (SnowflakeProfiler)
@@ -1,5 +1,6 @@
import dataclasses
import logging
from datetime import datetime
from typing import Callable, Iterable, List, Optional, cast

from snowflake.sqlalchemy import snowdialect
@@ -28,6 +29,7 @@
GenericProfiler,
TableProfilerRequest,
)
from datahub.ingestion.source.state.profiling_state_handler import ProfilingHandler

snowdialect.ischema_names["GEOGRAPHY"] = sqltypes.NullType

@@ -44,8 +46,13 @@ class SnowflakeProfiler(SnowflakeCommonMixin, GenericProfiler, SnowflakeCommonPr
config: SnowflakeV2Config
report: SnowflakeV2Report

def __init__(self, config: SnowflakeV2Config, report: SnowflakeV2Report) -> None:
super().__init__(config, report, self.platform)
def __init__(
self,
config: SnowflakeV2Config,
report: SnowflakeV2Report,
state_handler: Optional[ProfilingHandler] = None,
) -> None:
super().__init__(config, report, self.platform, state_handler)
self.config = config
self.report = report
self.logger = logger
@@ -102,6 +109,13 @@ def get_workunits(self, databases: List[SnowflakeDatabase]) -> Iterable[WorkUnit
self.config.platform_instance,
self.config.env,
)

# We don't add to the profiler state if we only do table level profiling as it always happens
if self.state_handler:
self.state_handler.add_to_state(
dataset_urn, int(datetime.utcnow().timestamp() * 1000)
)

yield self.wrap_aspect_as_workunit(
"dataset",
dataset_urn,