Skip to content

Commit

Permalink
fix(ingest/snowflake): Create all structured property templates before…
Browse files Browse the repository at this point in the history
… assignation (datahub-project#12469)
  • Loading branch information
treff7es authored Jan 30, 2025
1 parent 7a32256 commit 4dc9bfc
Show file tree
Hide file tree
Showing 9 changed files with 135 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,12 @@ class SnowflakeV2Config(
description="If enabled along with `extract_tags`, extracts snowflake's key-value tags as DataHub structured properties instead of DataHub tags.",
)

structured_properties_template_cache_invalidation_interval: int = Field(
hidden_from_docs=True,
default=60,
description="Interval in seconds to invalidate the structured properties template cache.",
)

include_external_url: bool = Field(
default=True,
description="Whether to populate Snowsight url for Snowflake Objects",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,17 @@ def tables_for_schema(schema_name: str, db_name: Optional[str]) -> str:
and table_type in ('BASE TABLE', 'EXTERNAL TABLE', 'HYBRID TABLE')
order by table_schema, table_name"""

@staticmethod
def get_all_tags() -> str:
    """Build the query that lists every distinct tag referenced in the account.

    The statement reads ``snowflake.account_usage.tag_references`` and
    collapses it to one row per (tag database, tag schema, tag name). The
    columns are exposed under the quoted upper-case aliases
    ``TAG_DATABASE``, ``TAG_SCHEMA`` and ``TAG_NAME``.

    Returns:
        The SQL text, ordered by tag database, tag schema and tag name.
    """
    # The select list deliberately has no trailing comma before FROM, so the
    # statement does not depend on dialect-specific leniency.
    return """
    SELECT tag_database AS "TAG_DATABASE",
        tag_schema AS "TAG_SCHEMA",
        tag_name AS "TAG_NAME"
    FROM snowflake.account_usage.tag_references
    GROUP BY TAG_DATABASE, TAG_SCHEMA, TAG_NAME
    ORDER BY TAG_DATABASE, TAG_SCHEMA, TAG_NAME ASC;
    """

@staticmethod
def get_all_tags_on_object_with_propagation(
db_name: str, quoted_identifier: str, domain: str
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ class SnowflakeV2Report(
num_tables_with_known_upstreams: int = 0
num_upstream_lineage_edge_parsing_failed: int = 0
num_secure_views_missing_definition: int = 0
num_structured_property_templates_created: int = 0

data_dictionary_cache: Optional["SnowflakeDataDictionary"] = None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,23 @@ def get_secure_view_definitions(self) -> Dict[str, Dict[str, Dict[str, str]]]:

return secure_view_definitions

def get_all_tags(self) -> List[SnowflakeTag]:
    """Return one ``SnowflakeTag`` per row of the tag listing query.

    Runs ``SnowflakeQuery.get_all_tags()`` through ``self.connection`` and
    converts every result row. Only the database, schema and name columns
    are read from a row; ``value`` is always set to the empty string.
    """
    rows = self.connection.query(SnowflakeQuery.get_all_tags())

    collected: List[SnowflakeTag] = []
    for row in rows:
        collected.append(
            SnowflakeTag(
                database=row["TAG_DATABASE"],
                schema=row["TAG_SCHEMA"],
                name=row["TAG_NAME"],
                value="",
            )
        )
    return collected

@serialized_lru_cache(maxsize=1)
def get_tables_for_database(
self, db_name: str
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import itertools
import logging
import time
from typing import Dict, Iterable, List, Optional, Union

from datahub.configuration.pattern_utils import is_schema_allowed
from datahub.emitter.mce_builder import (
get_sys_time,
make_data_platform_urn,
make_dataset_urn_with_platform_instance,
make_schema_field_urn,
Expand Down Expand Up @@ -74,7 +74,6 @@
PROFILING,
)
from datahub.metadata.com.linkedin.pegasus2avro.common import (
AuditStamp,
GlobalTags,
Status,
SubTypes,
Expand All @@ -101,15 +100,8 @@
StringType,
TimeType,
)
from datahub.metadata.com.linkedin.pegasus2avro.structured import (
StructuredPropertyDefinition,
)
from datahub.metadata.com.linkedin.pegasus2avro.tag import TagProperties
from datahub.metadata.urns import (
ContainerUrn,
DatasetUrn,
DataTypeUrn,
EntityTypeUrn,
SchemaFieldUrn,
StructuredPropertyUrn,
)
Expand Down Expand Up @@ -191,7 +183,7 @@ def __init__(
self.domain_registry: Optional[DomainRegistry] = domain_registry
self.classification_handler = ClassificationHandler(self.config, self.report)
self.tag_extractor = SnowflakeTagExtractor(
config, self.data_dictionary, self.report
config, self.data_dictionary, self.report, identifiers
)
self.profiler: Optional[SnowflakeProfiler] = profiler
self.snowsight_url_builder: Optional[SnowsightUrlBuilder] = (
Expand All @@ -217,6 +209,16 @@ def snowflake_identifier(self, identifier: str) -> str:
return self.identifiers.snowflake_identifier(identifier)

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
if self.config.extract_tags_as_structured_properties:
logger.info("Creating structured property templates for tags")
yield from self.tag_extractor.create_structured_property_templates()
# We have to wait until cache invalidates to make sure the structured property template is available
logger.info(
f"Waiting for {self.config.structured_properties_template_cache_invalidation_interval} seconds for structured properties cache to invalidate"
)
time.sleep(
self.config.structured_properties_template_cache_invalidation_interval
)
self.databases = []
for database in self.get_databases() or []:
self.report.report_entity_scanned(database.name, "database")
Expand Down Expand Up @@ -698,6 +700,7 @@ def _process_view(

def _process_tag(self, tag: SnowflakeTag) -> Iterable[MetadataWorkUnit]:
use_sp = self.config.extract_tags_as_structured_properties

identifier = (
self.snowflake_identifier(tag.structured_property_identifier())
if use_sp
Expand All @@ -708,10 +711,11 @@ def _process_tag(self, tag: SnowflakeTag) -> Iterable[MetadataWorkUnit]:
return

self.report.report_tag_processed(identifier)

if use_sp:
yield from self.gen_tag_as_structured_property_workunits(tag)
else:
yield from self.gen_tag_workunits(tag)
return

yield from self.gen_tag_workunits(tag)

def _format_tags_as_structured_properties(
self, tags: List[SnowflakeTag]
Expand All @@ -732,6 +736,7 @@ def gen_dataset_workunits(
if table.tags:
for tag in table.tags:
yield from self._process_tag(tag)

for column_name in table.column_tags:
for tag in table.column_tags[column_name]:
yield from self._process_tag(tag)
Expand Down Expand Up @@ -903,29 +908,6 @@ def gen_tag_workunits(self, tag: SnowflakeTag) -> Iterable[MetadataWorkUnit]:
entityUrn=tag_urn, aspect=tag_properties_aspect
).as_workunit()

def gen_tag_as_structured_property_workunits(
    self, tag: SnowflakeTag
) -> Iterable[MetadataWorkUnit]:
    """Yield the work unit defining ``tag`` as a structured property.

    The property's qualified name is the tag's structured property
    identifier passed through ``self.snowflake_identifier``; its display
    name is ``tag.name``. The definition uses the ``datahub.string`` value
    type and lists the container, dataset and schema field entity types.
    """
    qualified_name = self.snowflake_identifier(tag.structured_property_identifier())
    property_urn = StructuredPropertyUrn(qualified_name).urn()

    # Entity types listed on the property definition.
    applicable_entity_types = [
        EntityTypeUrn(f"datahub.{entity_type}").urn()
        for entity_type in (
            ContainerUrn.ENTITY_TYPE,
            DatasetUrn.ENTITY_TYPE,
            SchemaFieldUrn.ENTITY_TYPE,
        )
    ]
    modified_stamp = AuditStamp(time=get_sys_time(), actor="urn:li:corpuser:datahub")

    definition = StructuredPropertyDefinition(
        qualifiedName=qualified_name,
        displayName=tag.name,
        valueType=DataTypeUrn("datahub.string").urn(),
        entityTypes=applicable_entity_types,
        lastModified=modified_stamp,
    )
    proposal = MetadataChangeProposalWrapper(entityUrn=property_urn, aspect=definition)
    yield proposal.as_workunit()

def gen_column_tags_as_structured_properties(
self, dataset_urn: str, table: Union[SnowflakeTable, SnowflakeView]
) -> Iterable[MetadataWorkUnit]:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import logging
from typing import Dict, List, Optional
from typing import Dict, Iterable, List, Optional

from datahub.emitter.mce_builder import get_sys_time
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.snowflake.constants import SnowflakeObjectDomain
from datahub.ingestion.source.snowflake.snowflake_config import (
SnowflakeV2Config,
Expand All @@ -12,7 +15,22 @@
SnowflakeTag,
_SnowflakeTagCache,
)
from datahub.ingestion.source.snowflake.snowflake_utils import SnowflakeCommonMixin
from datahub.ingestion.source.snowflake.snowflake_utils import (
SnowflakeCommonMixin,
SnowflakeIdentifierBuilder,
)
from datahub.metadata.com.linkedin.pegasus2avro.common import AuditStamp
from datahub.metadata.com.linkedin.pegasus2avro.structured import (
StructuredPropertyDefinition,
)
from datahub.metadata.urns import (
ContainerUrn,
DatasetUrn,
DataTypeUrn,
EntityTypeUrn,
SchemaFieldUrn,
StructuredPropertyUrn,
)

logger: logging.Logger = logging.getLogger(__name__)

Expand All @@ -23,11 +41,12 @@ def __init__(
config: SnowflakeV2Config,
data_dictionary: SnowflakeDataDictionary,
report: SnowflakeV2Report,
snowflake_identifiers: SnowflakeIdentifierBuilder,
) -> None:
self.config = config
self.data_dictionary = data_dictionary
self.report = report

self.snowflake_identifiers = snowflake_identifiers
self.tag_cache: Dict[str, _SnowflakeTagCache] = {}

def _get_tags_on_object_without_propagation(
Expand Down Expand Up @@ -59,6 +78,41 @@ def _get_tags_on_object_without_propagation(
raise ValueError(f"Unknown domain {domain}")
return tags

def create_structured_property_templates(self) -> Iterable[MetadataWorkUnit]:
    """Yield structured property definitions for every allowed tag.

    Every tag returned by ``self.data_dictionary.get_all_tags()`` whose
    ``tag_identifier()`` is accepted by
    ``self.config.structured_property_pattern`` produces the work units of
    ``gen_tag_as_structured_property_workunits`` and increments
    ``self.report.num_structured_property_templates_created``.

    Nothing is yielded, and the tag listing is not fetched at all, when
    ``self.config.extract_tags_as_structured_properties`` is disabled.
    """
    # The flag does not change between tags, so check it once up front
    # instead of fetching and iterating every tag only to discard them.
    if not self.config.extract_tags_as_structured_properties:
        return

    is_allowed = self.config.structured_property_pattern.allowed
    for tag in self.data_dictionary.get_all_tags():
        if not is_allowed(tag.tag_identifier()):
            continue
        self.report.num_structured_property_templates_created += 1
        yield from self.gen_tag_as_structured_property_workunits(tag)

def gen_tag_as_structured_property_workunits(
    self, tag: SnowflakeTag
) -> Iterable[MetadataWorkUnit]:
    """Yield the structured property definition work unit for ``tag``.

    The tag's structured property identifier is normalized with
    ``self.snowflake_identifiers.snowflake_identifier`` and used both as the
    property URN and as the qualified name. The definition is displayed as
    ``tag.name``, holds ``datahub.string`` values, and lists the container,
    dataset and schema field entity types.
    """
    raw_identifier = tag.structured_property_identifier()
    identifier = self.snowflake_identifiers.snowflake_identifier(raw_identifier)
    urn = StructuredPropertyUrn(identifier).urn()

    # Assemble the constructor arguments first so the aspect is built in one
    # place.
    definition_fields = dict(
        qualifiedName=identifier,
        displayName=tag.name,
        valueType=DataTypeUrn("datahub.string").urn(),
        entityTypes=[
            EntityTypeUrn(f"datahub.{ContainerUrn.ENTITY_TYPE}").urn(),
            EntityTypeUrn(f"datahub.{DatasetUrn.ENTITY_TYPE}").urn(),
            EntityTypeUrn(f"datahub.{SchemaFieldUrn.ENTITY_TYPE}").urn(),
        ],
        lastModified=AuditStamp(
            time=get_sys_time(), actor="urn:li:corpuser:datahub"
        ),
    )
    aspect = StructuredPropertyDefinition(**definition_fields)

    workunit = MetadataChangeProposalWrapper(
        entityUrn=urn,
        aspect=aspect,
    ).as_workunit()
    yield workunit

def _get_tags_on_object_with_propagation(
self,
domain: str,
Expand Down
22 changes: 21 additions & 1 deletion metadata-ingestion/tests/integration/snowflake/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,27 @@ def default_query_results( # noqa: C901
),
]:
return []

elif query == snowflake_query.SnowflakeQuery.get_all_tags():
return [
*[
{
"TAG_DATABASE": "TEST_DB",
"TAG_SCHEMA": "TEST_SCHEMA",
"TAG_NAME": f"my_tag_{ix}",
}
for ix in range(3)
],
{
"TAG_DATABASE": "TEST_DB",
"TAG_SCHEMA": "TEST_SCHEMA",
"TAG_NAME": "security",
},
{
"TAG_DATABASE": "OTHER_DB",
"TAG_SCHEMA": "OTHER_SCHEMA",
"TAG_NAME": "my_other_tag",
},
]
elif (
query
== snowflake_query.SnowflakeQuery.get_all_tags_in_database_without_propagation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ def test_snowflake_tags_as_structured_properties(
include_column_lineage=False,
include_usage_stats=False,
include_operational_stats=False,
structured_properties_template_cache_invalidation_interval=0,
),
),
sink=DynamicTypedConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ def test_snowflake_structured_property_pattern_deny():
match_fully_qualified_names=True,
schema_pattern=AllowDenyPattern(allow=["test_db.test_schema"]),
extract_tags_as_structured_properties=True,
structured_properties_template_cache_invalidation_interval=0,
tag_pattern=AllowDenyPattern(
deny=["TEST_DB.TEST_SCHEMA.my_tag_2:my_value_2"]
),
Expand Down Expand Up @@ -142,7 +143,7 @@ def test_snowflake_structured_property_pattern_deny():
source_report = pipeline.source.get_report()
assert isinstance(source_report, SnowflakeV2Report)
assert source_report.tags_scanned == 5
assert source_report._processed_tags == {
assert sorted(list(source_report._processed_tags)) == [
"snowflake.other_db.other_schema.my_other_tag",
"snowflake.test_db.test_schema.security",
}
]

0 comments on commit 4dc9bfc

Please sign in to comment.