Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest/snowflake): support filtering by fully qualified schema_pattern #6611

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/how/updating-datahub.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ This file documents any backwards-incompatible changes in DataHub and assists pe

### Other notable Changes

- #6611 - Snowflake `schema_pattern` now accepts pattern for fully qualified schema name in format `<catalog_name>.<schema_name>` by setting config `match_fully_qualified_names : True`. Current default `match_fully_qualified_names: False` is only to maintain backward compatibility. The config option `match_fully_qualified_names` will be deprecated in future and the default behavior will assume `match_fully_qualified_names: True`.

## 0.9.3

### Breaking Changes
Expand Down
13 changes: 13 additions & 0 deletions metadata-ingestion/src/datahub/configuration/pattern_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from datahub.configuration.common import AllowDenyPattern


def is_schema_allowed(
    schema_pattern: AllowDenyPattern,
    schema_name: str,
    db_name: str,
    match_fully_qualified_schema_name: bool,
) -> bool:
    """Return True if *schema_name* passes *schema_pattern*.

    When ``match_fully_qualified_schema_name`` is True, the pattern is matched
    against the fully qualified name ``<db_name>.<schema_name>``; otherwise
    only the bare schema name is matched (legacy behavior).
    """
    candidate = (
        f"{db_name}.{schema_name}" if match_fully_qualified_schema_name else schema_name
    )
    return schema_pattern.allowed(candidate)
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ class SnowflakeV2Config(SnowflakeConfig, SnowflakeUsageConfig):
description="Whether to populate Snowsight url for Snowflake Objects",
)

# Fix: the original `match_fully_qualified_names = bool = Field(...)` is a
# chained assignment — it binds the Field object to both names (shadowing the
# builtin `bool` in class scope) and leaves the attribute without a type
# annotation, so pydantic does not register it as a model field. A proper
# annotated field declaration (`: bool = Field(...)`) is required.
match_fully_qualified_names: bool = Field(
    default=False,
    description="Whether `schema_pattern` is matched against fully qualified schema name `<catalog>.<schema>`.",
)

@root_validator(pre=False)
def validate_unsupported_configs(cls, values: Dict) -> Dict:

Expand All @@ -70,11 +75,26 @@ def validate_unsupported_configs(cls, values: Dict) -> Dict:
"include_read_operational_stats is not supported. Set `include_read_operational_stats` to False.",
)

match_fully_qualified_names = values.get("match_fully_qualified_names")

schema_pattern: Optional[AllowDenyPattern] = values.get("schema_pattern")

if (
schema_pattern is not None
and schema_pattern != AllowDenyPattern.allow_all()
and match_fully_qualified_names is not None
and not match_fully_qualified_names
):
logger.warning(
"Please update `schema_pattern` to match against fully qualified schema name `<catalog_name>.<schema_name>` and set config `match_fully_qualified_names : True`."
"Current default `match_fully_qualified_names: False` is only to maintain backward compatibility. "
"The config option `match_fully_qualified_names` will be deprecated in future and the default behavior will assume `match_fully_qualified_names: True`."
)

# Always exclude reporting metadata for INFORMATION_SCHEMA schema
schema_pattern = values.get("schema_pattern")
if schema_pattern is not None and schema_pattern:
logger.debug("Adding deny for INFORMATION_SCHEMA to schema_pattern.")
cast(AllowDenyPattern, schema_pattern).deny.append(r"^INFORMATION_SCHEMA$")
cast(AllowDenyPattern, schema_pattern).deny.append(r".*INFORMATION_SCHEMA$")

include_technical_schema = values.get("include_technical_schema")
include_profiles = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from sqlalchemy import create_engine, inspect
from sqlalchemy.sql import sqltypes

from datahub.configuration.pattern_utils import is_schema_allowed
from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance
from datahub.ingestion.api.common import WorkUnit
from datahub.ingestion.source.ge_data_profiler import (
Expand Down Expand Up @@ -55,7 +56,12 @@ def get_workunits(self, databases: List[SnowflakeDatabase]) -> Iterable[WorkUnit
continue
profile_requests = []
for schema in db.schemas:
if not self.config.schema_pattern.allowed(schema.name):
if not is_schema_allowed(
self.config.schema_pattern,
schema.name,
db.name,
self.config.match_fully_qualified_names,
):
continue
for table in schema.tables:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from snowflake.connector.cursor import DictCursor
from typing_extensions import Protocol

from datahub.configuration.pattern_utils import is_schema_allowed
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.snowflake.snowflake_config import SnowflakeV2Config
Expand Down Expand Up @@ -136,7 +137,12 @@ def _is_dataset_pattern_allowed(

if not self.config.database_pattern.allowed(
dataset_params[0].strip('"')
) or not self.config.schema_pattern.allowed(dataset_params[1].strip('"')):
) or not is_schema_allowed(
self.config.schema_pattern,
dataset_params[1].strip('"'),
dataset_params[0].strip('"'),
self.config.match_fully_qualified_names,
):
return False

if dataset_type.lower() in {"table"} and not self.config.table_pattern.allowed(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import pydantic
from snowflake.connector import SnowflakeConnection

from datahub.configuration.pattern_utils import is_schema_allowed
from datahub.emitter.mce_builder import (
make_container_urn,
make_data_platform_urn,
Expand Down Expand Up @@ -508,7 +509,12 @@ def _process_database(

self.report.report_entity_scanned(snowflake_schema.name, "schema")

if not self.config.schema_pattern.allowed(snowflake_schema.name):
if not is_schema_allowed(
self.config.schema_pattern,
snowflake_schema.name,
db_name,
self.config.match_fully_qualified_names,
):
self.report.report_dropped(f"{db_name}.{snowflake_schema.name}.*")
continue

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,13 @@ def default_query_results(query):
"CREATED": datetime(2021, 6, 8, 0, 0, 0, 0),
"LAST_ALTERED": datetime(2021, 6, 8, 0, 0, 0, 0),
"COMMENT": "comment for TEST_DB.TEST_SCHEMA",
}
},
{
"SCHEMA_NAME": "TEST2_SCHEMA",
"CREATED": datetime(2021, 6, 8, 0, 0, 0, 0),
"LAST_ALTERED": datetime(2021, 6, 8, 0, 0, 0, 0),
"COMMENT": "comment for TEST_DB.TEST_SCHEMA",
},
]
elif query == SnowflakeQuery.tables_for_database("TEST_DB"):
return [
Expand Down Expand Up @@ -339,7 +345,8 @@ def test_snowflake_basic(pytestconfig, tmp_path, mock_time, mock_datahub_graph):
username="TST_USR",
password="TST_PWD",
include_views=False,
table_pattern=AllowDenyPattern(allow=["test_db.test_schema.*"]),
match_fully_qualified_names=True,
schema_pattern=AllowDenyPattern(allow=["test_db.test_schema"]),
include_technical_schema=True,
include_table_lineage=True,
include_view_lineage=False,
Expand Down Expand Up @@ -408,7 +415,7 @@ def test_snowflake_private_link(pytestconfig, tmp_path, mock_time, mock_datahub_
username="TST_USR",
password="TST_PWD",
include_views=False,
table_pattern=AllowDenyPattern(allow=["test_db.test_schema.*"]),
schema_pattern=AllowDenyPattern(allow=["test_schema"]),
include_technical_schema=True,
include_table_lineage=False,
include_view_lineage=False,
Expand Down