Skip to content

Commit

Permalink
feat(ingest): add snowflake-queries source (#10835)
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored and yoonhyejin committed Jul 16, 2024
1 parent ee1343f commit f83d4b7
Show file tree
Hide file tree
Showing 39 changed files with 2,284 additions and 714 deletions.
2 changes: 2 additions & 0 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@
"salesforce": {"simple-salesforce"},
"snowflake": snowflake_common | usage_common | sqlglot_lib,
"snowflake-summary": snowflake_common | usage_common | sqlglot_lib,
"snowflake-queries": snowflake_common | usage_common | sqlglot_lib,
"sqlalchemy": sql_common,
"sql-queries": usage_common | sqlglot_lib,
"slack": slack,
Expand Down Expand Up @@ -662,6 +663,7 @@
"slack = datahub.ingestion.source.slack.slack:SlackSource",
"snowflake = datahub.ingestion.source.snowflake.snowflake_v2:SnowflakeV2Source",
"snowflake-summary = datahub.ingestion.source.snowflake.snowflake_summary:SnowflakeSummarySource",
"snowflake-queries = datahub.ingestion.source.snowflake.snowflake_queries:SnowflakeQueriesSource",
"superset = datahub.ingestion.source.superset:SupersetSource",
"tableau = datahub.ingestion.source.tableau:TableauSource",
"openapi = datahub.ingestion.source.openapi:OpenApiSource",
Expand Down
5 changes: 5 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/api/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@

logger = logging.getLogger(__name__)

_MAX_CONTEXT_STRING_LENGTH = 300


class SourceCapability(Enum):
PLATFORM_INSTANCE = "Platform Instance"
Expand Down Expand Up @@ -112,6 +114,9 @@ def report_log(
log_key = f"{title}-{message}"
entries = self._entries[level]

if context and len(context) > _MAX_CONTEXT_STRING_LENGTH:
context = f"{context[:_MAX_CONTEXT_STRING_LENGTH]} ..."

log_content = f"{message} => {context}" if context else message
if exc:
log_content += f"{log_content}: {exc}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ def from_string_name(cls, ref: str) -> "BigQueryTableRef":
def from_urn(cls, urn: str) -> "BigQueryTableRef":
"""Raises: ValueError if urn is not a valid BigQuery table URN."""
dataset_urn = DatasetUrn.create_from_string(urn)
split = dataset_urn.get_dataset_name().rsplit(".", 3)
split = dataset_urn.name.rsplit(".", 3)
if len(split) == 3:
project, dataset, table = split
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,16 @@
from datahub.ingestion.source.bigquery_v2.bigquery_config import (
BigQueryConnectionConfig,
)
from datahub.ingestion.source.snowflake.snowflake_connection import (
SnowflakeConnectionConfig,
)
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalSourceReport,
StatefulStaleMetadataRemovalConfig,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionConfigBase,
)
from datahub.ingestion.source_config.sql.snowflake import BaseSnowflakeConfig
from datahub.utilities.perf_timer import PerfTimer

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -66,7 +68,7 @@ class Constant:
}


class SnowflakeDestinationConfig(BaseSnowflakeConfig):
class SnowflakeDestinationConfig(SnowflakeConnectionConfig):
database: str = Field(description="The fivetran connector log database.")
log_schema: str = Field(description="The fivetran connector log schema.")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,15 @@ def build(
for schema, tables in schemas.items()
for table in tables
}
self.aggregator.is_temp_table = lambda urn: urn not in self.known_urns
self.aggregator._is_temp_table = (
lambda name: DatasetUrn.create_from_ids(
self.platform,
name,
env=self.config.env,
platform_instance=self.config.platform_instance,
).urn()
not in self.known_urns
)

# Handle all the temp tables up front.
if self.config.resolve_temp_table_in_lineage:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
from datetime import datetime
from typing import Callable, Iterable, List, Optional
from typing import Iterable, List, Optional

from pydantic import BaseModel

