Skip to content

Commit

Permalink
fix(ingest): ignore usage and operation for snowflake datasets withou…
Browse files Browse the repository at this point in the history
…t schema (#6112)
  • Loading branch information
mayurinehate authored Oct 5, 2022
1 parent 2f79b50 commit 4ee3ef1
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def validate_unsupported_configs(cls, values: Dict) -> Dict:
)
include_table_lineage = values.get("include_table_lineage")

# TODO: Allow lineage extraction irrespective of basic schema extraction,
# TODO: Allow lineage extraction and profiling irrespective of basic schema extraction,
# as it seems possible with some refractor
if not include_technical_schema and any(
[include_profiles, delete_detection_enabled, include_table_lineage]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ def get_workunits(self, databases: List[SnowflakeDatabase]) -> Iterable[WorkUnit
"max_overflow", self.config.profiling.max_workers
)

# Otherwise, if column level profiling is enabled, use GE profiler.
for db in databases:
if not self.config.database_pattern.allowed(db.name):
continue
Expand Down Expand Up @@ -236,6 +235,7 @@ def generate_profiles(
if len(ge_profile_requests) == 0:
return

# Otherwise, if column level profiling is enabled, use GE profiler.
ge_profiler = self.get_profiler_instance(db_name)
yield from ge_profiler.generate_profiles(
ge_profile_requests, max_workers, platform, profiler_args
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@ def __init__(self, config: SnowflakeV2Config, report: SnowflakeV2Report) -> None
self.report: SnowflakeV2Report = report
self.logger = logger

def get_workunits(self) -> Iterable[MetadataWorkUnit]:
def get_workunits(
self, discovered_datasets: List[str]
) -> Iterable[MetadataWorkUnit]:
conn = self.config.get_connection()

logger.info("Checking usage date ranges")
Expand All @@ -107,18 +109,20 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
# Now, we report the usage as well as operation metadata even if user email is absent

if self.config.include_usage_stats:
yield from self.get_usage_workunits(conn)
yield from self.get_usage_workunits(conn, discovered_datasets)

if self.config.include_operational_stats:
# Generate the operation workunits.
access_events = self._get_snowflake_history(conn)
for event in access_events:
yield from self._get_operation_aspect_work_unit(event)
yield from self._get_operation_aspect_work_unit(
event, discovered_datasets
)

conn.close()

def get_usage_workunits(
self, conn: SnowflakeConnection
self, conn: SnowflakeConnection, discovered_datasets: List[str]
) -> Iterable[MetadataWorkUnit]:

with PerfTimer() as timer:
Expand All @@ -144,6 +148,15 @@ def get_usage_workunits(
):
continue

dataset_identifier = self.get_dataset_identifier_from_qualified_name(
row["OBJECT_NAME"]
)
if dataset_identifier not in discovered_datasets:
logger.debug(
f"Skipping usage for table {dataset_identifier}, as table schema is not accessible"
)
continue

stats = DatasetUsageStatistics(
timestampMillis=int(row["BUCKET_START_TIME"].timestamp() * 1000),
eventGranularity=TimeWindowSize(
Expand All @@ -161,7 +174,7 @@ def get_usage_workunits(
)
dataset_urn = make_dataset_urn_with_platform_instance(
self.platform,
self.get_dataset_identifier_from_qualified_name(row["OBJECT_NAME"]),
dataset_identifier,
self.config.platform_instance,
self.config.env,
)
Expand Down Expand Up @@ -276,7 +289,7 @@ def _check_usage_date_ranges(self, conn: SnowflakeConnection) -> Any:
)

def _get_operation_aspect_work_unit(
self, event: SnowflakeJoinedAccessEvent
self, event: SnowflakeJoinedAccessEvent, discovered_datasets: List[str]
) -> Iterable[MetadataWorkUnit]:
if event.query_start_time and event.query_type in OPERATION_STATEMENT_TYPES:
start_time = event.query_start_time
Expand All @@ -292,9 +305,20 @@ def _get_operation_aspect_work_unit(
for obj in event.objects_modified:

resource = obj.objectName

dataset_identifier = self.get_dataset_identifier_from_qualified_name(
resource
)

if dataset_identifier not in discovered_datasets:
logger.debug(
f"Skipping operations for table {dataset_identifier}, as table schema is not accessible"
)
continue

dataset_urn = make_dataset_urn_with_platform_instance(
self.platform,
self.get_dataset_identifier_from_qualified_name(resource),
dataset_identifier,
self.config.platform_instance,
self.config.env,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,9 +205,8 @@ def __init__(self, ctx: PipelineContext, config: SnowflakeV2Config):
cached_domains=[k for k in self.config.domain], graph=self.ctx.graph
)

if self.config.include_technical_schema:
# For database, schema, tables, views, etc
self.data_dictionary = SnowflakeDataDictionary()
# For database, schema, tables, views, etc
self.data_dictionary = SnowflakeDataDictionary()

if config.include_table_lineage:
# For lineage
Expand Down Expand Up @@ -430,25 +429,24 @@ def get_workunits(self) -> Iterable[WorkUnit]:
self.inspect_session_metadata(conn)

self.report.include_technical_schema = self.config.include_technical_schema
if self.config.include_technical_schema:
databases: List[SnowflakeDatabase] = self.data_dictionary.get_databases(
conn
)
for snowflake_db in databases:
self.report.report_entity_scanned(snowflake_db.name, "database")
databases: List[SnowflakeDatabase] = []

databases = self.data_dictionary.get_databases(conn)
for snowflake_db in databases:
self.report.report_entity_scanned(snowflake_db.name, "database")

if not self.config.database_pattern.allowed(snowflake_db.name):
self.report.report_dropped(f"{snowflake_db.name}.*")
continue
if not self.config.database_pattern.allowed(snowflake_db.name):
self.report.report_dropped(f"{snowflake_db.name}.*")
continue

yield from self._process_database(conn, snowflake_db)
yield from self._process_database(conn, snowflake_db)

conn.close()
# Emit Stale entity workunits
yield from self.stale_entity_removal_handler.gen_removed_entity_workunits()
conn.close()
# Emit Stale entity workunits
yield from self.stale_entity_removal_handler.gen_removed_entity_workunits()

if self.config.profiling.enabled and len(databases) != 0:
yield from self.profiler.get_workunits(databases)
if self.config.profiling.enabled and len(databases) != 0:
yield from self.profiler.get_workunits(databases)

if self.config.include_usage_stats or self.config.include_operational_stats:
if self.redundant_run_skip_handler.should_skip_this_run(
Expand All @@ -462,14 +460,27 @@ def get_workunits(self) -> Iterable[WorkUnit]:
start_time_millis=datetime_to_ts_millis(self.config.start_time),
end_time_millis=datetime_to_ts_millis(self.config.end_time),
)
yield from self.usage_extractor.get_workunits()

discovered_datasets: List[str] = [
self.get_dataset_identifier(table.name, schema.name, db.name)
for db in databases
for schema in db.schemas
for table in schema.tables
] + [
self.get_dataset_identifier(table.name, schema.name, db.name)
for db in databases
for schema in db.schemas
for table in schema.views
]
yield from self.usage_extractor.get_workunits(discovered_datasets)

def _process_database(
self, conn: SnowflakeConnection, snowflake_db: SnowflakeDatabase
) -> Iterable[MetadataWorkUnit]:
db_name = snowflake_db.name

yield from self.gen_database_containers(snowflake_db)
if self.config.include_technical_schema:
yield from self.gen_database_containers(snowflake_db)

# Use database and extract metadata from its information_schema
# If this query fails, it means, user does not have usage access on database
Expand Down Expand Up @@ -501,23 +512,26 @@ def _process_schema(
self, conn: SnowflakeConnection, snowflake_schema: SnowflakeSchema, db_name: str
) -> Iterable[MetadataWorkUnit]:
schema_name = snowflake_schema.name
yield from self.gen_schema_containers(snowflake_schema, db_name)
if self.config.include_technical_schema:
yield from self.gen_schema_containers(snowflake_schema, db_name)

if self.config.include_tables:
snowflake_schema.tables = self.get_tables_for_schema(
conn, schema_name, db_name
)

for table in snowflake_schema.tables:
yield from self._process_table(conn, table, schema_name, db_name)
if self.config.include_technical_schema:
for table in snowflake_schema.tables:
yield from self._process_table(conn, table, schema_name, db_name)

if self.config.include_views:
snowflake_schema.views = self.get_views_for_schema(
conn, schema_name, db_name
)

for view in snowflake_schema.views:
yield from self._process_view(conn, view, schema_name, db_name)
if self.config.include_technical_schema:
for view in snowflake_schema.views:
yield from self._process_view(conn, view, schema_name, db_name)

def _process_table(
self,
Expand Down

0 comments on commit 4ee3ef1

Please sign in to comment.