Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest): minor changes in snowflake-beta source, add basic tests #5910

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -277,10 +277,10 @@ def _get_columns_to_profile(self) -> List[str]:
columns_to_profile = columns_to_profile[
: self.config.max_number_of_fields_to_profile
]

self.report.report_dropped(
f"The max_number_of_fields_to_profile={self.config.max_number_of_fields_to_profile} reached. Profile of columns {self.dataset_name}({', '.join(sorted(columns_being_dropped))})"
)
if self.config.report_dropped_profiles:
self.report.report_dropped(
f"The max_number_of_fields_to_profile={self.config.max_number_of_fields_to_profile} reached. Profile of columns {self.dataset_name}({', '.join(sorted(columns_being_dropped))})"
)
return columns_to_profile

@_run_with_query_combiner
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class GEProfilingConfig(ConfigModel):
)
report_dropped_profiles: bool = Field(
default=False,
description="If datasets which were not profiled are reported in source report or not. Set to `True` for debugging purposes.",
description="Whether to report datasets or dataset columns which were not profiled. Set to `True` for debugging purposes.",
)

# These settings will override the ones below.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,11 @@ def get_workunits(self, databases: List[SnowflakeDatabase]) -> Iterable[WorkUnit
platform=self.platform,
profiler_args=self.get_profile_args(),
):
profile.sizeInBytes = request.table.size_in_bytes # type:ignore
if profile is None:
continue
profile.sizeInBytes = cast(
SnowflakeProfilerRequest, request
).table.size_in_bytes
dataset_name = request.pretty_name
dataset_urn = make_dataset_urn_with_platform_instance(
self.platform,
Expand Down Expand Up @@ -153,14 +155,18 @@ def is_dataset_eligible_for_profiling(
size_in_bytes is not None
and size_in_bytes / (2**30)
<= self.config.profiling.profile_table_size_limit
) # Note: Profiling is not allowed is size_in_bytes is not available
)
# Note: Profiling is not allowed if size_in_bytes is not available
# and self.config.profiling.profile_table_size_limit is set
)
and (
self.config.profiling.profile_table_row_limit is None
or (
rows_count is not None
and rows_count <= self.config.profiling.profile_table_row_limit
) # Note: Profiling is not allowed is rows_count is not available
)
# Note: Profiling is not allowed if rows_count is not available
# and self.config.profiling.profile_table_row_limit is set
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@


class SnowflakeV2Report(SnowflakeReport, SnowflakeUsageReport):

schemas_scanned: int = 0
databases_scanned: int = 0

include_usage_stats: bool = False
include_operational_stats: bool = False
include_technical_schema: bool = False
Expand All @@ -21,3 +25,18 @@ class SnowflakeV2Report(SnowflakeReport, SnowflakeUsageReport):
num_get_columns_for_table_queries: int = 0

rows_zero_objects_modified: int = 0

def report_entity_scanned(self, name: str, ent_type: str = "table") -> None:
    """Record that a single entity was scanned.

    The entity may be a table, view, schema, or database; the matching
    ``*_scanned`` counter on this report is incremented by one.

    Raises:
        KeyError: if ``ent_type`` is not one of the known entity kinds.
    """
    # Dispatch table from entity kind to the report counter it bumps.
    counter_names = {
        "table": "tables_scanned",
        "view": "views_scanned",
        "schema": "schemas_scanned",
        "database": "databases_scanned",
    }
    if ent_type not in counter_names:
        raise KeyError(f"Unknown entity {ent_type}.")
    attr = counter_names[ent_type]
    setattr(self, attr, getattr(self, attr) + 1)
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class SnowflakeColumn:
ordinal_position: int
is_nullable: bool
data_type: str
comment: str
comment: Optional[str]


@dataclass
Expand All @@ -43,7 +43,7 @@ class SnowflakeTable:
last_altered: datetime
size_in_bytes: int
rows_count: int
comment: str
comment: Optional[str]
clustering_key: str
pk: Optional[SnowflakePK] = None
columns: List[SnowflakeColumn] = field(default_factory=list)
Expand All @@ -65,7 +65,7 @@ class SnowflakeSchema:
name: str
created: datetime
last_altered: datetime
comment: str
comment: Optional[str]
tables: List[SnowflakeTable] = field(default_factory=list)
views: List[SnowflakeView] = field(default_factory=list)

Expand All @@ -74,7 +74,7 @@ class SnowflakeSchema:
class SnowflakeDatabase:
name: str
created: datetime
comment: str
comment: Optional[str]
schemas: List[SnowflakeSchema] = field(default_factory=list)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,17 +190,12 @@ def _map_top_sql_queries(self, top_sql_queries: Dict) -> List[str]:
def _map_user_counts(self, user_counts: Dict) -> List[DatasetUserUsageCounts]:
filtered_user_counts = []
for user_count in user_counts:
user_email = user_count.get(
"email",
"{0}@{1}".format(
user_email = user_count.get("email")
if not user_email and self.config.email_domain and user_count["user_name"]:
user_email = "{0}@{1}".format(
user_count["user_name"], self.config.email_domain
).lower()
if self.config.email_domain
else None,
)
if user_email is None or not self.config.user_email_pattern.allowed(
user_email
):
if not user_email or not self.config.user_email_pattern.allowed(user_email):
continue

filtered_user_counts.append(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import logging
from typing import Any, Optional, Protocol
from typing import Any, Optional

from snowflake.connector import SnowflakeConnection
from snowflake.connector.cursor import DictCursor
from typing_extensions import Protocol

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
Expand Down Expand Up @@ -124,7 +125,7 @@ def get_dataset_identifier_from_qualified_name(
def get_user_identifier(
    self: SnowflakeCommonProtocol, user_name: str, user_email: Optional[str]
) -> str:
    """Derive a user identifier for usage/ownership attribution.

    Prefers the local part (before ``@``) of a non-empty email address;
    otherwise falls back to the Snowflake-normalized ``user_name``.
    """
    if not user_email:
        # No usable email (None or empty string) — normalize the raw user name.
        return self.snowflake_identifier(user_name)
    local_part, _, _ = user_email.partition("@")
    return local_part

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,8 @@ def get_workunits(self) -> Iterable[WorkUnit]:
conn
)
for snowflake_db in databases:
self.report.report_entity_scanned(snowflake_db.name, "database")

if not self.config.database_pattern.allowed(snowflake_db.name):
self.report.report_dropped(f"{snowflake_db.name}.*")
continue
Expand Down Expand Up @@ -463,6 +465,8 @@ def _process_database(

for snowflake_schema in snowflake_db.schemas:

self.report.report_entity_scanned(snowflake_schema.name, "schema")

if not self.config.schema_pattern.allowed(snowflake_schema.name):
self.report.report_dropped(f"{db_name}.{snowflake_schema.name}.*")
continue
Expand Down
Loading