Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest): allow extracting snowflake tags #6500

Merged
merged 25 commits into from
Jan 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions metadata-ingestion/docs/sources/snowflake/snowflake_pre.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ grant usage on DATABASE "<your-database>" to role datahub_role;
grant usage on all schemas in database "<your-database>" to role datahub_role;
grant usage on future schemas in database "<your-database>" to role datahub_role;

// If you are NOT using Snowflake Profiling or Classification feature: Grant references privileges to your tables and views
// If you are NOT using Snowflake Profiling or Classification feature: Grant references privileges to your tables and views
grant references on all tables in database "<your-database>" to role datahub_role;
grant references on future tables in database "<your-database>" to role datahub_role;
grant references on all external tables in database "<your-database>" to role datahub_role;
Expand All @@ -30,10 +30,10 @@ grant select on future tables in database "<your-database>" to role datahub_role
grant select on all external tables in database "<your-database>" to role datahub_role;
grant select on future external tables in database "<your-database>" to role datahub_role;

// Create a new DataHub user and assign the DataHub role to it
// Create a new DataHub user and assign the DataHub role to it
create user datahub_user display_name = 'DataHub' password='' default_role = datahub_role default_warehouse = '<your-warehouse>';

// Grant the datahub_role to the new DataHub user.
// Grant the datahub_role to the new DataHub user.
grant role datahub_role to user datahub_user;
```

Expand All @@ -50,7 +50,7 @@ grant usage on schema "<your-database>"."<your-schema>" to role datahub_role;

This represents the bare minimum privileges required to extract databases, schemas, views, tables from Snowflake.

If you plan to enable extraction of table lineage, via the `include_table_lineage` config flag or extraction of usage statistics, via the `include_usage_stats` config, you'll also need to grant access to the [Account Usage](https://docs.snowflake.com/en/sql-reference/account-usage.html) system tables, using which the DataHub source extracts information. This can be done by granting access to the `snowflake` database.
If you plan to enable extraction of table lineage, via the `include_table_lineage` config flag, extraction of usage statistics, via the `include_usage_stats` config, or extraction of tags (without lineage), via the `extract_tags` config, you'll also need to grant access to the [Account Usage](https://docs.snowflake.com/en/sql-reference/account-usage.html) system tables, using which the DataHub source extracts information. This can be done by granting access to the `snowflake` database.

```sql
grant imported privileges on database snowflake to role datahub_role;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ class SnowflakeObjectDomain(str, Enum):
EXTERNAL_TABLE = "external table"
VIEW = "view"
MATERIALIZED_VIEW = "materialized view"
DATABASE = "database"
SCHEMA = "schema"
COLUMN = "column"


GENERIC_PERMISSION_ERROR_KEY = "permission-error"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from enum import Enum
from typing import Dict, Optional, cast

from pydantic import Field, SecretStr, root_validator, validator
Expand All @@ -19,6 +20,12 @@
logger = logging.Logger(__name__)


class TagOption(str, Enum):
with_lineage = "with_lineage"
without_lineage = "without_lineage"
skip = "skip"
frsann marked this conversation as resolved.
Show resolved Hide resolved


class SnowflakeV2Config(
SnowflakeConfig,
SnowflakeUsageConfig,
Expand Down Expand Up @@ -53,6 +60,14 @@ class SnowflakeV2Config(
default=None, description="Not supported"
)

extract_tags: TagOption = Field(
default=TagOption.skip,
description="""Optional. Allowed values are `without_lineage`, `with_lineage`, and `skip` (default).
`without_lineage` only extracts tags that have been applied directly to the given entity.
`with_lineage` extracts both directly applied and propagated tags, but will be significantly slower.
See the [Snowflake documentation](https://docs.snowflake.com/en/user-guide/object-tagging.html#tag-lineage) for information about tag lineage/propagation. """,
)

classification: Optional[ClassificationConfig] = Field(
default=None,
description="For details, refer [Classification](../../../../metadata-ingestion/docs/dev_guides/classification.md).",
Expand All @@ -76,6 +91,11 @@ def validate_include_column_lineage(cls, v, values):
)
return v

tag_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description="List of regex patterns for tags to include in ingestion. Only used if `extract_tags` is enabled.",
)

