diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
index d50c3039c6082..61800ae912b32 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
@@ -55,6 +55,7 @@
     BigqueryTable,
     BigqueryView,
 )
+from datahub.ingestion.source.bigquery_v2.common import get_bigquery_client
 from datahub.ingestion.source.bigquery_v2.lineage import BigqueryLineageExtractor
 from datahub.ingestion.source.bigquery_v2.profiler import BigqueryProfiler
 from datahub.ingestion.source.bigquery_v2.usage import BigQueryUsageExtractor
@@ -228,10 +229,6 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> "BigqueryV2Source":
         config = BigQueryV2Config.parse_obj(config_dict)
         return cls(ctx, config)
 
-    def get_bigquery_client(self) -> bigquery.Client:
-        client_options = self.config.extra_client_options
-        return bigquery.Client(**client_options)
-
     @staticmethod
     def connectivity_test(client: bigquery.Client) -> CapabilityReport:
         ret = client.query("select 1")
@@ -244,12 +241,12 @@ def connectivity_test(client: bigquery.Client) -> CapabilityReport:
 
     @staticmethod
     def metada_read_capability_test(
-        project_ids: List[str], profiling_enabled: bool
+        project_ids: List[str], config: BigQueryV2Config
     ) -> CapabilityReport:
         for project_id in project_ids:
             try:
                 logger.info((f"Metadata read capability test for project {project_id}"))
-                client: bigquery.Client = bigquery.Client(project_id)
+                client: bigquery.Client = get_bigquery_client(config)
                 assert client
                 result = BigQueryDataDictionary.get_datasets_for_project_id(
                     client, project_id, 10
@@ -264,7 +261,7 @@ def metada_read_capability_test(
                     project_id=project_id,
                     dataset_name=result[0].name,
                     tables={},
-                    with_data_read_permission=profiling_enabled,
+                    with_data_read_permission=config.profiling.enabled,
                 )
                 if len(tables) == 0:
                     return CapabilityReport(
@@ -333,7 +330,7 @@ def test_connection(config_dict: dict) -> TestConnectionReport:
                 pydantic.Extra.allow
             )  # we are okay with extra fields during this stage
             connection_conf = BigQueryV2Config.parse_obj(config_dict)
-            client: bigquery.Client = bigquery.Client()
+            client: bigquery.Client = get_bigquery_client(connection_conf)
             assert client
 
             test_report.basic_connectivity = BigqueryV2Source.connectivity_test(client)
@@ -350,7 +347,7 @@ def test_connection(config_dict: dict) -> TestConnectionReport:
                     project_ids.append(project.project_id)
 
                 metada_read_capability = BigqueryV2Source.metada_read_capability_test(
-                    project_ids, connection_conf.profiling.enabled
+                    project_ids, connection_conf
                 )
                 if SourceCapability.SCHEMA_METADATA not in _report:
                     _report[SourceCapability.SCHEMA_METADATA] = metada_read_capability
@@ -493,7 +490,7 @@ def add_table_to_dataset_container(
 
     def get_workunits(self) -> Iterable[WorkUnit]:
         logger.info("Getting projects")
-        conn: bigquery.Client = self.get_bigquery_client()
+        conn: bigquery.Client = get_bigquery_client(self.config)
         self.add_config_to_report()
 
         projects: List[BigqueryProject]
@@ -503,12 +500,26 @@ def get_workunits(self) -> Iterable[WorkUnit]:
             )
             projects = [project]
         else:
-            projects = BigQueryDataDictionary.get_projects(conn)
-            if len(projects) == 0:
-                logger.warning(
-                    "Get projects didn't return any project. Maybe resourcemanager.projects.get permission is missing for the service account. You can assign predefined roles/bigquery.metadataViewer role to your service account."
+            try:
+                projects = BigQueryDataDictionary.get_projects(conn)
+                if len(projects) == 0:
+                    logger.error(
+                        "Get projects didn't return any project. Maybe resourcemanager.projects.get permission is missing for the service account. You can assign predefined roles/bigquery.metadataViewer role to your service account."
+                    )
+                    self.report.report_failure(
+                        "metadata-extraction",
+                        "Get projects didn't return any project. Maybe resourcemanager.projects.get permission is missing for the service account. You can assign predefined roles/bigquery.metadataViewer role to your service account.",
+                    )
+                    return
+            except Exception as e:
+                logger.error(
+                    f"Get projects didn't return any project. Maybe resourcemanager.projects.get permission is missing for the service account. You can assign predefined roles/bigquery.metadataViewer role to your service account. The error was: {e}"
+                )
+                self.report.report_failure(
+                    "metadata-extraction",
+                    f"Get projects didn't return any project. Maybe resourcemanager.projects.get permission is missing for the service account. You can assign predefined roles/bigquery.metadataViewer role to your service account. The error was: {e}",
                 )
-                return
+                return None
 
         for project_id in projects:
            if not self.config.project_id_pattern.allowed(project_id.id):
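
Note on the hunk above: permission problems are now routed through the source report under a stage key ("metadata-extraction") instead of only being logged at warning level, so they surface in the ingestion summary. A minimal, self-contained sketch of that reporting pattern (the SourceReport class below is an illustrative stand-in, not DataHub's actual implementation):

from collections import defaultdict
from typing import Dict, List


class SourceReport:
    """Illustrative stand-in: collects failures grouped by pipeline stage."""

    def __init__(self) -> None:
        self.failures: Dict[str, List[str]] = defaultdict(list)

    def report_failure(self, key: str, reason: str) -> None:
        # Keyed by stage, so one run can accumulate many
        # per-project messages under the same bucket.
        self.failures[key].append(reason)


report = SourceReport()
report.report_failure("metadata-extraction", "project-a - resourcemanager.projects.get denied")
report.report_failure("metadata-extraction", "project-b - resourcemanager.projects.get denied")
assert len(report.failures["metadata-extraction"]) == 2

Keying by stage is what the f"{project_id} - {error_message}" convention in the following hunks relies on.
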
@@ -543,8 +554,13 @@ def _process_project(
                 BigQueryDataDictionary.get_datasets_for_project_id(conn, project_id)
             )
         except Exception as e:
-            logger.error(
-                f"Unable to get datasets for project {project_id}, skipping. The error was: {e}"
+            error_message = f"Unable to get datasets for project {project_id}, skipping. The error was: {e}"
+            if self.config.profiling.enabled:
+                error_message = f"Unable to get datasets for project {project_id}, skipping. Does your service account have bigquery.datasets.get permission? The error was: {e}"
+            logger.error(error_message)
+            self.report.report_failure(
+                "metadata-extraction",
+                f"{project_id} - {error_message}",
             )
             return None
 
@@ -565,60 +581,66 @@ def _process_project(
             try:
                 yield from self._process_schema(conn, project_id, bigquery_dataset)
             except Exception as e:
+                error_message = f"Unable to get tables for dataset {bigquery_dataset.name} in project {project_id}, skipping. Does your service account have bigquery.tables.list, bigquery.routines.get, bigquery.routines.list permission? The error was: {e}"
+                if self.config.profiling.enabled:
+                    error_message = f"Unable to get tables for dataset {bigquery_dataset.name} in project {project_id}, skipping. Does your service account have bigquery.tables.list, bigquery.routines.get, bigquery.routines.list, bigquery.tables.getData permission? The error was: {e}"
+
                 trace = traceback.format_exc()
                 logger.error(trace)
-                logger.error(
-                    f"Unable to get tables for dataset {bigquery_dataset.name} in project {project_id}, skipping. The error was: {e}"
+                logger.error(error_message)
+                self.report.report_failure(
+                    "metadata-extraction",
+                    f"{project_id}.{bigquery_dataset.name} - {error_message}",
                 )
                 continue
 
         if self.config.include_table_lineage:
-            logger.info(f"Generate lineage for {project_id}")
-            for dataset in self.db_tables[project_id]:
-                for table in self.db_tables[project_id][dataset]:
-                    dataset_urn = self.gen_dataset_urn(dataset, project_id, table.name)
-                    lineage_info = self.lineage_extractor.get_upstream_lineage_info(
-                        project_id=project_id,
-                        dataset_name=dataset,
-                        table=table,
-                        platform=self.platform,
-                    )
-                    if lineage_info:
-                        yield from self.gen_lineage(dataset_urn, lineage_info)
-
-            for dataset in self.db_views[project_id]:
-                for view in self.db_views[project_id][dataset]:
-                    dataset_urn = self.gen_dataset_urn(dataset, project_id, view.name)
-                    lineage_info = self.lineage_extractor.get_upstream_lineage_info(
-                        project_id=project_id,
-                        dataset_name=dataset,
-                        table=view,
-                        platform=self.platform,
-                    )
-                    yield from self.gen_lineage(dataset_urn, lineage_info)
+            yield from self.generate_lineage(project_id)
 
         if self.config.include_usage_statistics:
-            logger.info(f"Generate usage for {project_id}")
-            tables: Dict[str, List[str]] = {}
-
-            for dataset in self.db_tables[project_id]:
+            yield from self.generate_usage_statistics(project_id)
+
+    def generate_lineage(self, project_id: str) -> Iterable[MetadataWorkUnit]:
+        logger.info(f"Generate lineage for {project_id}")
+        for dataset in self.db_tables[project_id]:
+            for table in self.db_tables[project_id][dataset]:
+                dataset_urn = self.gen_dataset_urn(dataset, project_id, table.name)
+                lineage_info = self.lineage_extractor.get_upstream_lineage_info(
+                    project_id=project_id,
+                    dataset_name=dataset,
+                    table=table,
+                    platform=self.platform,
+                )
+                if lineage_info:
+                    yield from self.gen_lineage(dataset_urn, lineage_info)
+        for dataset in self.db_views[project_id]:
+            for view in self.db_views[project_id][dataset]:
+                dataset_urn = self.gen_dataset_urn(dataset, project_id, view.name)
+                lineage_info = self.lineage_extractor.get_upstream_lineage_info(
+                    project_id=project_id,
+                    dataset_name=dataset,
+                    table=view,
+                    platform=self.platform,
+                )
+                yield from self.gen_lineage(dataset_urn, lineage_info)
+
+    def generate_usage_statistics(self, project_id: str) -> Iterable[MetadataWorkUnit]:
+        logger.info(f"Generate usage for {project_id}")
+        tables: Dict[str, List[str]] = {}
+        for dataset in self.db_tables[project_id]:
+            tables[dataset] = [
+                table.name for table in self.db_tables[project_id][dataset]
+            ]
+        for dataset in self.db_views[project_id]:
+            if dataset not in tables:
                 tables[dataset] = [
-                    table.name for table in self.db_tables[project_id][dataset]
+                    table.name for table in self.db_views[project_id][dataset]
                 ]
-
-            for dataset in self.db_views[project_id]:
-                if not tables[dataset]:
-                    tables[dataset] = [
-                        table.name for table in self.db_views[project_id][dataset]
-                    ]
-                else:
-                    tables[dataset].extend(
-                        [table.name for table in self.db_views[project_id][dataset]]
-                    )
-
-            yield from self.usage_extractor.generate_usage_for_project(
-                project_id, tables
-            )
+            else:
+                tables[dataset].extend(
+                    [table.name for table in self.db_views[project_id][dataset]]
+                )
+        yield from self.usage_extractor.generate_usage_for_project(project_id, tables)
 
     def _process_schema(
         self, conn: bigquery.Client, project_id: str, bigquery_dataset: BigqueryDataset
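
The extracted generate_usage_statistics above merges table and view names into one dataset-keyed map before handing off to the usage extractor. A standalone sketch of that merge, with plain string lists standing in for the source's db_tables/db_views state, showing why the view loop must guard with a membership test rather than indexing:

from typing import Dict, List

db_tables: Dict[str, List[str]] = {"sales": ["orders"], "hr": []}
db_views: Dict[str, List[str]] = {"sales": ["orders_view"], "analytics": ["kpi_view"]}

tables: Dict[str, List[str]] = {}
for dataset, names in db_tables.items():
    tables[dataset] = list(names)
for dataset, names in db_views.items():
    if dataset not in tables:
        # "analytics" has views but no tables; indexing tables[dataset]
        # here instead of testing membership would raise KeyError.
        tables[dataset] = list(names)
    else:
        tables[dataset].extend(names)

assert tables == {"sales": ["orders", "orders_view"], "hr": [], "analytics": ["kpi_view"]}
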
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py
index 91e0dbbe925fa..94117f26ff794 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py
@@ -66,6 +66,12 @@ class BigQueryV2Config(BigQueryConfig, LineageConfig):
         default=None,
         description="[deprecated] Use project_id_pattern instead. You can use this property if you only want to ingest one project and don't want to give project resourcemanager.projects.list to your service account",
     )
+
+    project_on_behalf: Optional[str] = Field(
+        default=None,
+        description="[Advanced] The BigQuery project in which queries are executed. Will be passed when creating a job. If not passed, falls back to the project associated with the service account.",
+    )
+
     storage_project_id: None = Field(default=None, hidden_from_schema=True)
 
     lineage_use_sql_parser: bool = Field(
@@ -126,3 +132,12 @@ def backward_compatibility_configs_set(cls, values: Dict) -> Dict:
 
     def get_table_pattern(self, pattern: List[str]) -> str:
         return "|".join(pattern) if self.table_pattern else ""
+
+    # TODO: remove run_on_compute when the legacy bigquery source is deprecated
+    def get_sql_alchemy_url(self, run_on_compute: bool = False) -> str:
+        if self.project_on_behalf:
+            return f"bigquery://{self.project_on_behalf}"
+        # When project_id is not set, we will attempt to detect the project ID
+        # based on the credentials or environment variables.
+        # See https://github.com/mxmzdlv/pybigquery#authentication.
+        return "bigquery://"
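
For reference, project_on_behalf separates the project that runs (and is billed for) the jobs from the projects being ingested. A sketch of how the option behaves, assuming a local datahub install; the project names here are made up:

from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config

config = BigQueryV2Config.parse_obj(
    {
        "project_id": "data-project",  # project whose metadata is ingested
        "project_on_behalf": "compute-project",  # project that executes the jobs
    }
)

# With project_on_behalf set, the SQLAlchemy URL pins the billing project;
# when it is unset, the driver detects the project from credentials/env.
assert config.get_sql_alchemy_url() == "bigquery://compute-project"

default_config = BigQueryV2Config.parse_obj({"project_id": "data-project"})
assert default_config.get_sql_alchemy_url() == "bigquery://"
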
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/common.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/common.py
index b85f92bf2a8f5..8a00f8f1d5fe4 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/common.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/common.py
@@ -1,7 +1,10 @@
 from typing import Any, Dict, Optional
 
+from google.cloud import bigquery
 from google.cloud.logging_v2.client import Client as GCPLoggingClient
 
+from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config
+
 BQ_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
 BQ_DATE_SHARD_FORMAT = "%Y%m%d"
 
@@ -17,3 +20,17 @@ def _make_gcp_logging_client(
         return GCPLoggingClient(**client_options, project=project_id)
     else:
         return GCPLoggingClient(**client_options)
+
+
+def get_bigquery_client(config: BigQueryV2Config) -> bigquery.Client:
+    client_options = config.extra_client_options
+    return bigquery.Client(config.project_on_behalf, **client_options)
+
+
+def get_sql_alchemy_url(config: BigQueryV2Config) -> str:
+    if config.project_on_behalf:
+        return f"bigquery://{config.project_on_behalf}"
+    # When project_id is not set, we will attempt to detect the project ID
+    # based on the credentials or environment variables.
+    # See https://github.com/mxmzdlv/pybigquery#authentication.
+    return "bigquery://"
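
One detail worth noting on get_bigquery_client: the first positional parameter of google.cloud.bigquery.Client is the project, so passing config.project_on_behalf (which defaults to None) either pins query execution to that project or falls back to the client library's normal project resolution. A quick sketch, assuming application-default credentials are configured; "compute-project" is a made-up id:

from google.cloud import bigquery

# None mirrors an unset project_on_behalf: the client resolves the
# project from GOOGLE_CLOUD_PROJECT or the credentials themselves.
client = bigquery.Client(None)
print(client.project)

# A set project_on_behalf pins query execution (and billing) to it.
client_on_behalf = bigquery.Client("compute-project")
print(client_on_behalf.project)
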
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py
index 22dff7a9d2cd7..905250c91c9c9 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py
@@ -275,7 +275,8 @@ def _get_bigquery_log_entries_via_exported_bigquery_audit_metadata(
                 e,
             )
             self.report.report_failure(
-                f"{client.project}", f"unable to retrieve log entries {e}"
+                "lineage-extraction",
+                f"{client.project} - unable to retrieve log entries {e}",
             )
 
     def _get_exported_bigquery_audit_metadata(
@@ -367,7 +368,8 @@ def _get_bigquery_log_entries_via_gcp_logging(
                 e,
             )
             self.report.report_failure(
-                f"{client.project}", f"unable to retrive log entrires {e}"
+                "usage-extraction",
+                f"{client.project} - unable to retrieve log entries {e}",
             )
 
     def _generate_filter(self, audit_templates: Dict[str, str]) -> str:
@@ -622,10 +624,8 @@ def _parse_bigquery_log_entries(
                     self.report.num_query_events += 1
 
             if event is None:
-                self.error(
-                    logger,
-                    f"{entry.log_name}-{entry.insert_id}",
-                    f"Unable to parse {type(entry)} missing read {missing_query_entry}, missing query {missing_query_entry} missing v2 {missing_query_entry_v2} for {entry}",
+                logger.warning(
+                    f"Unable to parse {type(entry)} missing read {missing_read_entry}, missing query {missing_query_entry}, missing v2 {missing_query_entry_v2} for {entry}"
                )
             else:
                 yield event
@@ -664,10 +664,8 @@ def _parse_exported_bigquery_audit_metadata(
             else:
                 self.error(
                     logger,
-                    f"{audit_metadata['logName']}-{audit_metadata['insertId']}",
-                    f"Unable to parse audit metadata missing "
-                    f"QueryEvent keys:{str(missing_query_event_exported_audit)},"
-                    f" ReadEvent keys: {str(missing_read_event_exported_audit)} for {audit_metadata}",
+                    "usage-extraction",
+                    f"{audit_metadata['logName']}-{audit_metadata['insertId']} - Unable to parse audit metadata missing QueryEvent keys: {str(missing_query_event_exported_audit)}, ReadEvent keys: {str(missing_read_event_exported_audit)} for {audit_metadata}",
                 )
 
     def error(self, log: logging.Logger, key: str, reason: str) -> Any:
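
Taken together, the report_failure changes in this file move from project-keyed entries to stage-keyed entries with the project folded into the message, matching the convention introduced in bigquery.py. Roughly:

# Before: the key carried the project, so the failing stage was lost.
#   self.report.report_failure("my-project", "unable to retrieve log entries ...")
# After: the key is the pipeline stage; the message identifies the project.
#   self.report.report_failure("usage-extraction", "my-project - unable to retrieve log entries ...")
failures: dict = {}
failures.setdefault("usage-extraction", []).append(
    "my-project - unable to retrieve log entries <error>"
)
assert "usage-extraction" in failures
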