feat(ingest): support filtering by fully qualified schema_pattern
add note in updating-datahub.md
mayurinehate committed Dec 5, 2022
1 parent fdcb731 commit d317d83
Showing 7 changed files with 68 additions and 8 deletions.
2 changes: 2 additions & 0 deletions docs/how/updating-datahub.md
@@ -12,6 +12,8 @@ This file documents any backwards-incompatible changes in DataHub and assists pe

### Other notable Changes

- #6611 - Snowflake `schema_pattern` now accepts a pattern for the fully qualified schema name, in the format `<catalog_name>.<schema_name>`, when the config `match_fully_qualified_names: True` is set. The current default `match_fully_qualified_names: False` is kept only for backward compatibility. The config option `match_fully_qualified_names` will be deprecated in the future, and the default behavior will then assume `match_fully_qualified_names: True`.

## 0.9.3

### Breaking Changes
13 changes: 13 additions & 0 deletions metadata-ingestion/src/datahub/configuration/pattern_utils.py
@@ -0,0 +1,13 @@
from datahub.configuration.common import AllowDenyPattern


def is_schema_allowed(
schema_pattern: AllowDenyPattern,
schema_name: str,
db_name: str,
match_fully_qualified_schema_name: bool,
) -> bool:
if match_fully_qualified_schema_name:
return schema_pattern.allowed(f"{db_name}.{schema_name}")
else:
return schema_pattern.allowed(schema_name)
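
A minimal sketch of how the new helper changes matching, assuming a DataHub checkout that includes this change; the pattern below mirrors the one used in the updated test:

from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.pattern_utils import is_schema_allowed

# Pattern written against the fully qualified name "<catalog_name>.<schema_name>".
pattern = AllowDenyPattern(allow=["test_db.test_schema"])

# Legacy behavior: only the bare schema name is matched, so this is rejected.
assert not is_schema_allowed(pattern, "test_schema", "test_db", False)
# New behavior: the "<db>.<schema>" string is matched, so this is allowed.
assert is_schema_allowed(pattern, "test_schema", "test_db", True)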
@@ -49,6 +49,11 @@ class SnowflakeV2Config(SnowflakeConfig, SnowflakeUsageConfig):
description="Whether to populate Snowsight url for Snowflake Objects",
)

match_fully_qualified_names: bool = Field(
default=False,
description="Whether `schema_pattern` is matched against fully qualified schema name `<catalog>.<schema>`.",
)

@root_validator(pre=False)
def validate_unsupported_configs(cls, values: Dict) -> Dict:

@@ -70,11 +75,26 @@ def validate_unsupported_configs(cls, values: Dict) -> Dict:
"include_read_operational_stats is not supported. Set `include_read_operational_stats` to False.",
)

match_fully_qualified_names = values.get("match_fully_qualified_names")

schema_pattern: Optional[AllowDenyPattern] = values.get("schema_pattern")

if (
schema_pattern is not None
and schema_pattern != AllowDenyPattern.allow_all()
and match_fully_qualified_names is not None
and not match_fully_qualified_names
):
logger.warning(
"Please update `schema_pattern` to match against fully qualified schema name `<catalog_name>.<schema_name>` and set config `match_fully_qualified_names : True`."
"Current default `match_fully_qualified_names: False` is only to maintain backward compatibility. "
"The config option `match_fully_qualified_names` will be deprecated in future and the default behavior will assume `match_fully_qualified_names: True`."
)

# Always exclude reporting metadata for INFORMATION_SCHEMA schema
schema_pattern = values.get("schema_pattern")
if schema_pattern is not None and schema_pattern:
logger.debug("Adding deny for INFORMATION_SCHEMA to schema_pattern.")
cast(AllowDenyPattern, schema_pattern).deny.append(r"^INFORMATION_SCHEMA$")
cast(AllowDenyPattern, schema_pattern).deny.append(r".*INFORMATION_SCHEMA$")

include_technical_schema = values.get("include_technical_schema")
include_profiles = (
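
The deny regex appended above changes from `^INFORMATION_SCHEMA$` to `.*INFORMATION_SCHEMA$` because, with fully qualified matching enabled, the string checked against `schema_pattern` can be `<db>.INFORMATION_SCHEMA` rather than the bare schema name. A quick illustration with plain `re` (the database name is just an example):

import re

old_deny, new_deny = r"^INFORMATION_SCHEMA$", r".*INFORMATION_SCHEMA$"

# The anchored pattern stops matching once a database name is prepended.
assert not re.match(old_deny, "TEST_DB.INFORMATION_SCHEMA")
# The relaxed pattern still denies both forms of the name.
assert re.match(new_deny, "TEST_DB.INFORMATION_SCHEMA")
assert re.match(new_deny, "INFORMATION_SCHEMA")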
@@ -7,6 +7,7 @@
from sqlalchemy import create_engine, inspect
from sqlalchemy.sql import sqltypes

from datahub.configuration.pattern_utils import is_schema_allowed
from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance
from datahub.ingestion.api.common import WorkUnit
from datahub.ingestion.source.ge_data_profiler import (
@@ -55,7 +56,12 @@ def get_workunits(self, databases: List[SnowflakeDatabase]) -> Iterable[WorkUnit
continue
profile_requests = []
for schema in db.schemas:
if not self.config.schema_pattern.allowed(schema.name):
if not is_schema_allowed(
self.config.schema_pattern,
schema.name,
db.name,
self.config.match_fully_qualified_names,
):
continue
for table in schema.tables:

@@ -7,6 +7,7 @@
from snowflake.connector.cursor import DictCursor
from typing_extensions import Protocol

from datahub.configuration.pattern_utils import is_schema_allowed
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.snowflake.snowflake_config import SnowflakeV2Config
@@ -136,7 +137,12 @@ def _is_dataset_pattern_allowed(

if not self.config.database_pattern.allowed(
dataset_params[0].strip('"')
) or not self.config.schema_pattern.allowed(dataset_params[1].strip('"')):
) or not is_schema_allowed(
self.config.schema_pattern,
dataset_params[1].strip('"'),
dataset_params[0].strip('"'),
self.config.match_fully_qualified_names,
):
return False

if dataset_type.lower() in {"table"} and not self.config.table_pattern.allowed(
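
A rough sketch of the inputs involved in the check above; the qualified name is hypothetical and the splitting is simplified relative to the source:

# Hypothetical Snowflake object name of the form "<db>.<schema>.<table>".
dataset_params = '"TEST_DB"."TEST_SCHEMA"."TABLE_1"'.split(".")
db_name = dataset_params[0].strip('"')      # "TEST_DB"
schema_name = dataset_params[1].strip('"')  # "TEST_SCHEMA"
# With match_fully_qualified_names=True, schema_pattern is tested against
# "TEST_DB.TEST_SCHEMA"; with the legacy default it only sees "TEST_SCHEMA".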
@@ -7,6 +7,7 @@
import pydantic
from snowflake.connector import SnowflakeConnection

from datahub.configuration.pattern_utils import is_schema_allowed
from datahub.emitter.mce_builder import (
make_container_urn,
make_data_platform_urn,
@@ -508,7 +509,12 @@ def _process_database(

self.report.report_entity_scanned(snowflake_schema.name, "schema")

if not self.config.schema_pattern.allowed(snowflake_schema.name):
if not is_schema_allowed(
self.config.schema_pattern,
snowflake_schema.name,
db_name,
self.config.match_fully_qualified_names,
):
self.report.report_dropped(f"{db_name}.{snowflake_schema.name}.*")
continue

@@ -55,7 +55,13 @@ def default_query_results(query):
"CREATED": datetime(2021, 6, 8, 0, 0, 0, 0),
"LAST_ALTERED": datetime(2021, 6, 8, 0, 0, 0, 0),
"COMMENT": "comment for TEST_DB.TEST_SCHEMA",
}
},
{
"SCHEMA_NAME": "TEST2_SCHEMA",
"CREATED": datetime(2021, 6, 8, 0, 0, 0, 0),
"LAST_ALTERED": datetime(2021, 6, 8, 0, 0, 0, 0),
"COMMENT": "comment for TEST_DB.TEST_SCHEMA",
},
]
elif query == SnowflakeQuery.tables_for_database("TEST_DB"):
return [
@@ -339,7 +345,8 @@ def test_snowflake_basic(pytestconfig, tmp_path, mock_time, mock_datahub_graph):
username="TST_USR",
password="TST_PWD",
include_views=False,
table_pattern=AllowDenyPattern(allow=["test_db.test_schema.*"]),
match_fully_qualified_names=True,
schema_pattern=AllowDenyPattern(allow=["test_db.test_schema"]),
include_technical_schema=True,
include_table_lineage=True,
include_view_lineage=False,
@@ -408,7 +415,7 @@ def test_snowflake_private_link(pytestconfig, tmp_path, mock_time, mock_datahub_
username="TST_USR",
password="TST_PWD",
include_views=False,
table_pattern=AllowDenyPattern(allow=["test_db.test_schema.*"]),
schema_pattern=AllowDenyPattern(allow=["test_schema"]),
include_technical_schema=True,
include_table_lineage=False,
include_view_lineage=False,