@root_validator(pre=False)
def validate_unsupported_configs(cls, values: Dict) -> Dict:
value = values.get("provision_role")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,52 @@ def tables_for_schema(schema_name: str, db_name: Optional[str]) -> str:
and table_type in ('BASE TABLE', 'EXTERNAL TABLE')
order by table_schema, table_name"""

@staticmethod
def get_all_tags_on_object_with_propagation(
db_name: str, quoted_identifier: str, domain: str
) -> str:
# https://docs.snowflake.com/en/sql-reference/functions/tag_references.html
return f"""
SELECT tag_database as "TAG_DATABASE",
tag_schema AS "TAG_SCHEMA",
tag_name AS "TAG_NAME",
tag_value AS "TAG_VALUE"
FROM table("{db_name}".information_schema.tag_references('{quoted_identifier}', '{domain}'));
"""

@staticmethod
def get_all_tags_in_database_without_propagation(db_name: str) -> str:
# https://docs.snowflake.com/en/sql-reference/account-usage/tag_references.html
return f"""
SELECT tag_database as "TAG_DATABASE",
tag_schema AS "TAG_SCHEMA",
tag_name AS "TAG_NAME",
tag_value AS "TAG_VALUE",
object_database as "OBJECT_DATABASE",
object_schema AS "OBJECT_SCHEMA",
object_name AS "OBJECT_NAME",
column_name AS "COLUMN_NAME",
domain as "DOMAIN"
FROM snowflake.account_usage.tag_references
WHERE (object_database = '{db_name}' OR object_name = '{db_name}')
AND domain in ('DATABASE', 'SCHEMA', 'TABLE', 'COLUMN')
AND object_deleted IS NULL;
"""

@staticmethod
def get_tags_on_columns_with_propagation(
db_name: str, quoted_table_identifier: str
) -> str:
# https://docs.snowflake.com/en/sql-reference/functions/tag_references_all_columns.html
return f"""
SELECT tag_database as "TAG_DATABASE",
tag_schema AS "TAG_SCHEMA",
tag_name AS "TAG_NAME",
tag_value AS "TAG_VALUE",
column_name AS "COLUMN_NAME"
FROM table("{db_name}".information_schema.tag_references_all_columns('{quoted_table_identifier}', 'table'));
"""

# View definition is retrived in information_schema query only if role is owner of view. Hence this query is not used.
# https://community.snowflake.com/s/article/Is-it-possible-to-see-the-view-definition-in-information-schema-views-from-a-non-owner-role
@staticmethod
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Optional
from typing import MutableSet, Optional

from datahub.ingestion.source.snowflake.constants import SnowflakeEdition
from datahub.ingestion.source.sql.sql_generic_profiler import ProfilingSqlReport
Expand All @@ -12,6 +12,7 @@ class SnowflakeV2Report(SnowflakeReport, SnowflakeUsageReport, ProfilingSqlRepor

schemas_scanned: int = 0
databases_scanned: int = 0
tags_scanned: int = 0

include_usage_stats: bool = False
include_operational_stats: bool = False
Expand All @@ -31,8 +32,16 @@ class SnowflakeV2Report(SnowflakeReport, SnowflakeUsageReport, ProfilingSqlRepor
num_get_views_for_schema_queries: int = 0
num_get_columns_for_table_queries: int = 0

# these will be non-zero if the user choses to enable the extract_tags = "with_lineage" option, which requires
# individual queries per object (database, schema, table) and an extra query per table to get the tags on the columns.
num_get_tags_for_object_queries: int = 0
num_get_tags_on_columns_for_table_queries: int = 0

rows_zero_objects_modified: int = 0

_processed_tags: MutableSet[str] = set()
_scanned_tags: MutableSet[str] = set()

edition: Optional[SnowflakeEdition] = None

def report_entity_scanned(self, name: str, ent_type: str = "table") -> None:
Expand All @@ -47,5 +56,21 @@ def report_entity_scanned(self, name: str, ent_type: str = "table") -> None:
self.schemas_scanned += 1
elif ent_type == "database":
self.databases_scanned += 1
elif ent_type == "tag":
# the same tag can be assigned to multiple objects, so we need
# some extra logic account for each tag only once.
if self._is_tag_scanned(name):
return
self._scanned_tags.add(name)
self.tags_scanned += 1
else:
raise KeyError(f"Unknown entity {ent_type}.")

def is_tag_processed(self, tag_name: str) -> bool:
return tag_name in self._processed_tags

def _is_tag_scanned(self, tag_name: str) -> bool:
return tag_name in self._scanned_tags

def report_tag_processed(self, tag_name: str) -> None:
self._processed_tags.add(tag_name)
Loading