@staticmethod
def get_all_tags() -> str:
    """Return the SQL statement that lists every distinct tag.

    The statement reads ``snowflake.account_usage.tag_references`` and
    produces one row per (TAG_DATABASE, TAG_SCHEMA, TAG_NAME) combination,
    ordered by those three columns. The output column names are quoted so
    that callers can look them up by their upper-case keys.
    """
    # No comma after the last select item: the column list ends cleanly
    # before FROM.
    return """
    SELECT tag_database AS "TAG_DATABASE",
           tag_schema AS "TAG_SCHEMA",
           tag_name AS "TAG_NAME"
    FROM snowflake.account_usage.tag_references
    GROUP BY tag_database, tag_schema, tag_name
    ORDER BY tag_database, tag_schema, tag_name ASC;
    """
def get_all_tags(self) -> List[SnowflakeTag]:
    """Return one SnowflakeTag per row of the ``get_all_tags`` query.

    The query built by ``SnowflakeQuery.get_all_tags()`` is executed on
    ``self.connection``. Each row's TAG_DATABASE, TAG_SCHEMA and TAG_NAME
    columns populate the tag; the tag value is always the empty string.
    """
    rows = self.connection.query(SnowflakeQuery.get_all_tags())

    all_tags: List[SnowflakeTag] = []
    for row in rows:
        all_tags.append(
            SnowflakeTag(
                database=row["TAG_DATABASE"],
                schema=row["TAG_SCHEMA"],
                name=row["TAG_NAME"],
                value="",
            )
        )

    return all_tags
