Commit

fix(ingest): ignore usage and operation for snowflake datasets without schema
Currently, if the role has access to the account usage tables, usage for all tables (including deleted and inaccessible tables) is ingested. This PR makes sure that usage is not ingested for such datasets in DataHub.
mayurinehate committed Oct 4, 2022
1 parent bab5acc commit e7fd801
Showing 4 changed files with 72 additions and 34 deletions.
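The diff below implements this by passing the list of datasets discovered during schema extraction into the usage extractor and skipping account_usage rows that reference anything else. As a rough illustration of that filtering idea — not the actual DataHub code; the row shape, helper, and names here are hypothetical stand-ins — a minimal sketch:

from typing import Dict, Iterable, List


def normalize_snowflake_name(qualified_name: str) -> str:
    # Snowflake's account_usage views report object names like DB.SCHEMA.TABLE in upper case;
    # assume discovered identifiers are stored lower-cased (a stand-in for
    # get_dataset_identifier_from_qualified_name in the real source).
    return qualified_name.lower()


def filter_usage_rows(
    usage_rows: Iterable[Dict[str, str]], discovered_datasets: List[str]
) -> Iterable[Dict[str, str]]:
    for row in usage_rows:
        dataset_identifier = normalize_snowflake_name(row["OBJECT_NAME"])
        if dataset_identifier not in discovered_datasets:
            # The table was deleted or its schema is not accessible to the role,
            # so skip its usage instead of emitting metadata for an unknown dataset.
            continue
        yield row


rows = [
    {"OBJECT_NAME": "DB1.PUBLIC.ORDERS"},
    {"OBJECT_NAME": "DB1.PUBLIC.DROPPED_TABLE"},
]
# Only the discovered table's usage row survives the filter.
print(list(filter_usage_rows(rows, discovered_datasets=["db1.public.orders"])))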
@@ -76,7 +76,7 @@ def validate_unsupported_configs(cls, values: Dict) -> Dict:
         )
         include_table_lineage = values.get("include_table_lineage")

-        # TODO: Allow lineage extraction irrespective of basic schema extraction,
+        # TODO: Allow lineage extraction and profiling irrespective of basic schema extraction,
         # as it seems possible with some refractor
         if not include_technical_schema and any(
             [include_profiles, delete_detection_enabled, include_table_lineage]
@@ -46,7 +46,6 @@ def get_workunits(self, databases: List[SnowflakeDatabase]) -> Iterable[WorkUnit
             "max_overflow", self.config.profiling.max_workers
         )

-        # Otherwise, if column level profiling is enabled, use GE profiler.
        for db in databases:
            if not self.config.database_pattern.allowed(db.name):
                continue
@@ -236,6 +235,7 @@ def generate_profiles(
        if len(ge_profile_requests) == 0:
            return

+        # Otherwise, if column level profiling is enabled, use GE profiler.
        ge_profiler = self.get_profiler_instance(db_name)
        yield from ge_profiler.generate_profiles(
            ge_profile_requests, max_workers, platform, profiler_args
@@ -92,7 +92,9 @@ def __init__(self, config: SnowflakeV2Config, report: SnowflakeV2Report) -> None
         self.report: SnowflakeV2Report = report
         self.logger = logger

-    def get_workunits(self) -> Iterable[MetadataWorkUnit]:
+    def get_workunits(
+        self, discovered_datasets: List[str]
+    ) -> Iterable[MetadataWorkUnit]:
         conn = self.config.get_connection()

         logger.info("Checking usage date ranges")
@@ -107,18 +109,20 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
         # Now, we report the usage as well as operation metadata even if user email is absent

         if self.config.include_usage_stats:
-            yield from self.get_usage_workunits(conn)
+            yield from self.get_usage_workunits(conn, discovered_datasets)

         if self.config.include_operational_stats:
             # Generate the operation workunits.
             access_events = self._get_snowflake_history(conn)
             for event in access_events:
-                yield from self._get_operation_aspect_work_unit(event)
+                yield from self._get_operation_aspect_work_unit(
+                    event, discovered_datasets
+                )

         conn.close()

     def get_usage_workunits(
-        self, conn: SnowflakeConnection
+        self, conn: SnowflakeConnection, discovered_datasets: List[str]
     ) -> Iterable[MetadataWorkUnit]:

         with PerfTimer() as timer:
@@ -144,6 +148,15 @@ def get_usage_workunits(
             ):
                 continue

+            dataset_identifier = self.get_dataset_identifier_from_qualified_name(
+                row["OBJECT_NAME"]
+            )
+            if dataset_identifier not in discovered_datasets:
+                logger.debug(
+                    f"Skipping usage for table {dataset_identifier}, as table schema is not accessible"
+                )
+                continue
+
             stats = DatasetUsageStatistics(
                 timestampMillis=int(row["BUCKET_START_TIME"].timestamp() * 1000),
                 eventGranularity=TimeWindowSize(
@@ -161,7 +174,7 @@
             )
             dataset_urn = make_dataset_urn_with_platform_instance(
                 self.platform,
-                self.get_dataset_identifier_from_qualified_name(row["OBJECT_NAME"]),
+                dataset_identifier,
                 self.config.platform_instance,
                 self.config.env,
             )
@@ -276,7 +289,7 @@ def _check_usage_date_ranges(self, conn: SnowflakeConnection) -> Any:
         )

     def _get_operation_aspect_work_unit(
-        self, event: SnowflakeJoinedAccessEvent
+        self, event: SnowflakeJoinedAccessEvent, discovered_datasets: List[str]
     ) -> Iterable[MetadataWorkUnit]:
         if event.query_start_time and event.query_type in OPERATION_STATEMENT_TYPES:
             start_time = event.query_start_time
@@ -292,9 +305,20 @@ def _get_operation_aspect_work_unit(
             for obj in event.objects_modified:

                 resource = obj.objectName
+
+                dataset_identifier = self.get_dataset_identifier_from_qualified_name(
+                    resource
+                )
+
+                if dataset_identifier not in discovered_datasets:
+                    logger.debug(
+                        f"Skipping operations for table {dataset_identifier}, as table schema is not accessible"
+                    )
+                    continue
+
                 dataset_urn = make_dataset_urn_with_platform_instance(
                     self.platform,
-                    self.get_dataset_identifier_from_qualified_name(resource),
+                    dataset_identifier,
                     self.config.platform_instance,
                     self.config.env,
                 )
@@ -205,9 +205,8 @@ def __init__(self, ctx: PipelineContext, config: SnowflakeV2Config):
                 cached_domains=[k for k in self.config.domain], graph=self.ctx.graph
             )

-        if self.config.include_technical_schema:
-            # For database, schema, tables, views, etc
-            self.data_dictionary = SnowflakeDataDictionary()
+        # For database, schema, tables, views, etc
+        self.data_dictionary = SnowflakeDataDictionary()

         if config.include_table_lineage:
             # For lineage
@@ -430,25 +429,24 @@ def get_workunits(self) -> Iterable[WorkUnit]:
         self.inspect_session_metadata(conn)

         self.report.include_technical_schema = self.config.include_technical_schema
-        if self.config.include_technical_schema:
-            databases: List[SnowflakeDatabase] = self.data_dictionary.get_databases(
-                conn
-            )
-            for snowflake_db in databases:
-                self.report.report_entity_scanned(snowflake_db.name, "database")
+        databases: List[SnowflakeDatabase] = []
+
+        databases = self.data_dictionary.get_databases(conn)
+        for snowflake_db in databases:
+            self.report.report_entity_scanned(snowflake_db.name, "database")

-                if not self.config.database_pattern.allowed(snowflake_db.name):
-                    self.report.report_dropped(f"{snowflake_db.name}.*")
-                    continue
+            if not self.config.database_pattern.allowed(snowflake_db.name):
+                self.report.report_dropped(f"{snowflake_db.name}.*")
+                continue

-                yield from self._process_database(conn, snowflake_db)
+            yield from self._process_database(conn, snowflake_db)

-            conn.close()
-            # Emit Stale entity workunits
-            yield from self.stale_entity_removal_handler.gen_removed_entity_workunits()
+        conn.close()
+        # Emit Stale entity workunits
+        yield from self.stale_entity_removal_handler.gen_removed_entity_workunits()

-            if self.config.profiling.enabled and len(databases) != 0:
-                yield from self.profiler.get_workunits(databases)
+        if self.config.profiling.enabled and len(databases) != 0:
+            yield from self.profiler.get_workunits(databases)

         if self.config.include_usage_stats or self.config.include_operational_stats:
             if self.redundant_run_skip_handler.should_skip_this_run(
@@ -462,14 +460,27 @@ def get_workunits(self) -> Iterable[WorkUnit]:
                 start_time_millis=datetime_to_ts_millis(self.config.start_time),
                 end_time_millis=datetime_to_ts_millis(self.config.end_time),
             )
-            yield from self.usage_extractor.get_workunits()
+
+            discovered_datasets: List[str] = [
+                self.get_dataset_identifier(table.name, schema.name, db.name)
+                for db in databases
+                for schema in db.schemas
+                for table in schema.tables
+            ] + [
+                self.get_dataset_identifier(table.name, schema.name, db.name)
+                for db in databases
+                for schema in db.schemas
+                for table in schema.views
+            ]
+            yield from self.usage_extractor.get_workunits(discovered_datasets)

     def _process_database(
         self, conn: SnowflakeConnection, snowflake_db: SnowflakeDatabase
     ) -> Iterable[MetadataWorkUnit]:
         db_name = snowflake_db.name

-        yield from self.gen_database_containers(snowflake_db)
+        if self.config.include_technical_schema:
+            yield from self.gen_database_containers(snowflake_db)

         # Use database and extract metadata from its information_schema
         # If this query fails, it means, user does not have usage access on database
@@ -501,23 +512,26 @@ def _process_schema(
         self, conn: SnowflakeConnection, snowflake_schema: SnowflakeSchema, db_name: str
     ) -> Iterable[MetadataWorkUnit]:
         schema_name = snowflake_schema.name
-        yield from self.gen_schema_containers(snowflake_schema, db_name)
+        if self.config.include_technical_schema:
+            yield from self.gen_schema_containers(snowflake_schema, db_name)

         if self.config.include_tables:
             snowflake_schema.tables = self.get_tables_for_schema(
                 conn, schema_name, db_name
             )

-            for table in snowflake_schema.tables:
-                yield from self._process_table(conn, table, schema_name, db_name)
+            if self.config.include_technical_schema:
+                for table in snowflake_schema.tables:
+                    yield from self._process_table(conn, table, schema_name, db_name)

         if self.config.include_views:
             snowflake_schema.views = self.get_views_for_schema(
                 conn, schema_name, db_name
             )

-            for view in snowflake_schema.views:
-                yield from self._process_view(conn, view, schema_name, db_name)
+            if self.config.include_technical_schema:
+                for view in snowflake_schema.views:
+                    yield from self._process_view(conn, view, schema_name, db_name)

     def _process_table(
         self,
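The last file's change to get_workunits builds the discovered_datasets list by flattening databases into schemas and then into tables and views. A rough, self-contained sketch of that construction — with hypothetical stand-in classes, not the real SnowflakeDatabase/SnowflakeSchema types:

from dataclasses import dataclass, field
from typing import List


@dataclass
class Table:
    name: str


@dataclass
class Schema:
    name: str
    tables: List[Table] = field(default_factory=list)
    views: List[Table] = field(default_factory=list)  # views reuse the same stand-in type


@dataclass
class Database:
    name: str
    schemas: List[Schema] = field(default_factory=list)


def dataset_identifier(table: str, schema: str, db: str) -> str:
    # Stand-in for get_dataset_identifier: lower-cased db.schema.table.
    return f"{db}.{schema}.{table}".lower()


def build_discovered_datasets(databases: List[Database]) -> List[str]:
    # Mirror the list comprehension in the diff: tables first, then views.
    return [
        dataset_identifier(t.name, s.name, d.name)
        for d in databases
        for s in d.schemas
        for t in s.tables
    ] + [
        dataset_identifier(v.name, s.name, d.name)
        for d in databases
        for s in d.schemas
        for v in s.views
    ]


dbs = [Database("DB1", [Schema("PUBLIC", tables=[Table("ORDERS")], views=[Table("ORDERS_V")])])]
print(build_discovered_datasets(dbs))  # ['db1.public.orders', 'db1.public.orders_v']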
