diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
index 93f6db2c5feb53..a970a49bf0aaa0 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
@@ -441,9 +441,6 @@ def get_workunits(self) -> Iterable[WorkUnit]:
             logger.info(f"Processing project: {project_id.id}")
             yield from self._process_project(conn, project_id)
 
-        if self.config.include_usage_statistics:
-            yield from self.usage_extractor.get_workunits()
-
         if self.config.profiling.enabled:
             yield from self.profiler.get_workunits(self.db_tables)
 
@@ -489,7 +486,26 @@ def _process_project(
 
         if self.config.include_usage_statistics:
             logger.info(f"Generate usage for {project_id}")
-            yield from self.usage_extractor.generate_usage_for_project(project_id)
+            tables: Dict[str, List[str]] = {}
+
+            for dataset in self.db_tables[project_id]:
+                tables[dataset] = [
+                    table.name for table in self.db_tables[project_id][dataset]
+                ]
+
+            for dataset in self.db_views[project_id]:
+                if dataset not in tables:
+                    tables[dataset] = [
+                        table.name for table in self.db_views[project_id][dataset]
+                    ]
+                else:
+                    tables[dataset].extend(
+                        [table.name for table in self.db_views[project_id][dataset]]
+                    )
+
+            yield from self.usage_extractor.generate_usage_for_project(
+                project_id, tables
+            )
 
     def _process_schema(
         self, conn: bigquery.Client, project_id: str, bigquery_dataset: BigqueryDataset
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py
index cfd25bc5319b3d..119d8fa49b171d 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py
@@ -150,15 +150,10 @@ class BigQueryUsageExtractor:
     * Aggregation of these statistics into buckets, by day or hour granularity
 
     :::note
-    1. This source only does usage statistics. To get the tables, views, and schemas in your BigQuery project, use the `bigquery` plugin.
-    2. Depending on the compliance policies setup for the bigquery instance, sometimes logging.read permission is not sufficient. In that case, use either admin or private log viewer permission.
+    1. Depending on the compliance policies setup for the bigquery instance, sometimes logging.read permission is not sufficient. In that case, use either admin or private log viewer permission.
::: """ - aggregated_info: Dict[ - datetime, Dict[BigQueryTableRef, AggregatedDataset] - ] = collections.defaultdict(dict) - def __init__(self, config: BigQueryV2Config, report: BigQueryV2Report): self.config: BigQueryV2Config = config self.report: BigQueryV2Report = report @@ -173,7 +168,13 @@ def _is_table_allowed(self, table_ref: Optional[BigQueryTableRef]) -> bool: and self.config.table_pattern.allowed(table_ref.table_identifier.table) ) - def generate_usage_for_project(self, project_id: str) -> Iterable[MetadataWorkUnit]: + def generate_usage_for_project( + self, project_id: str, tables: Dict[str, List[str]] + ) -> Iterable[MetadataWorkUnit]: + aggregated_info: Dict[ + datetime, Dict[BigQueryTableRef, AggregatedDataset] + ] = collections.defaultdict(dict) + parsed_bigquery_log_events: Iterable[ Union[ReadEvent, QueryEvent, MetadataWorkUnit] ] @@ -221,24 +222,26 @@ def generate_usage_for_project(self, project_id: str) -> Iterable[MetadataWorkUn yield operational_wu self.report.num_operational_stats_workunits_emitted += 1 if event.read_event: - self.aggregated_info = self._aggregate_enriched_read_events( - self.aggregated_info, event + aggregated_info = self._aggregate_enriched_read_events( + aggregated_info, event, tables ) num_aggregated += 1 logger.info(f"Total number of events aggregated = {num_aggregated}.") bucket_level_stats: str = "\n\t" + "\n\t".join( [ f'bucket:{db.strftime("%m-%d-%Y:%H:%M:%S")}, size={len(ads)}' - for db, ads in self.aggregated_info.items() + for db, ads in aggregated_info.items() ] ) logger.debug( - f"Number of buckets created = {len(self.aggregated_info)}. Per-bucket details:{bucket_level_stats}" + f"Number of buckets created = {len(aggregated_info)}. Per-bucket details:{bucket_level_stats}" ) self.report.usage_extraction_sec[project_id] = round( timer.elapsed_seconds(), 2 ) + + yield from self.get_workunits(aggregated_info) except Exception as e: self.report.usage_failed_extraction.append(project_id) logger.error( @@ -746,6 +749,7 @@ def _aggregate_enriched_read_events( self, datasets: Dict[datetime, Dict[BigQueryTableRef, AggregatedDataset]], event: AuditEvent, + tables: Dict[str, List[str]], ) -> Dict[datetime, Dict[BigQueryTableRef, AggregatedDataset]]: if not event.read_event: return datasets @@ -756,6 +760,12 @@ def _aggregate_enriched_read_events( resource: Optional[BigQueryTableRef] = None try: resource = event.read_event.resource.get_sanitized_table_ref() + if ( + resource.table_identifier.get_table_display_name() + not in tables[resource.table_identifier.dataset] + ): + logger.debug(f"Skipping non existing {resource} from usage") + return datasets except Exception as e: self.report.report_warning( str(event.read_event.resource), f"Failed to clean up resource, {e}" @@ -787,9 +797,11 @@ def _aggregate_enriched_read_events( return datasets - def get_workunits(self): + def get_workunits( + self, aggregated_info: Dict[datetime, Dict[BigQueryTableRef, AggregatedDataset]] + ) -> Iterable[MetadataWorkUnit]: self.report.num_usage_workunits_emitted = 0 - for time_bucket in self.aggregated_info.values(): + for time_bucket in aggregated_info.values(): for aggregate in time_bucket.values(): wu = self._make_usage_stat(aggregate) self.report.report_workunit(wu)