From 835c0110020354d32b925b8c2221cc3e60ab3017 Mon Sep 17 00:00:00 2001
From: Harshal Sheth
Date: Wed, 7 Jul 2021 17:18:38 -0700
Subject: [PATCH 1/2] fix(ingest): handle 'fields' list missing in bigquery-usage

---
 .../src/datahub/ingestion/source/bigquery_usage.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_usage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_usage.py
index 3d01e23a1c6681..8676fe7c884deb 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_usage.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_usage.py
@@ -143,8 +143,8 @@ def from_entry(cls, entry: AuditLogEntry) -> "ReadEvent":
         resourceName = entry.payload["resourceName"]
         readInfo = entry.payload["metadata"]["tableDataRead"]
 
-        fields = readInfo["fields"]
         readReason = readInfo["reason"]
+        fields = readInfo.get("fields", [])
         jobName = None
         if readReason == "JOB":
             jobName = readInfo["jobName"]
@@ -284,7 +284,7 @@ def _parse_bigquery_log_entries(
         self, entries: Iterable[AuditLogEntry]
     ) -> Iterable[Union[ReadEvent, QueryEvent]]:
         for entry in entries:
-            event: Union[ReadEvent, QueryEvent]
+            event: Union[None, ReadEvent, QueryEvent] = None
             if ReadEvent.can_parse_entry(entry):
                 event = ReadEvent.from_entry(entry)
             elif QueryEvent.can_parse_entry(entry):
@@ -294,7 +294,8 @@ def _parse_bigquery_log_entries(
                     f"{entry.log_name}-{entry.insert_id}",
                     f"unable to parse log entry: {entry!r}",
                 )
-            yield event
+            if event:
+                yield event
 
     def _join_events_by_job_id(
         self, events: Iterable[Union[ReadEvent, QueryEvent]]

From bd52367a2710e11904addb107600ef1a710b8919 Mon Sep 17 00:00:00 2001
From: Harshal Sheth
Date: Thu, 8 Jul 2021 11:26:37 -0700
Subject: [PATCH 2/2] also handle reasons missing

---
 .../src/datahub/ingestion/source/bigquery_usage.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_usage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_usage.py
index 8676fe7c884deb..6ec9700d810f17 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_usage.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_usage.py
@@ -120,7 +120,7 @@ class ReadEvent:
 
     resource: BigQueryTableRef
     fieldsRead: List[str]
-    readReason: str
+    readReason: Optional[str]
     jobName: Optional[str]
 
     payload: Any
@@ -143,8 +143,8 @@ def from_entry(cls, entry: AuditLogEntry) -> "ReadEvent":
         resourceName = entry.payload["resourceName"]
         readInfo = entry.payload["metadata"]["tableDataRead"]
 
-        readReason = readInfo["reason"]
         fields = readInfo.get("fields", [])
+        readReason = readInfo.get("reason")
         jobName = None
         if readReason == "JOB":
             jobName = readInfo["jobName"]