Skip to content

Commit

Permalink
feat(ingest): allow extracting snowflake tags (datahub-project#6500)
Browse files Browse the repository at this point in the history
  • Loading branch information
frsann authored and cccs-Dustin committed Feb 1, 2023
1 parent 00b4c23 commit 3f2451b
Show file tree
Hide file tree
Showing 12 changed files with 4,933 additions and 1,391 deletions.
8 changes: 4 additions & 4 deletions metadata-ingestion/docs/sources/snowflake/snowflake_pre.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ grant usage on DATABASE "<your-database>" to role datahub_role;
grant usage on all schemas in database "<your-database>" to role datahub_role;
grant usage on future schemas in database "<your-database>" to role datahub_role;

// If you are NOT using Snowflake Profiling or Classification feature: Grant references privileges to your tables and views
// If you are NOT using Snowflake Profiling or Classification feature: Grant references privileges to your tables and views
grant references on all tables in database "<your-database>" to role datahub_role;
grant references on future tables in database "<your-database>" to role datahub_role;
grant references on all external tables in database "<your-database>" to role datahub_role;
Expand All @@ -30,10 +30,10 @@ grant select on future tables in database "<your-database>" to role datahub_role
grant select on all external tables in database "<your-database>" to role datahub_role;
grant select on future external tables in database "<your-database>" to role datahub_role;

// Create a new DataHub user and assign the DataHub role to it
// Create a new DataHub user and assign the DataHub role to it
create user datahub_user display_name = 'DataHub' password='' default_role = datahub_role default_warehouse = '<your-warehouse>';

// Grant the datahub_role to the new DataHub user.
// Grant the datahub_role to the new DataHub user.
grant role datahub_role to user datahub_user;
```

Expand All @@ -50,7 +50,7 @@ grant usage on schema "<your-database>"."<your-schema>" to role datahub_role;

This represents the bare minimum privileges required to extract databases, schemas, views, tables from Snowflake.

If you plan to enable extraction of table lineage, via the `include_table_lineage` config flag or extraction of usage statistics, via the `include_usage_stats` config, you'll also need to grant access to the [Account Usage](https://docs.snowflake.com/en/sql-reference/account-usage.html) system tables, using which the DataHub source extracts information. This can be done by granting access to the `snowflake` database.
If you plan to enable extraction of table lineage, via the `include_table_lineage` config flag, extraction of usage statistics, via the `include_usage_stats` config, or extraction of tags (without lineage), via the `extract_tags` config, you'll also need to grant access to the [Account Usage](https://docs.snowflake.com/en/sql-reference/account-usage.html) system tables, using which the DataHub source extracts information. This can be done by granting access to the `snowflake` database.

```sql
grant imported privileges on database snowflake to role datahub_role;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ class SnowflakeObjectDomain(str, Enum):
EXTERNAL_TABLE = "external table"
VIEW = "view"
MATERIALIZED_VIEW = "materialized view"
DATABASE = "database"
SCHEMA = "schema"
COLUMN = "column"


GENERIC_PERMISSION_ERROR_KEY = "permission-error"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from enum import Enum
from typing import Dict, Optional, cast

from pydantic import Field, SecretStr, root_validator, validator
Expand All @@ -19,6 +20,12 @@
logger = logging.Logger(__name__)


class TagOption(str, Enum):
with_lineage = "with_lineage"
without_lineage = "without_lineage"
skip = "skip"


class SnowflakeV2Config(
SnowflakeConfig,
SnowflakeUsageConfig,
Expand Down Expand Up @@ -53,6 +60,14 @@ class SnowflakeV2Config(
default=None, description="Not supported"
)

extract_tags: TagOption = Field(
default=TagOption.skip,
description="""Optional. Allowed values are `without_lineage`, `with_lineage`, and `skip` (default).
`without_lineage` only extracts tags that have been applied directly to the given entity.
`with_lineage` extracts both directly applied and propagated tags, but will be significantly slower.
See the [Snowflake documentation](https://docs.snowflake.com/en/user-guide/object-tagging.html#tag-lineage) for information about tag lineage/propagation. """,
)

classification: Optional[ClassificationConfig] = Field(
default=None,
description="For details, refer [Classification](../../../../metadata-ingestion/docs/dev_guides/classification.md).",
Expand All @@ -76,6 +91,11 @@ def validate_include_column_lineage(cls, v, values):
)
return v

tag_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description="List of regex patterns for tags to include in ingestion. Only used if `extract_tags` is enabled.",
)

@root_validator(pre=False)
def validate_unsupported_configs(cls, values: Dict) -> Dict:
value = values.get("provision_role")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,52 @@ def tables_for_schema(schema_name: str, db_name: Optional[str]) -> str:
and table_type in ('BASE TABLE', 'EXTERNAL TABLE')
order by table_schema, table_name"""

@staticmethod
def get_all_tags_on_object_with_propagation(
db_name: str, quoted_identifier: str, domain: str
) -> str:
# https://docs.snowflake.com/en/sql-reference/functions/tag_references.html
return f"""
SELECT tag_database as "TAG_DATABASE",
tag_schema AS "TAG_SCHEMA",
tag_name AS "TAG_NAME",
tag_value AS "TAG_VALUE"
FROM table("{db_name}".information_schema.tag_references('{quoted_identifier}', '{domain}'));
"""

@staticmethod
def get_all_tags_in_database_without_propagation(db_name: str) -> str:
# https://docs.snowflake.com/en/sql-reference/account-usage/tag_references.html
return f"""
SELECT tag_database as "TAG_DATABASE",
tag_schema AS "TAG_SCHEMA",
tag_name AS "TAG_NAME",
tag_value AS "TAG_VALUE",
object_database as "OBJECT_DATABASE",
object_schema AS "OBJECT_SCHEMA",
object_name AS "OBJECT_NAME",
column_name AS "COLUMN_NAME",
domain as "DOMAIN"
FROM snowflake.account_usage.tag_references
WHERE (object_database = '{db_name}' OR object_name = '{db_name}')
AND domain in ('DATABASE', 'SCHEMA', 'TABLE', 'COLUMN')
AND object_deleted IS NULL;
"""

@staticmethod
def get_tags_on_columns_with_propagation(
db_name: str, quoted_table_identifier: str
) -> str:
# https://docs.snowflake.com/en/sql-reference/functions/tag_references_all_columns.html
return f"""
SELECT tag_database as "TAG_DATABASE",
tag_schema AS "TAG_SCHEMA",
tag_name AS "TAG_NAME",
tag_value AS "TAG_VALUE",
column_name AS "COLUMN_NAME"
FROM table("{db_name}".information_schema.tag_references_all_columns('{quoted_table_identifier}', 'table'));
"""

# View definition is retrived in information_schema query only if role is owner of view. Hence this query is not used.
# https://community.snowflake.com/s/article/Is-it-possible-to-see-the-view-definition-in-information-schema-views-from-a-non-owner-role
@staticmethod
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Optional
from typing import MutableSet, Optional

from datahub.ingestion.source.snowflake.constants import SnowflakeEdition
from datahub.ingestion.source.sql.sql_generic_profiler import ProfilingSqlReport
Expand All @@ -12,6 +12,7 @@ class SnowflakeV2Report(SnowflakeReport, SnowflakeUsageReport, ProfilingSqlRepor

schemas_scanned: int = 0
databases_scanned: int = 0
tags_scanned: int = 0

include_usage_stats: bool = False
include_operational_stats: bool = False
Expand All @@ -31,8 +32,16 @@ class SnowflakeV2Report(SnowflakeReport, SnowflakeUsageReport, ProfilingSqlRepor
num_get_views_for_schema_queries: int = 0
num_get_columns_for_table_queries: int = 0

# these will be non-zero if the user choses to enable the extract_tags = "with_lineage" option, which requires
# individual queries per object (database, schema, table) and an extra query per table to get the tags on the columns.
num_get_tags_for_object_queries: int = 0
num_get_tags_on_columns_for_table_queries: int = 0

rows_zero_objects_modified: int = 0

_processed_tags: MutableSet[str] = set()
_scanned_tags: MutableSet[str] = set()

edition: Optional[SnowflakeEdition] = None

def report_entity_scanned(self, name: str, ent_type: str = "table") -> None:
Expand All @@ -47,5 +56,21 @@ def report_entity_scanned(self, name: str, ent_type: str = "table") -> None:
self.schemas_scanned += 1
elif ent_type == "database":
self.databases_scanned += 1
elif ent_type == "tag":
# the same tag can be assigned to multiple objects, so we need
# some extra logic account for each tag only once.
if self._is_tag_scanned(name):
return
self._scanned_tags.add(name)
self.tags_scanned += 1
else:
raise KeyError(f"Unknown entity {ent_type}.")

def is_tag_processed(self, tag_name: str) -> bool:
return tag_name in self._processed_tags

def _is_tag_scanned(self, tag_name: str) -> bool:
return tag_name in self._scanned_tags

def report_tag_processed(self, tag_name: str) -> None:
self._processed_tags.add(tag_name)
Loading

0 comments on commit 3f2451b

Please sign in to comment.