fix(ingest): bigquery-beta - Lowering a bit memory footprint of bigquery usage #6095

Merged
@@ -441,9 +441,6 @@ def get_workunits(self) -> Iterable[WorkUnit]:
             logger.info(f"Processing project: {project_id.id}")
             yield from self._process_project(conn, project_id)

-        if self.config.include_usage_statistics:
-            yield from self.usage_extractor.get_workunits()
-
         if self.config.profiling.enabled:
             yield from self.profiler.get_workunits(self.db_tables)

@@ -489,7 +486,26 @@ def _process_project(

         if self.config.include_usage_statistics:
             logger.info(f"Generate usage for {project_id}")
-            yield from self.usage_extractor.generate_usage_for_project(project_id)
+            tables: Dict[str, List[str]] = {}
+
+            for dataset in self.db_tables[project_id]:
+                tables[dataset] = [
+                    table.name for table in self.db_tables[project_id][dataset]
+                ]
+
+            for dataset in self.db_views[project_id]:
+                if not tables[dataset]:
+                    tables[dataset] = [
+                        table.name for table in self.db_views[project_id][dataset]
+                    ]
+                else:
+                    tables[dataset].extend(
+                        [table.name for table in self.db_views[project_id][dataset]]
+                    )
+
+            yield from self.usage_extractor.generate_usage_for_project(
+                project_id, tables
+            )

     def _process_schema(
         self, conn: bigquery.Client, project_id: str, bigquery_dataset: BigqueryDataset
@@ -150,15 +150,10 @@ class BigQueryUsageExtractor:
     * Aggregation of these statistics into buckets, by day or hour granularity

     :::note
-    1. This source only does usage statistics. To get the tables, views, and schemas in your BigQuery project, use the `bigquery` plugin.
-    2. Depending on the compliance policies setup for the bigquery instance, sometimes logging.read permission is not sufficient. In that case, use either admin or private log viewer permission.
+    1. Depending on the compliance policies setup for the bigquery instance, sometimes logging.read permission is not sufficient. In that case, use either admin or private log viewer permission.
     :::
     """

-    aggregated_info: Dict[
-        datetime, Dict[BigQueryTableRef, AggregatedDataset]
-    ] = collections.defaultdict(dict)
-
     def __init__(self, config: BigQueryV2Config, report: BigQueryV2Report):
         self.config: BigQueryV2Config = config
         self.report: BigQueryV2Report = report

Review comments on the removed aggregated_info class attribute:

Collaborator: Out of curiosity, what was the reason for moving this into generate_usage_for_project?

Contributor Author: Because I wanted to make sure we don't keep this dict in memory longer than we should, and that if we run usage extraction multiple times it won't add other projects' usage to the same dict.

Collaborator (@hsheth2, Sep 30, 2022): Makes sense - feel free to merge whenever.
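For readers skimming the diff, here is a minimal sketch of the pattern discussed above — not the actual DataHub code; `UsageExtractor`, `_read_events`, and `_emit_workunits` are illustrative names only. The accumulator becomes a local variable scoped to one project's extraction, events are filtered against the tables/views that were actually ingested, and the dict is passed explicitly to the workunit emitter, so nothing lingers on the instance between projects or repeated runs.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


class UsageExtractor:
    """Illustrative stand-in for the usage extractor (hypothetical names)."""

    def generate_usage_for_project(
        self, project_id: str, tables: Dict[str, List[str]]
    ) -> Iterable[str]:
        # Local accumulator: created fresh per project, garbage-collected when
        # this generator finishes, never shared across projects or runs.
        aggregated: Dict[str, int] = defaultdict(int)

        for dataset, table in self._read_events(project_id):
            # Only aggregate events for tables/views that were actually ingested.
            if table not in tables.get(dataset, []):
                continue
            aggregated[f"{project_id}.{dataset}.{table}"] += 1

        # Hand the accumulator to the workunit generator directly instead of
        # reading it back from an instance attribute.
        yield from self._emit_workunits(aggregated)

    def _read_events(self, project_id: str) -> Iterable[Tuple[str, str]]:
        # Hypothetical event source; the real extractor parses audit logs.
        yield ("dataset_a", "table_1")
        yield ("dataset_a", "table_2")

    def _emit_workunits(self, aggregated: Dict[str, int]) -> Iterable[str]:
        for ref, count in aggregated.items():
            yield f"usage-workunit:{ref}:{count}"
```

With this sketch, `list(UsageExtractor().generate_usage_for_project("my-project", {"dataset_a": ["table_1"]}))` yields one workunit for table_1 and skips table_2, mirroring how the new `tables` argument is used to drop read events for tables that were not ingested.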
@@ -173,7 +168,13 @@ def _is_table_allowed(self, table_ref: Optional[BigQueryTableRef]) -> bool:
             and self.config.table_pattern.allowed(table_ref.table_identifier.table)
         )

-    def generate_usage_for_project(self, project_id: str) -> Iterable[MetadataWorkUnit]:
+    def generate_usage_for_project(
+        self, project_id: str, tables: Dict[str, List[str]]
+    ) -> Iterable[MetadataWorkUnit]:
+        aggregated_info: Dict[
+            datetime, Dict[BigQueryTableRef, AggregatedDataset]
+        ] = collections.defaultdict(dict)
+
         parsed_bigquery_log_events: Iterable[
             Union[ReadEvent, QueryEvent, MetadataWorkUnit]
         ]
@@ -221,24 +222,26 @@ def generate_usage_for_project(self, project_id: str) -> Iterable[MetadataWorkUn
                        yield operational_wu
                        self.report.num_operational_stats_workunits_emitted += 1
                    if event.read_event:
-                        self.aggregated_info = self._aggregate_enriched_read_events(
-                            self.aggregated_info, event
+                        aggregated_info = self._aggregate_enriched_read_events(
+                            aggregated_info, event, tables
                        )
                        num_aggregated += 1
            logger.info(f"Total number of events aggregated = {num_aggregated}.")
            bucket_level_stats: str = "\n\t" + "\n\t".join(
                [
                    f'bucket:{db.strftime("%m-%d-%Y:%H:%M:%S")}, size={len(ads)}'
-                    for db, ads in self.aggregated_info.items()
+                    for db, ads in aggregated_info.items()
                ]
            )
            logger.debug(
-                f"Number of buckets created = {len(self.aggregated_info)}. Per-bucket details:{bucket_level_stats}"
+                f"Number of buckets created = {len(aggregated_info)}. Per-bucket details:{bucket_level_stats}"
            )

            self.report.usage_extraction_sec[project_id] = round(
                timer.elapsed_seconds(), 2
            )
+
+            yield from self.get_workunits(aggregated_info)
        except Exception as e:
            self.report.usage_failed_extraction.append(project_id)
            logger.error(
@@ -746,6 +749,7 @@ def _aggregate_enriched_read_events(
        self,
        datasets: Dict[datetime, Dict[BigQueryTableRef, AggregatedDataset]],
        event: AuditEvent,
+        tables: Dict[str, List[str]],
    ) -> Dict[datetime, Dict[BigQueryTableRef, AggregatedDataset]]:
        if not event.read_event:
            return datasets
@@ -756,6 +760,12 @@
        resource: Optional[BigQueryTableRef] = None
        try:
            resource = event.read_event.resource.get_sanitized_table_ref()
+            if (
+                resource.table_identifier.get_table_display_name()
+                not in tables[resource.table_identifier.dataset]
+            ):
+                logger.debug(f"Skipping non existing {resource} from usage")
+                return datasets
        except Exception as e:
            self.report.report_warning(
                str(event.read_event.resource), f"Failed to clean up resource, {e}"
@@ -787,9 +797,11 @@

        return datasets

-    def get_workunits(self):
+    def get_workunits(
+        self, aggregated_info: Dict[datetime, Dict[BigQueryTableRef, AggregatedDataset]]
+    ) -> Iterable[MetadataWorkUnit]:
        self.report.num_usage_workunits_emitted = 0
-        for time_bucket in self.aggregated_info.values():
+        for time_bucket in aggregated_info.values():
            for aggregate in time_bucket.values():
                wu = self._make_usage_stat(aggregate)
                self.report.report_workunit(wu)