feat(ingest): add snowflake-queries source #10835

Merged · 35 commits merged into master from snowflake-queries-oss · Jul 12, 2024

Changes from all commits · 35 commits
e558177
add preparsed_query abstraction
hsheth2 Jul 2, 2024
ff937f7
add query stats + fixes
hsheth2 Jul 2, 2024
8bed4dc
emit columns in query subjects
hsheth2 Jul 3, 2024
5ad2963
add setup.py
hsheth2 Jul 3, 2024
bc625dc
fix some lint issues
hsheth2 Jul 3, 2024
a8b1c7b
add a SnowflakeConnection wrapper class
hsheth2 Jul 4, 2024
890d63f
remove SnowflakeLoggingProtocol
hsheth2 Jul 4, 2024
ae86094
make snowflake_connection have config info + stop using source_config…
hsheth2 Jul 4, 2024
8827d10
tweak type annotations
hsheth2 Jul 4, 2024
3d21110
add configurability
hsheth2 Jul 8, 2024
b210399
Merge branch 'master' into snowflake-queries-oss
hsheth2 Jul 8, 2024
2205961
improve domain filtering
hsheth2 Jul 8, 2024
4edfce2
add parser exception handling
hsheth2 Jul 8, 2024
8b1eaf1
add additional lines
hsheth2 Jul 9, 2024
605279a
refactor snowsight url generation, create SnowsightUrlBuilder
hsheth2 Jul 9, 2024
de8f108
add schema fields to query subjects
hsheth2 Jul 9, 2024
0de982c
fix unexpected queries in tests
hsheth2 Jul 10, 2024
287d2b6
refactor snowflake configs
hsheth2 Jul 9, 2024
83ba873
tweak ordering of classes
hsheth2 Jul 10, 2024
1794feb
more config refactoring
hsheth2 Jul 10, 2024
c52cf05
more refactoring
hsheth2 Jul 10, 2024
a15f966
add gen urn to identifier mixin
hsheth2 Jul 10, 2024
ca5c0bc
add SnowflakeQueriesExtractor interface
hsheth2 Jul 10, 2024
f0b8e79
improve warnings
hsheth2 Jul 10, 2024
a76e015
add filters support
hsheth2 Jul 10, 2024
b789e3f
add schema fields to aggregator goldens
hsheth2 Jul 10, 2024
69e782a
fix test mock
hsheth2 Jul 11, 2024
0533c83
fix table filtering logic
hsheth2 Jul 11, 2024
c8c9ac5
fix filtering logic
hsheth2 Jul 11, 2024
4c4fb22
fix dup subjects for merge statements
hsheth2 Jul 11, 2024
2e31235
fix lint
hsheth2 Jul 11, 2024
a23f86c
fix(build): upgrade vercel builds to Node 20.x
hsheth2 Jul 11, 2024
9d43343
fix import
hsheth2 Jul 11, 2024
8a1f98d
Merge branch 'master' into snowflake-queries-oss
hsheth2 Jul 11, 2024
95d1ea2
Merge branch 'master' into snowflake-queries-oss
hsheth2 Jul 11, 2024
2 changes: 2 additions & 0 deletions metadata-ingestion/setup.py
@@ -410,6 +410,7 @@
"salesforce": {"simple-salesforce"},
"snowflake": snowflake_common | usage_common | sqlglot_lib,
"snowflake-summary": snowflake_common | usage_common | sqlglot_lib,
"snowflake-queries": snowflake_common | usage_common | sqlglot_lib,
"sqlalchemy": sql_common,
"sql-queries": usage_common | sqlglot_lib,
"slack": slack,
@@ -662,6 +663,7 @@
"slack = datahub.ingestion.source.slack.slack:SlackSource",
"snowflake = datahub.ingestion.source.snowflake.snowflake_v2:SnowflakeV2Source",
"snowflake-summary = datahub.ingestion.source.snowflake.snowflake_summary:SnowflakeSummarySource",
"snowflake-queries = datahub.ingestion.source.snowflake.snowflake_queries:SnowflakeQueriesSource",
"superset = datahub.ingestion.source.superset:SupersetSource",
"tableau = datahub.ingestion.source.tableau:TableauSource",
"openapi = datahub.ingestion.source.openapi:OpenApiSource",
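Note: together, these two hunks register the new source under the snowflake-queries plugin name, with the same dependency set as the existing snowflake sources. A minimal sketch of exercising it through the Pipeline API; the config keys are assumptions based on the connection config refactored in this PR, and all values are placeholders:

from datahub.ingestion.run.pipeline import Pipeline

# Run the new source against a console sink. The keys under "config" are
# assumptions (account_id/username/password/warehouse), not verified docs.
pipeline = Pipeline.create(
    {
        "source": {
            "type": "snowflake-queries",
            "config": {
                "account_id": "my_account",
                "username": "datahub_reader",
                "password": "${SNOWFLAKE_PASSWORD}",
                "warehouse": "COMPUTE_WH",
            },
        },
        "sink": {"type": "console"},
    }
)
pipeline.run()
pipeline.raise_from_status()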
5 changes: 5 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/api/source.py
@@ -45,6 +45,8 @@

logger = logging.getLogger(__name__)

_MAX_CONTEXT_STRING_LENGTH = 300


class SourceCapability(Enum):
PLATFORM_INSTANCE = "Platform Instance"
@@ -112,6 +114,9 @@ def report_log(
log_key = f"{title}-{message}"
entries = self._entries[level]

if context and len(context) > _MAX_CONTEXT_STRING_LENGTH:
context = f"{context[:_MAX_CONTEXT_STRING_LENGTH]} ..."

log_content = f"{message} => {context}" if context else message
if exc:
log_content += f": {exc}"
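Note: the new guard caps the context string attached to structured log entries at 300 characters. A standalone sketch of the clipping behavior:

_MAX_CONTEXT_STRING_LENGTH = 300

def truncate_context(context: str) -> str:
    # Mirrors the report_log guard: clip oversized context strings and
    # flag the truncation with a trailing " ...".
    if len(context) > _MAX_CONTEXT_STRING_LENGTH:
        return f"{context[:_MAX_CONTEXT_STRING_LENGTH]} ..."
    return context

print(len(truncate_context("x" * 400)))  # 304: 300 kept chars plus " ..."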
@@ -192,7 +192,7 @@ def from_string_name(cls, ref: str) -> "BigQueryTableRef":
def from_urn(cls, urn: str) -> "BigQueryTableRef":
"""Raises: ValueError if urn is not a valid BigQuery table URN."""
dataset_urn = DatasetUrn.create_from_string(urn)
split = dataset_urn.get_dataset_name().rsplit(".", 3)
split = dataset_urn.name.rsplit(".", 3)
if len(split) == 3:
project, dataset, table = split
else:
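Note: this hunk swaps the deprecated get_dataset_name() accessor for the name property; the parsing logic is unchanged. A quick sketch, assuming the classic import path:

from datahub.utilities.urns.dataset_urn import DatasetUrn  # path is an assumption

urn = DatasetUrn.create_from_string(
    "urn:li:dataset:(urn:li:dataPlatform:bigquery,my-project.my_dataset.my_table,PROD)"
)
# .name yields the dataset name component of the URN.
project, dataset, table = urn.name.rsplit(".", 3)
print(project, dataset, table)  # my-project my_dataset my_table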
@@ -13,14 +13,16 @@
from datahub.ingestion.source.bigquery_v2.bigquery_config import (
BigQueryConnectionConfig,
)
from datahub.ingestion.source.snowflake.snowflake_connection import (
SnowflakeConnectionConfig,
)
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalSourceReport,
StatefulStaleMetadataRemovalConfig,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionConfigBase,
)
from datahub.ingestion.source_config.sql.snowflake import BaseSnowflakeConfig
from datahub.utilities.perf_timer import PerfTimer

logger = logging.getLogger(__name__)
@@ -66,7 +68,7 @@ class Constant:
}


class SnowflakeDestinationConfig(BaseSnowflakeConfig):
class SnowflakeDestinationConfig(SnowflakeConnectionConfig):
database: str = Field(description="The fivetran connector log database.")
log_schema: str = Field(description="The fivetran connector log schema.")

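Note: this is a pure rename at the use site: the base class moves from source_config.sql.snowflake.BaseSnowflakeConfig to the new snowflake_connection.SnowflakeConnectionConfig. A hedged construction sketch (import path and required-field set are assumptions; all values are placeholders):

from datahub.ingestion.source.fivetran.config import SnowflakeDestinationConfig

dest = SnowflakeDestinationConfig.parse_obj(
    {
        "account_id": "my_account",
        "username": "fivetran_reader",
        "password": "dummy-password",
        "database": "FIVETRAN_LOG_DB",
        "log_schema": "FIVETRAN_LOG",
    }
)
print(dest.database, dest.log_schema)  # FIVETRAN_LOG_DB FIVETRAN_LOG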
@@ -105,7 +105,15 @@ def build(
for schema, tables in schemas.items()
for table in tables
}
self.aggregator.is_temp_table = lambda urn: urn not in self.known_urns
self.aggregator._is_temp_table = (
lambda name: DatasetUrn.create_from_ids(
self.platform,
name,
env=self.config.env,
platform_instance=self.config.platform_instance,
).urn()
not in self.known_urns
)
Comment on lines +108 to +116
Contributor

Improve readability by using a named function for the lambda: a named function improves readability and maintainability.

Suggested change:

- self.aggregator._is_temp_table = (
-     lambda name: DatasetUrn.create_from_ids(
-         self.platform,
-         name,
-         env=self.config.env,
-         platform_instance=self.config.platform_instance,
-     ).urn()
-     not in self.known_urns
- )
+ def is_temp_table(name: str) -> bool:
+     return DatasetUrn.create_from_ids(
+         self.platform,
+         name,
+         env=self.config.env,
+         platform_instance=self.config.platform_instance,
+     ).urn() not in self.known_urns
+ self.aggregator._is_temp_table = is_temp_table

# Handle all the temp tables up front.
if self.config.resolve_temp_table_in_lineage:
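Note: the semantic change in this hunk is that the aggregator's temp-table hook now receives a raw table name rather than a URN, so the builder must construct the URN itself before checking membership in known_urns. A self-contained sketch of that logic, with a hypothetical platform and table names:

from datahub.utilities.urns.dataset_urn import DatasetUrn

known_urns = {
    "urn:li:dataset:(urn:li:dataPlatform:redshift,db.schema.real_table,PROD)"
}

def is_temp_table(name: str) -> bool:
    # Anything not seen in the schema scan is treated as temporary.
    return (
        DatasetUrn.create_from_ids(
            "redshift",  # hypothetical; the real builder passes self.platform
            name,
            env="PROD",
            platform_instance=None,
        ).urn()
        not in known_urns
    )

print(is_temp_table("db.schema.stage_tmp"))   # True
print(is_temp_table("db.schema.real_table"))  # False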
@@ -1,6 +1,6 @@
import logging
from datetime import datetime
from typing import Callable, Iterable, List, Optional
from typing import Iterable, List, Optional

from pydantic import BaseModel

@@ -11,14 +11,14 @@
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.snowflake.snowflake_config import SnowflakeV2Config
from datahub.ingestion.source.snowflake.snowflake_config import (
SnowflakeIdentifierConfig,
SnowflakeV2Config,
)
from datahub.ingestion.source.snowflake.snowflake_connection import SnowflakeConnection
from datahub.ingestion.source.snowflake.snowflake_query import SnowflakeQuery
from datahub.ingestion.source.snowflake.snowflake_report import SnowflakeV2Report
from datahub.ingestion.source.snowflake.snowflake_utils import (
SnowflakeCommonMixin,
SnowflakeConnectionMixin,
SnowflakeQueryMixin,
)
from datahub.ingestion.source.snowflake.snowflake_utils import SnowflakeIdentifierMixin
from datahub.metadata.com.linkedin.pegasus2avro.assertion import (
AssertionResult,
AssertionResultType,
@@ -40,30 +40,27 @@ class DataQualityMonitoringResult(BaseModel):
VALUE: int


class SnowflakeAssertionsHandler(
SnowflakeCommonMixin, SnowflakeQueryMixin, SnowflakeConnectionMixin
):
class SnowflakeAssertionsHandler(SnowflakeIdentifierMixin):
def __init__(
self,
config: SnowflakeV2Config,
report: SnowflakeV2Report,
dataset_urn_builder: Callable[[str], str],
connection: SnowflakeConnection,
) -> None:
self.config = config
self.report = report
self.logger = logger
self.dataset_urn_builder = dataset_urn_builder
self.connection = None
self.connection = connection
self._urns_processed: List[str] = []

@property
def identifier_config(self) -> SnowflakeIdentifierConfig:
return self.config

def get_assertion_workunits(
self, discovered_datasets: List[str]
) -> Iterable[MetadataWorkUnit]:
self.connection = self.create_connection()
if self.connection is None:
return

cur = self.query(
cur = self.connection.query(
SnowflakeQuery.dmf_assertion_results(
datetime_to_ts_millis(self.config.start_time),
datetime_to_ts_millis(self.config.end_time),
@@ -110,7 +107,7 @@ def _process_result_row(
aspect=AssertionRunEvent(
timestampMillis=datetime_to_ts_millis(result.MEASUREMENT_TIME),
runId=result.MEASUREMENT_TIME.strftime("%Y-%m-%dT%H:%M:%SZ"),
asserteeUrn=self.dataset_urn_builder(assertee),
asserteeUrn=self.gen_dataset_urn(assertee),
status=AssertionRunStatus.COMPLETE,
assertionUrn=make_assertion_urn(assertion_guid),
result=AssertionResult(
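Note: the handler no longer takes an injected dataset_urn_builder callable or lazily creates its own connection; it now receives a ready SnowflakeConnection and inherits gen_dataset_urn from SnowflakeIdentifierMixin, driven by the identifier_config property. A rough model of that mixin contract; the helper internals are assumed, not taken from this diff:

from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance

class IdentifierMixinSketch:
    """Approximation of the SnowflakeIdentifierMixin URN helper (internals assumed)."""

    platform = "snowflake"

    @property
    def identifier_config(self):
        # Concrete classes point this at their config, as the assertions
        # handler now does with `return self.config`.
        raise NotImplementedError

    def gen_dataset_urn(self, dataset_identifier: str) -> str:
        cfg = self.identifier_config
        return make_dataset_urn_with_platform_instance(
            platform=self.platform,
            name=dataset_identifier,
            platform_instance=cfg.platform_instance,
            env=cfg.env,
        )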
@@ -4,24 +4,31 @@
from enum import Enum
from typing import Dict, List, Optional, Set, cast

import pydantic
from pydantic import Field, SecretStr, root_validator, validator

from datahub.configuration.common import AllowDenyPattern, ConfigModel
from datahub.configuration.pattern_utils import UUID_REGEX
from datahub.configuration.source_common import (
EnvConfigMixin,
LowerCaseDatasetUrnConfigMixin,
PlatformInstanceConfigMixin,
)
from datahub.configuration.time_window_config import BaseTimeWindowConfig
from datahub.configuration.validate_field_removal import pydantic_removed_field
from datahub.configuration.validate_field_rename import pydantic_renamed_field
from datahub.ingestion.glossary.classification_mixin import (
ClassificationSourceConfigMixin,
)
from datahub.ingestion.source.snowflake.snowflake_connection import (
SnowflakeConnectionConfig,
)
from datahub.ingestion.source.sql.sql_config import SQLCommonConfig, SQLFilterConfig
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulLineageConfigMixin,
StatefulProfilingConfigMixin,
StatefulUsageConfigMixin,
)
from datahub.ingestion.source_config.sql.snowflake import (
BaseSnowflakeConfig,
SnowflakeConfig,
)
from datahub.ingestion.source_config.usage.snowflake_usage import SnowflakeUsageConfig
from datahub.utilities.global_warning_util import add_global_warning

@@ -32,11 +39,12 @@
#
# DBT incremental models create temporary tables ending with __dbt_tmp
# Ref - https://discourse.getdbt.com/t/handling-bigquery-incremental-dbt-tmp-tables/7540
DEFAULT_TABLES_DENY_LIST = [
DEFAULT_TEMP_TABLES_PATTERNS = [
r".*\.FIVETRAN_.*_STAGING\..*", # fivetran
r".*__DBT_TMP$", # dbt
rf".*\.SEGMENT_{UUID_REGEX}", # segment
rf".*\.STAGING_.*_{UUID_REGEX}", # stitch
r".*\.(GE_TMP_|GE_TEMP_|GX_TEMP_)[0-9A-F]{8}", # great expectations
]
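Note: besides the rename from DEFAULT_TABLES_DENY_LIST to DEFAULT_TEMP_TABLES_PATTERNS, the list gains an entry for Great Expectations temp tables. A quick self-check of the new pattern:

import re

pattern = r".*\.(GE_TMP_|GE_TEMP_|GX_TEMP_)[0-9A-F]{8}"

print(bool(re.match(pattern, "DB.SCHEMA.GE_TMP_1A2B3C4D")))   # True
print(bool(re.match(pattern, "DB.SCHEMA.GX_TEMP_DEADBEEF")))  # True
print(bool(re.match(pattern, "DB.SCHEMA.ORDERS")))            # False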


@@ -73,6 +81,93 @@ def source_database(self) -> DatabaseId:
return DatabaseId(self.database, self.platform_instance)


class SnowflakeFilterConfig(SQLFilterConfig):
database_pattern: AllowDenyPattern = Field(
AllowDenyPattern(
deny=[r"^UTIL_DB$", r"^SNOWFLAKE$", r"^SNOWFLAKE_SAMPLE_DATA$"],
),
description="Regex patterns for databases to filter in ingestion.",
)

schema_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description="Regex patterns for schemas to filter in ingestion. Will match against the full `database.schema` name if `match_fully_qualified_names` is enabled.",
)
# table_pattern and view_pattern are inherited from SQLFilterConfig

match_fully_qualified_names: bool = Field(
default=False,
description="Whether `schema_pattern` is matched against fully qualified schema name `<catalog>.<schema>`.",
)

@root_validator(pre=False, skip_on_failure=True)
def validate_legacy_schema_pattern(cls, values: Dict) -> Dict:
schema_pattern: Optional[AllowDenyPattern] = values.get("schema_pattern")
match_fully_qualified_names = values.get("match_fully_qualified_names")

if (
schema_pattern is not None
and schema_pattern != AllowDenyPattern.allow_all()
and match_fully_qualified_names is not None
and not match_fully_qualified_names
):
logger.warning(
"Please update `schema_pattern` to match against fully qualified schema name `<catalog_name>.<schema_name>` and set config `match_fully_qualified_names : True`."
"Current default `match_fully_qualified_names: False` is only to maintain backward compatibility. "
"The config option `match_fully_qualified_names` will be deprecated in future and the default behavior will assume `match_fully_qualified_names: True`."
)

# Always exclude reporting metadata for INFORMATION_SCHEMA schema
if schema_pattern is not None and schema_pattern:
logger.debug("Adding deny for INFORMATION_SCHEMA to schema_pattern.")
cast(AllowDenyPattern, schema_pattern).deny.append(r".*INFORMATION_SCHEMA$")

return values
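Note: this validator moves essentially verbatim out of SnowflakeV2Config.validate_unsupported_configs (removed further down) into the reusable SnowflakeFilterConfig. The fully qualified matching it nudges users toward behaves as in this sketch:

from datahub.configuration.common import AllowDenyPattern

# With match_fully_qualified_names=True, schema_pattern is applied to
# "<database>.<schema>"; with the legacy default it sees only the schema.
pattern = AllowDenyPattern(allow=[r"ANALYTICS\.PUBLIC"])

print(pattern.allowed("ANALYTICS.PUBLIC"))  # True: fully qualified name matches
print(pattern.allowed("PUBLIC"))            # False: bare schema name does not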


class SnowflakeIdentifierConfig(
PlatformInstanceConfigMixin, EnvConfigMixin, LowerCaseDatasetUrnConfigMixin
):
# Changing default value here.
convert_urns_to_lowercase: bool = Field(
default=True,
)


# TODO: SnowflakeConfig is unused except for this inheritance. We should collapse the config inheritance hierarchy.
class SnowflakeConfig(
SnowflakeIdentifierConfig,
SnowflakeFilterConfig,
# SnowflakeFilterConfig must come before (higher precedence) the SQLCommon config, so that the documentation overrides are applied.
SnowflakeConnectionConfig,
BaseTimeWindowConfig,
SQLCommonConfig,
):
include_table_lineage: bool = pydantic.Field(
default=True,
description="If enabled, populates the snowflake table-to-table and s3-to-snowflake table lineage. Requires appropriate grants given to the role and Snowflake Enterprise Edition or above.",
)
include_view_lineage: bool = pydantic.Field(
default=True,
description="If enabled, populates the snowflake view->table and table->view lineages. Requires appropriate grants given to the role, and include_table_lineage to be True. view->table lineage requires Snowflake Enterprise Edition or above.",
)

ignore_start_time_lineage: bool = False
upstream_lineage_in_report: bool = False

@pydantic.root_validator(skip_on_failure=True)
def validate_include_view_lineage(cls, values):
if (
"include_table_lineage" in values
and not values.get("include_table_lineage")
and values.get("include_view_lineage")
):
raise ValueError(
"include_table_lineage must be True for include_view_lineage to be set."
)
return values
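Note: the validator rejects view lineage without table lineage at config-parse time. A hedged sketch of the failure mode, assuming account_id is the only required connection field (other required fields or validators may apply):

import pydantic

from datahub.ingestion.source.snowflake.snowflake_config import SnowflakeV2Config

try:
    SnowflakeV2Config.parse_obj(
        {
            "account_id": "my_account",  # placeholder connection details
            "include_table_lineage": False,
            "include_view_lineage": True,
        }
    )
except pydantic.ValidationError as e:
    print(e)  # include_table_lineage must be True for include_view_lineage to be set.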


class SnowflakeV2Config(
SnowflakeConfig,
SnowflakeUsageConfig,
@@ -81,10 +176,6 @@
StatefulProfilingConfigMixin,
ClassificationSourceConfigMixin,
):
convert_urns_to_lowercase: bool = Field(
default=True,
)

include_usage_stats: bool = Field(
default=True,
description="If enabled, populates the snowflake usage statistics. Requires appropriate grants given to the role.",
@@ -133,11 +224,6 @@ class SnowflakeV2Config(
description="Whether to populate Snowsight url for Snowflake Objects",
)

match_fully_qualified_names: bool = Field(
default=False,
description="Whether `schema_pattern` is matched against fully qualified schema name `<catalog>.<schema>`.",
)

_use_legacy_lineage_method_removed = pydantic_removed_field(
"use_legacy_lineage_method"
)
@@ -154,7 +240,7 @@ class SnowflakeV2Config(

# This is required since access_history table does not capture whether the table was temporary table.
temporary_tables_pattern: List[str] = Field(
default=DEFAULT_TABLES_DENY_LIST,
default=DEFAULT_TEMP_TABLES_PATTERNS,
description="[Advanced] Regex patterns for temporary tables to filter in lineage ingestion. Specify regex to "
"match the entire table name in database.schema.table format. Defaults are to set in such a way "
"to ignore the temporary staging tables created by known ETL tools.",
@@ -210,27 +296,6 @@ def validate_unsupported_configs(cls, values: Dict) -> Dict:
"include_read_operational_stats is not supported. Set `include_read_operational_stats` to False.",
)

match_fully_qualified_names = values.get("match_fully_qualified_names")

schema_pattern: Optional[AllowDenyPattern] = values.get("schema_pattern")

if (
schema_pattern is not None
and schema_pattern != AllowDenyPattern.allow_all()
and match_fully_qualified_names is not None
and not match_fully_qualified_names
):
logger.warning(
"Please update `schema_pattern` to match against fully qualified schema name `<catalog_name>.<schema_name>` and set config `match_fully_qualified_names : True`."
"Current default `match_fully_qualified_names: False` is only to maintain backward compatibility. "
"The config option `match_fully_qualified_names` will be deprecated in future and the default behavior will assume `match_fully_qualified_names: True`."
)

# Always exclude reporting metadata for INFORMATION_SCHEMA schema
if schema_pattern is not None and schema_pattern:
logger.debug("Adding deny for INFORMATION_SCHEMA to schema_pattern.")
cast(AllowDenyPattern, schema_pattern).deny.append(r".*INFORMATION_SCHEMA$")

include_technical_schema = values.get("include_technical_schema")
include_profiles = (
values.get("profiling") is not None and values["profiling"].enabled
@@ -259,7 +324,7 @@ def get_sql_alchemy_url(
password: Optional[SecretStr] = None,
role: Optional[str] = None,
) -> str:
return BaseSnowflakeConfig.get_sql_alchemy_url(
return SnowflakeConnectionConfig.get_sql_alchemy_url(
self, database=database, username=username, password=password, role=role
)