Expand All @@ -11,14 +11,14 @@
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.snowflake.snowflake_config import SnowflakeV2Config
from datahub.ingestion.source.snowflake.snowflake_config import (
SnowflakeIdentifierConfig,
SnowflakeV2Config,
)
from datahub.ingestion.source.snowflake.snowflake_connection import SnowflakeConnection
from datahub.ingestion.source.snowflake.snowflake_query import SnowflakeQuery
from datahub.ingestion.source.snowflake.snowflake_report import SnowflakeV2Report
from datahub.ingestion.source.snowflake.snowflake_utils import (
SnowflakeCommonMixin,
SnowflakeConnectionMixin,
SnowflakeQueryMixin,
)
from datahub.ingestion.source.snowflake.snowflake_utils import SnowflakeIdentifierMixin
from datahub.metadata.com.linkedin.pegasus2avro.assertion import (
AssertionResult,
AssertionResultType,
Expand All @@ -40,30 +40,27 @@ class DataQualityMonitoringResult(BaseModel):
VALUE: int


class SnowflakeAssertionsHandler(
SnowflakeCommonMixin, SnowflakeQueryMixin, SnowflakeConnectionMixin
):
class SnowflakeAssertionsHandler(SnowflakeIdentifierMixin):
def __init__(
self,
config: SnowflakeV2Config,
report: SnowflakeV2Report,
dataset_urn_builder: Callable[[str], str],
connection: SnowflakeConnection,
) -> None:
self.config = config
self.report = report
self.logger = logger
self.dataset_urn_builder = dataset_urn_builder
self.connection = None
self.connection = connection
self._urns_processed: List[str] = []

@property
def identifier_config(self) -> SnowflakeIdentifierConfig:
return self.config

def get_assertion_workunits(
self, discovered_datasets: List[str]
) -> Iterable[MetadataWorkUnit]:
self.connection = self.create_connection()
if self.connection is None:
return

cur = self.query(
cur = self.connection.query(
SnowflakeQuery.dmf_assertion_results(
datetime_to_ts_millis(self.config.start_time),
datetime_to_ts_millis(self.config.end_time),
Expand Down Expand Up @@ -110,7 +107,7 @@ def _process_result_row(
aspect=AssertionRunEvent(
timestampMillis=datetime_to_ts_millis(result.MEASUREMENT_TIME),
runId=result.MEASUREMENT_TIME.strftime("%Y-%m-%dT%H:%M:%SZ"),
asserteeUrn=self.dataset_urn_builder(assertee),
asserteeUrn=self.gen_dataset_urn(assertee),
status=AssertionRunStatus.COMPLETE,
assertionUrn=make_assertion_urn(assertion_guid),
result=AssertionResult(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,31 @@
from enum import Enum
from typing import Dict, List, Optional, Set, cast

import pydantic
from pydantic import Field, SecretStr, root_validator, validator

from datahub.configuration.common import AllowDenyPattern, ConfigModel
from datahub.configuration.pattern_utils import UUID_REGEX
from datahub.configuration.source_common import (
EnvConfigMixin,
LowerCaseDatasetUrnConfigMixin,
PlatformInstanceConfigMixin,
)
from datahub.configuration.time_window_config import BaseTimeWindowConfig
from datahub.configuration.validate_field_removal import pydantic_removed_field
from datahub.configuration.validate_field_rename import pydantic_renamed_field
from datahub.ingestion.glossary.classification_mixin import (
ClassificationSourceConfigMixin,
)
from datahub.ingestion.source.snowflake.snowflake_connection import (
SnowflakeConnectionConfig,
)
from datahub.ingestion.source.sql.sql_config import SQLCommonConfig, SQLFilterConfig
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulLineageConfigMixin,
StatefulProfilingConfigMixin,
StatefulUsageConfigMixin,
)
from datahub.ingestion.source_config.sql.snowflake import (
BaseSnowflakeConfig,
SnowflakeConfig,
)
from datahub.ingestion.source_config.usage.snowflake_usage import SnowflakeUsageConfig
from datahub.utilities.global_warning_util import add_global_warning

Expand All @@ -32,11 +39,12 @@
#
# DBT incremental models create temporary tables ending with __dbt_tmp
# Ref - https://discourse.getdbt.com/t/handling-bigquery-incremental-dbt-tmp-tables/7540
DEFAULT_TABLES_DENY_LIST = [
DEFAULT_TEMP_TABLES_PATTERNS = [
r".*\.FIVETRAN_.*_STAGING\..*", # fivetran
r".*__DBT_TMP$", # dbt
rf".*\.SEGMENT_{UUID_REGEX}", # segment
rf".*\.STAGING_.*_{UUID_REGEX}", # stitch
r".*\.(GE_TMP_|GE_TEMP_|GX_TEMP_)[0-9A-F]{8}", # great expectations
]


Expand Down Expand Up @@ -73,6 +81,93 @@ def source_database(self) -> DatabaseId:
return DatabaseId(self.database, self.platform_instance)


class SnowflakeFilterConfig(SQLFilterConfig):
    # Allow/deny filters for Snowflake databases and schemas.
    # table_pattern and view_pattern are inherited from SQLFilterConfig.

    # By default, the deny list excludes UTIL_DB, SNOWFLAKE and
    # SNOWFLAKE_SAMPLE_DATA.
    database_pattern: AllowDenyPattern = Field(
        AllowDenyPattern(
            deny=[r"^UTIL_DB$", r"^SNOWFLAKE$", r"^SNOWFLAKE_SAMPLE_DATA$"],
        ),
        description="Regex patterns for databases to filter in ingestion.",
    )

    schema_pattern: AllowDenyPattern = Field(
        default=AllowDenyPattern.allow_all(),
        description="Regex patterns for schemas to filter in ingestion. Will match against the full `database.schema` name if `match_fully_qualified_names` is enabled.",
    )

    match_fully_qualified_names: bool = Field(
        default=False,
        description="Whether `schema_pattern` is matched against fully qualified schema name `<catalog>.<schema>`.",
    )

    @root_validator(pre=False, skip_on_failure=True)
    def validate_legacy_schema_pattern(cls, values: Dict) -> Dict:
        """Warn about legacy schema matching and always deny INFORMATION_SCHEMA.

        Args:
            values: The already-validated field values of the model.

        Returns:
            The same ``values`` dict. The ``schema_pattern`` it holds may have
            had an INFORMATION_SCHEMA entry appended to its deny list in place.
        """
        schema_pattern: Optional[AllowDenyPattern] = values.get("schema_pattern")
        match_fully_qualified_names = values.get("match_fully_qualified_names")

        # Only warn when a custom schema_pattern is combined with the legacy
        # (non fully qualified) matching mode.
        uses_legacy_matching = (
            match_fully_qualified_names is not None
            and not match_fully_qualified_names
        )
        if (
            schema_pattern is not None
            and schema_pattern != AllowDenyPattern.allow_all()
            and uses_legacy_matching
        ):
            # Each fragment ends with a space so the concatenated sentences
            # do not run together in the emitted log line.
            logger.warning(
                "Please update `schema_pattern` to match against fully qualified schema name `<catalog_name>.<schema_name>` and set config `match_fully_qualified_names : True`. "
                "Current default `match_fully_qualified_names: False` is only to maintain backward compatibility. "
                "The config option `match_fully_qualified_names` will be deprecated in future and the default behavior will assume `match_fully_qualified_names: True`."
            )

        # Always exclude reporting metadata for INFORMATION_SCHEMA schema
        if schema_pattern:
            information_schema_deny = r".*INFORMATION_SCHEMA$"
            # The deny list is mutated in place; skip the append when the entry
            # is already present so repeated validation of the same pattern
            # object does not accumulate duplicates.
            if information_schema_deny not in schema_pattern.deny:
                logger.debug("Adding deny for INFORMATION_SCHEMA to schema_pattern.")
                schema_pattern.deny.append(information_schema_deny)

        return values


class SnowflakeIdentifierConfig(
    PlatformInstanceConfigMixin, EnvConfigMixin, LowerCaseDatasetUrnConfigMixin
):
    # Redeclared here only to override the inherited default: this config
    # turns URN lowercasing on unless the user sets it otherwise.
    convert_urns_to_lowercase: bool = Field(default=True)


# TODO: SnowflakeConfig is unused except for this inheritance. We should collapse the config inheritance hierarchy.
class SnowflakeConfig(
    SnowflakeIdentifierConfig,
    SnowflakeFilterConfig,
    # SnowflakeFilterConfig must come before (higher precedence) the SQLCommon config, so that the documentation overrides are applied.
    SnowflakeConnectionConfig,
    BaseTimeWindowConfig,
    SQLCommonConfig,
):
    # Combines identifier, filter, connection, time-window and common SQL
    # settings, and adds the lineage toggles declared below.

    include_table_lineage: bool = Field(
        default=True,
        description="If enabled, populates the snowflake table-to-table and s3-to-snowflake table lineage. Requires appropriate grants given to the role and Snowflake Enterprise Edition or above.",
    )
    include_view_lineage: bool = Field(
        default=True,
        description="If enabled, populates the snowflake view->table and table->view lineages. Requires appropriate grants given to the role, and include_table_lineage to be True. view->table lineage requires Snowflake Enterprise Edition or above.",
    )

    ignore_start_time_lineage: bool = False
    upstream_lineage_in_report: bool = False

    @root_validator(skip_on_failure=True)
    def validate_include_view_lineage(cls, values):
        """Reject configs that request view lineage with table lineage disabled.

        Raises:
            ValueError: if ``include_table_lineage`` is present and falsy while
                ``include_view_lineage`` is truthy.
        """
        # A missing include_table_lineage key is not treated as "disabled".
        table_lineage_disabled = (
            "include_table_lineage" in values
            and not values.get("include_table_lineage")
        )
        if table_lineage_disabled and values.get("include_view_lineage"):
            raise ValueError(
                "include_table_lineage must be True for include_view_lineage to be set."
            )
        return values


class SnowflakeV2Config(
SnowflakeConfig,
SnowflakeUsageConfig,
Expand All @@ -81,10 +176,6 @@ class SnowflakeV2Config(
StatefulProfilingConfigMixin,
ClassificationSourceConfigMixin,
):
convert_urns_to_lowercase: bool = Field(
default=True,
)

include_usage_stats: bool = Field(
default=True,
description="If enabled, populates the snowflake usage statistics. Requires appropriate grants given to the role.",
Expand Down Expand Up @@ -133,11 +224,6 @@ class SnowflakeV2Config(
description="Whether to populate Snowsight url for Snowflake Objects",
)

match_fully_qualified_names: bool = Field(
default=False,
description="Whether `schema_pattern` is matched against fully qualified schema name `<catalog>.<schema>`.",
)

_use_legacy_lineage_method_removed = pydantic_removed_field(
"use_legacy_lineage_method"
)
Expand All @@ -154,7 +240,7 @@ class SnowflakeV2Config(

# This is required since the access_history table does not capture whether a table was a temporary table.
temporary_tables_pattern: List[str] = Field(
default=DEFAULT_TABLES_DENY_LIST,
default=DEFAULT_TEMP_TABLES_PATTERNS,
description="[Advanced] Regex patterns for temporary tables to filter in lineage ingestion. Specify regex to "
"match the entire table name in database.schema.table format. Defaults are to set in such a way "
"to ignore the temporary staging tables created by known ETL tools.",
Expand Down Expand Up @@ -210,27 +296,6 @@ def validate_unsupported_configs(cls, values: Dict) -> Dict:
"include_read_operational_stats is not supported. Set `include_read_operational_stats` to False.",
)

match_fully_qualified_names = values.get("match_fully_qualified_names")

schema_pattern: Optional[AllowDenyPattern] = values.get("schema_pattern")

if (
schema_pattern is not None
and schema_pattern != AllowDenyPattern.allow_all()
and match_fully_qualified_names is not None
and not match_fully_qualified_names
):
logger.warning(
"Please update `schema_pattern` to match against fully qualified schema name `<catalog_name>.<schema_name>` and set config `match_fully_qualified_names : True`."
"Current default `match_fully_qualified_names: False` is only to maintain backward compatibility. "
"The config option `match_fully_qualified_names` will be deprecated in future and the default behavior will assume `match_fully_qualified_names: True`."
)

# Always exclude reporting metadata for INFORMATION_SCHEMA schema
if schema_pattern is not None and schema_pattern:
logger.debug("Adding deny for INFORMATION_SCHEMA to schema_pattern.")
cast(AllowDenyPattern, schema_pattern).deny.append(r".*INFORMATION_SCHEMA$")

include_technical_schema = values.get("include_technical_schema")
include_profiles = (
values.get("profiling") is not None and values["profiling"].enabled
Expand Down Expand Up @@ -259,7 +324,7 @@ def get_sql_alchemy_url(
password: Optional[SecretStr] = None,
role: Optional[str] = None,
) -> str:
return BaseSnowflakeConfig.get_sql_alchemy_url(
return SnowflakeConnectionConfig.get_sql_alchemy_url(
self, database=database, username=username, password=password, role=role
)

Expand Down
Loading

0 comments on commit f83d4b7

Please sign in to comment.