fix(ingest): followup on bigquery queries v2 ordering #11353

Merged
@@ -247,32 +247,21 @@ def get_workunits_internal(
                     self.report.num_queries_by_project[project.id] += 1
                     queries.append(entry)
         self.report.num_total_queries = len(queries)
         logger.info(f"Found {self.report.num_total_queries} total queries")

         with self.report.audit_log_preprocessing_timer:
             # Preprocessing stage that deduplicates the queries using query hash per usage bucket
-            # Using regular dictionary with
-            #   key: usage bucket
-            #   value: File backed dictionary with query hash as key and observed query as value
-            # This structure is chosen in order to maintain order of execution of queries
-
-            queries_deduped: Dict[int, FileBackedDict[ObservedQuery]]
+            # Note: FileBackedDict is an ordered dictionary, so the order of execution of
+            # queries is inherently maintained
+            queries_deduped: FileBackedDict[Dict[int, ObservedQuery]]
             queries_deduped = self.deduplicate_queries(queries)
-            self.report.num_unique_queries = len(
-                set(
-                    query_hash
-                    for bucket in queries_deduped.values()
-                    for query_hash in bucket
-                )
-            )
+            self.report.num_unique_queries = len(queries_deduped)
             logger.info(f"Found {self.report.num_unique_queries} unique queries")

         with self.report.audit_log_load_timer:
             i = 0
-            for queries_in_bucket in queries_deduped.values():
-                # Ordering is essential for column-level lineage via temporary table
-                for row in queries_in_bucket.sql_query_iterator(
-                    "select value from data order by last_query_timestamp asc",
-                ):
-                    query = queries_in_bucket.deserializer(row["value"])
+            for _, query_instances in queries_deduped.items():
+                for query in query_instances.values():
                     if i > 0 and i % 10000 == 0:
                         logger.info(f"Added {i} query log entries to SQL aggregator")

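The ordering claim in the added comment can be illustrated with a small stand-alone sketch. It uses plain dicts and strings in place of DataHub's FileBackedDict and ObservedQuery, and the log entries and hash names are purely illustrative: because the deduplication map is keyed by query fingerprint and populated from a timestamp-ordered log, iterating it replays each unique query at the position of its first execution, which is the property the removed "ordering is essential for column-level lineage via temporary table" path was enforcing with an explicit SQL ORDER BY.

from typing import Dict

# (query_hash, time_bucket) pairs in execution order, e.g. from a timestamp-ordered audit log
log = [("create_tmp", 0), ("select_tmp", 0), ("create_tmp", 1), ("select_tmp", 1)]

# stand-in for FileBackedDict[Dict[int, ObservedQuery]]; plain dicts also preserve insertion order
deduped: Dict[str, Dict[int, str]] = {}
for query_hash, bucket in log:
    deduped.setdefault(query_hash, {}).setdefault(bucket, f"{query_hash}@{bucket}")

# Replay order seen by the aggregator: all "create_tmp" instances come before "select_tmp",
# matching the execution order of their first occurrences in the log.
replay = [q for instances in deduped.values() for q in instances.values()]
print(replay)  # ['create_tmp@0', 'create_tmp@1', 'select_tmp@0', 'select_tmp@1']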
@@ -283,15 +272,15 @@ def get_workunits_internal(

     def deduplicate_queries(
         self, queries: FileBackedList[ObservedQuery]
-    ) -> Dict[int, FileBackedDict[ObservedQuery]]:
+    ) -> FileBackedDict[Dict[int, ObservedQuery]]:

         # This fingerprint based deduplication is done here to reduce performance hit due to
         # repetitive sql parsing while adding observed query to aggregator that would otherwise
         # parse same query multiple times. In future, aggregator may absorb this deduplication.
         # With current implementation, it is possible that "Operation"(e.g. INSERT) is reported
         # only once per day, although it may have happened multiple times throughout the day.

-        queries_deduped: Dict[int, FileBackedDict[ObservedQuery]] = dict()
+        queries_deduped: FileBackedDict[Dict[int, ObservedQuery]] = FileBackedDict()

         for i, query in enumerate(queries):
             if i > 0 and i % 10000 == 0:
@@ -310,20 +299,14 @@ def deduplicate_queries(
                 query.query, self.identifiers.platform, fast=True
             )

-            if time_bucket not in queries_deduped:
-                # TODO: Cleanup, etc as required for file backed dicts after use
-                queries_deduped[time_bucket] = FileBackedDict[ObservedQuery](
-                    extra_columns={"last_query_timestamp": lambda e: e.timestamp}
-                )
+            query_instances = queries_deduped.setdefault(query.query_hash, {})

-            observed_query = queries_deduped[time_bucket].get(query.query_hash)
+            observed_query = query_instances.setdefault(time_bucket, query)

             # If the query already exists for this time bucket, update its attributes
-            if observed_query is not None:
+            if observed_query is not query:
                 observed_query.usage_multiplier += 1
                 observed_query.timestamp = query.timestamp
-            else:
-                queries_deduped[time_bucket][query.query_hash] = query

         return queries_deduped
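To make the merge semantics of the new setdefault-based deduplication concrete, here is a minimal runnable sketch. The Observed dataclass and the dedupe helper are hypothetical stand-ins for ObservedQuery and deduplicate_queries, kept to the fields the diff actually touches (query_hash, timestamp, usage_multiplier); a plain dict stands in for FileBackedDict. Repeats of a fingerprint within the same usage bucket are folded into one entry with an incremented usage_multiplier and a refreshed timestamp, while the same fingerprint in a different bucket keeps its own instance.

from dataclasses import dataclass
from typing import Dict, Iterable, Tuple


@dataclass
class Observed:  # hypothetical stand-in for ObservedQuery
    query_hash: str
    timestamp: int
    usage_multiplier: int = 1


def dedupe(events: Iterable[Tuple[str, int, int]]) -> Dict[str, Dict[int, Observed]]:
    # query_hash -> time_bucket -> Observed (outer dict stands in for FileBackedDict)
    deduped: Dict[str, Dict[int, Observed]] = {}
    for query_hash, bucket, ts in events:
        query = Observed(query_hash, ts)
        instances = deduped.setdefault(query_hash, {})
        observed = instances.setdefault(bucket, query)
        if observed is not query:  # already present in this bucket: merge instead of re-adding
            observed.usage_multiplier += 1
            observed.timestamp = ts
    return deduped


result = dedupe([("q1", 0, 10), ("q1", 0, 20), ("q1", 1, 30)])
print(result["q1"][0])  # Observed(query_hash='q1', timestamp=20, usage_multiplier=2)
print(result["q1"][1])  # Observed(query_hash='q1', timestamp=30, usage_multiplier=1)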
