feat(ingest): bigquery - option to set on behalf project #6660

Merged · 5 commits · Dec 8, 2022
@@ -55,6 +55,7 @@
BigqueryTable,
BigqueryView,
)
from datahub.ingestion.source.bigquery_v2.common import get_bigquery_client
from datahub.ingestion.source.bigquery_v2.lineage import BigqueryLineageExtractor
from datahub.ingestion.source.bigquery_v2.profiler import BigqueryProfiler
from datahub.ingestion.source.bigquery_v2.usage import BigQueryUsageExtractor
@@ -228,10 +229,6 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> "BigqueryV2Source":
config = BigQueryV2Config.parse_obj(config_dict)
return cls(ctx, config)

def get_bigquery_client(self) -> bigquery.Client:
client_options = self.config.extra_client_options
return bigquery.Client(**client_options)

@staticmethod
def connectivity_test(client: bigquery.Client) -> CapabilityReport:
ret = client.query("select 1")
@@ -244,12 +241,12 @@ def connectivity_test(client: bigquery.Client) -> CapabilityReport:

@staticmethod
def metada_read_capability_test(
project_ids: List[str], profiling_enabled: bool
project_ids: List[str], config: BigQueryV2Config
) -> CapabilityReport:
for project_id in project_ids:
try:
logger.info((f"Metadata read capability test for project {project_id}"))
client: bigquery.Client = bigquery.Client(project_id)
client: bigquery.Client = get_bigquery_client(config)
assert client
result = BigQueryDataDictionary.get_datasets_for_project_id(
client, project_id, 10
@@ -264,7 +261,7 @@ def metada_read_capability_test(
project_id=project_id,
dataset_name=result[0].name,
tables={},
with_data_read_permission=profiling_enabled,
with_data_read_permission=config.profiling.enabled,
)
if len(tables) == 0:
return CapabilityReport(
@@ -333,7 +330,7 @@ def test_connection(config_dict: dict) -> TestConnectionReport:
pydantic.Extra.allow
) # we are okay with extra fields during this stage
connection_conf = BigQueryV2Config.parse_obj(config_dict)
client: bigquery.Client = bigquery.Client()
client: bigquery.Client = get_bigquery_client(connection_conf)
assert client

test_report.basic_connectivity = BigqueryV2Source.connectivity_test(client)
@@ -350,7 +347,7 @@ def test_connection(config_dict: dict) -> TestConnectionReport:
project_ids.append(project.project_id)

metada_read_capability = BigqueryV2Source.metada_read_capability_test(
project_ids, connection_conf.profiling.enabled
project_ids, connection_conf
)
if SourceCapability.SCHEMA_METADATA not in _report:
_report[SourceCapability.SCHEMA_METADATA] = metada_read_capability
@@ -493,7 +490,7 @@ def add_table_to_dataset_container(

def get_workunits(self) -> Iterable[WorkUnit]:
logger.info("Getting projects")
conn: bigquery.Client = self.get_bigquery_client()
conn: bigquery.Client = get_bigquery_client(self.config)
self.add_config_to_report()

projects: List[BigqueryProject]
@@ -503,12 +500,26 @@ def get_workunits(self) -> Iterable[WorkUnit]:
)
projects = [project]
else:
projects = BigQueryDataDictionary.get_projects(conn)
if len(projects) == 0:
logger.warning(
"Get projects didn't return any project. Maybe resourcemanager.projects.get permission is missing for the service account. You can assign predefined roles/bigquery.metadataViewer role to your service account."
try:
projects = BigQueryDataDictionary.get_projects(conn)
if len(projects) == 0:
logger.error(
"Get projects didn't return any project. Maybe resourcemanager.projects.get permission is missing for the service account. You can assign predefined roles/bigquery.metadataViewer role to your service account."
)
self.report.report_failure(
"metadata-extraction",
"Get projects didn't return any project. Maybe resourcemanager.projects.get permission is missing for the service account. You can assign predefined roles/bigquery.metadataViewer role to your service account.",
)
return
except Exception as e:
logger.error(
f"Get projects didn't return any project. Maybe resourcemanager.projects.get permission is missing for the service account. You can assign predefined roles/bigquery.metadataViewer role to your service account. The error was: {e}"
)
self.report.report_failure(
"metadata-extraction",
f"Get projects didn't return any project. Maybe resourcemanager.projects.get permission is missing for the service account. You can assign predefined roles/bigquery.metadataViewer role to your service account. The error was: {e}",
)
return
return None

for project_id in projects:
if not self.config.project_id_pattern.allowed(project_id.id):
@@ -543,8 +554,13 @@ def _process_project(
BigQueryDataDictionary.get_datasets_for_project_id(conn, project_id)
)
except Exception as e:
logger.error(
f"Unable to get datasets for project {project_id}, skipping. The error was: {e}"
error_message = f"Unable to get datasets for project {project_id}, skipping. The error was: {e}"
if self.config.profiling.enabled:
error_message = f"Unable to get datasets for project {project_id}, skipping. Does your service account has bigquery.datasets.get permission? The error was: {e}"
logger.error(error_message)
self.report.report_failure(
"metadata-extraction",
f"{project_id} - {error_message}",
)
return None

@@ -565,60 +581,66 @@ def _process_project(
try:
yield from self._process_schema(conn, project_id, bigquery_dataset)
except Exception as e:
error_message = f"Unable to get tables for dataset {bigquery_dataset.name} in project {project_id}, skipping. Does your service account has bigquery.tables.list, bigquery.routines.get, bigquery.routines.list permission? The error was: {e}"
if self.config.profiling.enabled:
error_message = f"Unable to get tables for dataset {bigquery_dataset.name} in project {project_id}, skipping. Does your service account has bigquery.tables.list, bigquery.routines.get, bigquery.routines.list permission, bigquery.tables.getData permission? The error was: {e}"

trace = traceback.format_exc()
logger.error(trace)
logger.error(
f"Unable to get tables for dataset {bigquery_dataset.name} in project {project_id}, skipping. The error was: {e}"
logger.error(error_message)
self.report.report_failure(
"metadata-extraction",
f"{project_id}.{bigquery_dataset.name} - {error_message}",
)
continue

if self.config.include_table_lineage:
logger.info(f"Generate lineage for {project_id}")
for dataset in self.db_tables[project_id]:
for table in self.db_tables[project_id][dataset]:
dataset_urn = self.gen_dataset_urn(dataset, project_id, table.name)
lineage_info = self.lineage_extractor.get_upstream_lineage_info(
project_id=project_id,
dataset_name=dataset,
table=table,
platform=self.platform,
)
if lineage_info:
yield from self.gen_lineage(dataset_urn, lineage_info)

for dataset in self.db_views[project_id]:
for view in self.db_views[project_id][dataset]:
dataset_urn = self.gen_dataset_urn(dataset, project_id, view.name)
lineage_info = self.lineage_extractor.get_upstream_lineage_info(
project_id=project_id,
dataset_name=dataset,
table=view,
platform=self.platform,
)
yield from self.gen_lineage(dataset_urn, lineage_info)
yield from self.generate_lineage(project_id)

if self.config.include_usage_statistics:
logger.info(f"Generate usage for {project_id}")
tables: Dict[str, List[str]] = {}

for dataset in self.db_tables[project_id]:
yield from self.generate_usage_statistics(project_id)

def generate_lineage(self, project_id: str) -> Iterable[MetadataWorkUnit]:
logger.info(f"Generate lineage for {project_id}")
for dataset in self.db_tables[project_id]:
for table in self.db_tables[project_id][dataset]:
dataset_urn = self.gen_dataset_urn(dataset, project_id, table.name)
lineage_info = self.lineage_extractor.get_upstream_lineage_info(
project_id=project_id,
dataset_name=dataset,
table=table,
platform=self.platform,
)
if lineage_info:
yield from self.gen_lineage(dataset_urn, lineage_info)
for dataset in self.db_views[project_id]:
for view in self.db_views[project_id][dataset]:
dataset_urn = self.gen_dataset_urn(dataset, project_id, view.name)
lineage_info = self.lineage_extractor.get_upstream_lineage_info(
project_id=project_id,
dataset_name=dataset,
table=view,
platform=self.platform,
)
yield from self.gen_lineage(dataset_urn, lineage_info)

def generate_usage_statistics(self, project_id: str) -> Iterable[MetadataWorkUnit]:
logger.info(f"Generate usage for {project_id}")
tables: Dict[str, List[str]] = {}
for dataset in self.db_tables[project_id]:
tables[dataset] = [
table.name for table in self.db_tables[project_id][dataset]
]
for dataset in self.db_views[project_id]:
if not tables[dataset]:
tables[dataset] = [
table.name for table in self.db_tables[project_id][dataset]
table.name for table in self.db_views[project_id][dataset]
]

for dataset in self.db_views[project_id]:
if not tables[dataset]:
tables[dataset] = [
table.name for table in self.db_views[project_id][dataset]
]
else:
tables[dataset].extend(
[table.name for table in self.db_views[project_id][dataset]]
)

yield from self.usage_extractor.generate_usage_for_project(
project_id, tables
)
else:
tables[dataset].extend(
[table.name for table in self.db_views[project_id][dataset]]
)
yield from self.usage_extractor.generate_usage_for_project(project_id, tables)

def _process_schema(
self, conn: bigquery.Client, project_id: str, bigquery_dataset: BigqueryDataset
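The refactored generate_usage_statistics builds a per-dataset map of table and view names before handing it to the usage extractor. A minimal standalone sketch of that merge logic, using hypothetical sample data (and a .get() lookup so the example also covers datasets that only contain views), would look roughly like this:

from typing import Dict, List

class _Named:
    """Tiny stand-in for the BigqueryTable / BigqueryView objects (hypothetical)."""
    def __init__(self, name: str) -> None:
        self.name = name

# Hypothetical data in the shape of self.db_tables[project_id] / self.db_views[project_id].
db_tables: Dict[str, List[_Named]] = {"sales": [_Named("orders"), _Named("customers")]}
db_views: Dict[str, List[_Named]] = {"sales": [_Named("orders_view")], "analytics": [_Named("kpi_view")]}

tables: Dict[str, List[str]] = {}
for dataset in db_tables:
    tables[dataset] = [table.name for table in db_tables[dataset]]
for dataset in db_views:
    if not tables.get(dataset):
        tables[dataset] = [view.name for view in db_views[dataset]]
    else:
        tables[dataset].extend(view.name for view in db_views[dataset])

# tables == {"sales": ["orders", "customers", "orders_view"], "analytics": ["kpi_view"]}

The resulting mapping is what gets passed to usage_extractor.generate_usage_for_project(project_id, tables).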
@@ -66,6 +66,12 @@ class BigQueryV2Config(BigQueryConfig, LineageConfig):
default=None,
description="[deprecated] Use project_id_pattern instead. You can use this property if you only want to ingest one project and don't want to give project resourcemanager.projects.list to your service account",
)

project_on_behalf: Optional[str] = Field(
default=None,
description="[Advanced] The BigQuery project in which queries are executed. Will be passed when creating a job. If not passed, falls back to the project associated with the service account..",
)

storage_project_id: None = Field(default=None, hidden_from_schema=True)

lineage_use_sql_parser: bool = Field(
@@ -126,3 +132,12 @@ def backward_compatibility_configs_set(cls, values: Dict) -> Dict:

def get_table_pattern(self, pattern: List[str]) -> str:
return "|".join(pattern) if self.table_pattern else ""

# TODO: remove run_on_compute when the legacy bigquery source will be deprecated
def get_sql_alchemy_url(self, run_on_compute: bool = False) -> str:
if self.project_on_behalf:
return f"bigquery://{self.project_on_behalf}"
# When project_id is not set, we will attempt to detect the project ID
# based on the credentials or environment variables.
# See https://github.com/mxmzdlv/pybigquery#authentication.
return "bigquery://"
@@ -1,7 +1,10 @@
from typing import Any, Dict, Optional

from google.cloud import bigquery
from google.cloud.logging_v2.client import Client as GCPLoggingClient

from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config

BQ_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
BQ_DATE_SHARD_FORMAT = "%Y%m%d"

@@ -17,3 +20,17 @@ def _make_gcp_logging_client(
return GCPLoggingClient(**client_options, project=project_id)
else:
return GCPLoggingClient(**client_options)


def get_bigquery_client(config: BigQueryV2Config) -> bigquery.Client:
client_options = config.extra_client_options
return bigquery.Client(config.project_on_behalf, **client_options)


def get_sql_alchemy_url(config: BigQueryV2Config) -> str:
if config.project_on_behalf:
return f"bigquery://{config.project_on_behalf}"
# When project_id is not set, we will attempt to detect the project ID
# based on the credentials or environment variables.
# See https://github.com/mxmzdlv/pybigquery#authentication.
return "bigquery://"
@@ -275,7 +275,8 @@ def _get_bigquery_log_entries_via_exported_bigquery_audit_metadata(
e,
)
self.report.report_failure(
f"{client.project}", f"unable to retrieve log entries {e}"
"lineage-extraction",
f"{client.project} - unable to retrieve log entries {e}",
)

def _get_exported_bigquery_audit_metadata(
@@ -367,7 +368,8 @@ def _get_bigquery_log_entries_via_gcp_logging(
e,
)
self.report.report_failure(
f"{client.project}", f"unable to retrive log entrires {e}"
"usage-extraction",
f"{client.project} - unable to retrive log entrires {e}",
)

def _generate_filter(self, audit_templates: Dict[str, str]) -> str:
@@ -622,10 +624,8 @@ def _parse_bigquery_log_entries(
self.report.num_query_events += 1

if event is None:
self.error(
logger,
f"{entry.log_name}-{entry.insert_id}",
f"Unable to parse {type(entry)} missing read {missing_query_entry}, missing query {missing_query_entry} missing v2 {missing_query_entry_v2} for {entry}",
logger.warning(
f"Unable to parse {type(entry)} missing read {missing_query_entry}, missing query {missing_query_entry} missing v2 {missing_query_entry_v2} for {entry}"
)
else:
yield event
@@ -664,10 +664,8 @@ def _parse_exported_bigquery_audit_metadata(
else:
self.error(
logger,
f"{audit_metadata['logName']}-{audit_metadata['insertId']}",
f"Unable to parse audit metadata missing "
f"QueryEvent keys:{str(missing_query_event_exported_audit)},"
f" ReadEvent keys: {str(missing_read_event_exported_audit)} for {audit_metadata}",
"usage-extraction",
f"{audit_metadata['logName']}-{audit_metadata['insertId']} Unable to parse audit metadata missing QueryEvent keys:{str(missing_query_event_exported_audit)} ReadEvent keys: {str(missing_read_event_exported_audit)} for {audit_metadata}",
)

def error(self, log: logging.Logger, key: str, reason: str) -> Any:
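Across lineage and usage extraction, failure reports now use a stage-style key ("metadata-extraction", "lineage-extraction", "usage-extraction") and fold the project or log name into the message instead of using it as the key. A hedged, self-contained sketch of the resulting call pattern, using a minimal stand-in for the report object:

from typing import Dict, List

class _Report:
    """Minimal stand-in for the source's report object (hypothetical)."""
    def __init__(self) -> None:
        self.failures: Dict[str, List[str]] = {}

    def report_failure(self, key: str, reason: str) -> None:
        self.failures.setdefault(key, []).append(reason)

report = _Report()
project = "my-gcp-project"  # hypothetical
# New convention: the key names the ingestion stage, the project is folded into the message.
report.report_failure("usage-extraction", f"{project} - unable to retrieve log entries")

Grouping failures by stage rather than by project keeps the report keys stable across multi-project runs.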