datahub.configuration.pattern_utils import is_schema_allowed from datahub.emitter.mce_builder import ( - get_sys_time, make_data_platform_urn, make_dataset_urn_with_platform_instance, make_schema_field_urn, @@ -74,7 +74,6 @@ PROFILING, ) from datahub.metadata.com.linkedin.pegasus2avro.common import ( - AuditStamp, GlobalTags, Status, SubTypes, @@ -101,15 +100,8 @@ StringType, TimeType, ) -from datahub.metadata.com.linkedin.pegasus2avro.structured import ( - StructuredPropertyDefinition, -) from datahub.metadata.com.linkedin.pegasus2avro.tag import TagProperties from datahub.metadata.urns import ( - ContainerUrn, - DatasetUrn, - DataTypeUrn, - EntityTypeUrn, SchemaFieldUrn, StructuredPropertyUrn, ) @@ -191,7 +183,7 @@ def __init__( self.domain_registry: Optional[DomainRegistry] = domain_registry self.classification_handler = ClassificationHandler(self.config, self.report) self.tag_extractor = SnowflakeTagExtractor( - config, self.data_dictionary, self.report + config, self.data_dictionary, self.report, identifiers ) self.profiler: Optional[SnowflakeProfiler] = profiler self.snowsight_url_builder: Optional[SnowsightUrlBuilder] = ( @@ -217,6 +209,16 @@ def snowflake_identifier(self, identifier: str) -> str: return self.identifiers.snowflake_identifier(identifier) def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: + if self.config.extract_tags_as_structured_properties: + logger.info("Creating structured property templates for tags") + yield from self.tag_extractor.create_structured_property_templates() + # We have to wait until cache invalidates to make sure the structured property template is available + logger.info( + f"Waiting for {self.config.structured_properties_template_cache_invalidation_interval} seconds for structured properties cache to invalidate" + ) + time.sleep( + self.config.structured_properties_template_cache_invalidation_interval + ) self.databases = [] for database in self.get_databases() or []: 
self.report.report_entity_scanned(database.name, "database") @@ -698,6 +700,7 @@ def _process_view( def _process_tag(self, tag: SnowflakeTag) -> Iterable[MetadataWorkUnit]: use_sp = self.config.extract_tags_as_structured_properties + identifier = ( self.snowflake_identifier(tag.structured_property_identifier()) if use_sp @@ -708,10 +711,11 @@ def _process_tag(self, tag: SnowflakeTag) -> Iterable[MetadataWorkUnit]: return self.report.report_tag_processed(identifier) + if use_sp: - yield from self.gen_tag_as_structured_property_workunits(tag) - else: - yield from self.gen_tag_workunits(tag) + return + + yield from self.gen_tag_workunits(tag) def _format_tags_as_structured_properties( self, tags: List[SnowflakeTag] @@ -732,6 +736,7 @@ def gen_dataset_workunits( if table.tags: for tag in table.tags: yield from self._process_tag(tag) + for column_name in table.column_tags: for tag in table.column_tags[column_name]: yield from self._process_tag(tag) @@ -903,29 +908,6 @@ def gen_tag_workunits(self, tag: SnowflakeTag) -> Iterable[MetadataWorkUnit]: entityUrn=tag_urn, aspect=tag_properties_aspect ).as_workunit() - def gen_tag_as_structured_property_workunits( - self, tag: SnowflakeTag - ) -> Iterable[MetadataWorkUnit]: - identifier = self.snowflake_identifier(tag.structured_property_identifier()) - urn = StructuredPropertyUrn(identifier).urn() - aspect = StructuredPropertyDefinition( - qualifiedName=identifier, - displayName=tag.name, - valueType=DataTypeUrn("datahub.string").urn(), - entityTypes=[ - EntityTypeUrn(f"datahub.{ContainerUrn.ENTITY_TYPE}").urn(), - EntityTypeUrn(f"datahub.{DatasetUrn.ENTITY_TYPE}").urn(), - EntityTypeUrn(f"datahub.{SchemaFieldUrn.ENTITY_TYPE}").urn(), - ], - lastModified=AuditStamp( - time=get_sys_time(), actor="urn:li:corpuser:datahub" - ), - ) - yield MetadataChangeProposalWrapper( - entityUrn=urn, - aspect=aspect, - ).as_workunit() - def gen_column_tags_as_structured_properties( self, dataset_urn: str, table: Union[SnowflakeTable, 
SnowflakeView] ) -> Iterable[MetadataWorkUnit]: diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_tag.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_tag.py index 597e7bee4d4cc..08d319c7fe25d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_tag.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_tag.py @@ -1,6 +1,9 @@ import logging -from typing import Dict, List, Optional +from typing import Dict, Iterable, List, Optional +from datahub.emitter.mce_builder import get_sys_time +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.snowflake.constants import SnowflakeObjectDomain from datahub.ingestion.source.snowflake.snowflake_config import ( SnowflakeV2Config, @@ -12,7 +15,22 @@ SnowflakeTag, _SnowflakeTagCache, ) -from datahub.ingestion.source.snowflake.snowflake_utils import SnowflakeCommonMixin +from datahub.ingestion.source.snowflake.snowflake_utils import ( + SnowflakeCommonMixin, + SnowflakeIdentifierBuilder, +) +from datahub.metadata.com.linkedin.pegasus2avro.common import AuditStamp +from datahub.metadata.com.linkedin.pegasus2avro.structured import ( + StructuredPropertyDefinition, +) +from datahub.metadata.urns import ( + ContainerUrn, + DatasetUrn, + DataTypeUrn, + EntityTypeUrn, + SchemaFieldUrn, + StructuredPropertyUrn, +) logger: logging.Logger = logging.getLogger(__name__) @@ -23,11 +41,12 @@ def __init__( config: SnowflakeV2Config, data_dictionary: SnowflakeDataDictionary, report: SnowflakeV2Report, + snowflake_identifiers: SnowflakeIdentifierBuilder, ) -> None: self.config = config self.data_dictionary = data_dictionary self.report = report - + self.snowflake_identifiers = snowflake_identifiers self.tag_cache: Dict[str, _SnowflakeTagCache] = {} def _get_tags_on_object_without_propagation( @@ -59,6 +78,41 @@ def 
def create_structured_property_templates(self) -> Iterable[MetadataWorkUnit]:
    """Yield structured-property definition workunits for Snowflake tags.

    For every tag returned by ``self.data_dictionary.get_all_tags()`` whose
    ``tag_identifier()`` is allowed by ``config.structured_property_pattern``,
    the workunits from ``gen_tag_as_structured_property_workunits`` are
    yielded and ``report.num_structured_property_templates_created`` is
    incremented by one.

    When ``config.extract_tags_as_structured_properties`` is disabled the
    generator finishes immediately, without fetching the tags.
    """
    # The flag does not depend on the tag, so check it once up front rather
    # than per iteration; this also skips the tag query when it is off.
    if not self.config.extract_tags_as_structured_properties:
        return

    pattern = self.config.structured_property_pattern
    for tag in self.data_dictionary.get_all_tags():
        # NOTE(review): confirm that tag_identifier() of a tag listed by
        # get_all_tags() has the form structured_property_pattern entries
        # are written against.
        if not pattern.allowed(tag.tag_identifier()):
            continue
        self.report.num_structured_property_templates_created += 1
        yield from self.gen_tag_as_structured_property_workunits(tag)


def gen_tag_as_structured_property_workunits(
    self, tag: SnowflakeTag
) -> Iterable[MetadataWorkUnit]:
    """Yield the StructuredPropertyDefinition workunit for one tag.

    The qualified name (and the URN built from it) is
    ``tag.structured_property_identifier()`` passed through
    ``self.snowflake_identifiers.snowflake_identifier``. The display name is
    ``tag.name``, the value type is ``datahub.string``, and the property is
    declared for the container, dataset and schema-field entity types.
    """
    identifier = self.snowflake_identifiers.snowflake_identifier(
        tag.structured_property_identifier()
    )
    urn = StructuredPropertyUrn(identifier).urn()
    aspect = StructuredPropertyDefinition(
        qualifiedName=identifier,
        displayName=tag.name,
        valueType=DataTypeUrn("datahub.string").urn(),
        entityTypes=[
            EntityTypeUrn(f"datahub.{ContainerUrn.ENTITY_TYPE}").urn(),
            EntityTypeUrn(f"datahub.{DatasetUrn.ENTITY_TYPE}").urn(),
            EntityTypeUrn(f"datahub.{SchemaFieldUrn.ENTITY_TYPE}").urn(),
        ],
        # Stamp the definition with the current system time and the
        # built-in datahub actor.
        lastModified=AuditStamp(
            time=get_sys_time(), actor="urn:li:corpuser:datahub"
        ),
    )
    yield MetadataChangeProposalWrapper(
        entityUrn=urn,
        aspect=aspect,
    ).as_workunit()
"TEST_SCHEMA", + "TAG_NAME": "security", + }, + { + "TAG_DATABASE": "OTHER_DB", + "TAG_SCHEMA": "OTHER_SCHEMA", + "TAG_NAME": "my_other_tag", + }, + ] elif ( query == snowflake_query.SnowflakeQuery.get_all_tags_in_database_without_propagation( diff --git a/metadata-ingestion/tests/integration/snowflake/test_snowflake.py b/metadata-ingestion/tests/integration/snowflake/test_snowflake.py index d2e20e784282e..340b771b76e31 100644 --- a/metadata-ingestion/tests/integration/snowflake/test_snowflake.py +++ b/metadata-ingestion/tests/integration/snowflake/test_snowflake.py @@ -219,6 +219,7 @@ def test_snowflake_tags_as_structured_properties( include_column_lineage=False, include_usage_stats=False, include_operational_stats=False, + structured_properties_template_cache_invalidation_interval=0, ), ), sink=DynamicTypedConfig( diff --git a/metadata-ingestion/tests/integration/snowflake/test_snowflake_tag.py b/metadata-ingestion/tests/integration/snowflake/test_snowflake_tag.py index d4f6e92c93c1e..86ffdf33f585c 100644 --- a/metadata-ingestion/tests/integration/snowflake/test_snowflake_tag.py +++ b/metadata-ingestion/tests/integration/snowflake/test_snowflake_tag.py @@ -115,6 +115,7 @@ def test_snowflake_structured_property_pattern_deny(): match_fully_qualified_names=True, schema_pattern=AllowDenyPattern(allow=["test_db.test_schema"]), extract_tags_as_structured_properties=True, + structured_properties_template_cache_invalidation_interval=0, tag_pattern=AllowDenyPattern( deny=["TEST_DB.TEST_SCHEMA.my_tag_2:my_value_2"] ), @@ -142,7 +143,7 @@ def test_snowflake_structured_property_pattern_deny(): source_report = pipeline.source.get_report() assert isinstance(source_report, SnowflakeV2Report) assert source_report.tags_scanned == 5 - assert source_report._processed_tags == { + assert sorted(list(source_report._processed_tags)) == [ "snowflake.other_db.other_schema.my_other_tag", "snowflake.test_db.test_schema.security", - } + ]