Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest): bigquery - option to set on behalf project #6660

Merged
merged 5 commits into from
Dec 8, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
BigqueryTable,
BigqueryView,
)
from datahub.ingestion.source.bigquery_v2.common import get_bigquery_client
from datahub.ingestion.source.bigquery_v2.lineage import BigqueryLineageExtractor
from datahub.ingestion.source.bigquery_v2.profiler import BigqueryProfiler
from datahub.ingestion.source.bigquery_v2.usage import BigQueryUsageExtractor
Expand Down Expand Up @@ -224,10 +225,6 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> "BigqueryV2Source":
config = BigQueryV2Config.parse_obj(config_dict)
return cls(ctx, config)

def get_bigquery_client(self) -> bigquery.Client:
    """Create a BigQuery client from this source's configuration.

    Every entry of ``self.config.extra_client_options`` is forwarded to the
    ``bigquery.Client`` constructor as a keyword argument.
    """
    return bigquery.Client(**self.config.extra_client_options)

@staticmethod
def connectivity_test(client: bigquery.Client) -> CapabilityReport:
ret = client.query("select 1")
Expand All @@ -240,12 +237,12 @@ def connectivity_test(client: bigquery.Client) -> CapabilityReport:

@staticmethod
def metada_read_capability_test(
project_ids: List[str], profiling_enabled: bool
project_ids: List[str], config: BigQueryV2Config
) -> CapabilityReport:
for project_id in project_ids:
try:
logger.info((f"Metadata read capability test for project {project_id}"))
client: bigquery.Client = bigquery.Client(project_id)
client: bigquery.Client = get_bigquery_client(config)
assert client
result = BigQueryDataDictionary.get_datasets_for_project_id(
client, project_id, 10
Expand All @@ -260,7 +257,7 @@ def metada_read_capability_test(
project_id=project_id,
dataset_name=result[0].name,
tables={},
with_data_read_permission=profiling_enabled,
with_data_read_permission=config.profiling.enabled,
)
if len(tables) == 0:
return CapabilityReport(
Expand Down Expand Up @@ -329,7 +326,7 @@ def test_connection(config_dict: dict) -> TestConnectionReport:
pydantic.Extra.allow
) # we are okay with extra fields during this stage
connection_conf = BigQueryV2Config.parse_obj(config_dict)
client: bigquery.Client = bigquery.Client()
client: bigquery.Client = get_bigquery_client(connection_conf)
assert client

test_report.basic_connectivity = BigqueryV2Source.connectivity_test(client)
Expand All @@ -346,7 +343,7 @@ def test_connection(config_dict: dict) -> TestConnectionReport:
project_ids.append(project.project_id)

metada_read_capability = BigqueryV2Source.metada_read_capability_test(
project_ids, connection_conf.profiling.enabled
project_ids, connection_conf
)
if SourceCapability.SCHEMA_METADATA not in _report:
_report[SourceCapability.SCHEMA_METADATA] = metada_read_capability
Expand Down Expand Up @@ -489,7 +486,7 @@ def add_table_to_dataset_container(

def get_workunits(self) -> Iterable[WorkUnit]:
logger.info("Getting projects")
conn: bigquery.Client = self.get_bigquery_client()
conn: bigquery.Client = get_bigquery_client(self.config)
self.add_config_to_report()

projects: List[BigqueryProject]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ class BigQueryV2Config(BigQueryConfig, LineageConfig):
default=None,
description="[deprecated] Use project_id_pattern instead. You can use this property if you only want to ingest one project and don't want to give project resourcemanager.projects.list to your service account",
)

project_on_behalf: Optional[str] = Field(
default=None,
description="Project ID for the project which the bigquery client acts on behalf of. Will be passed when creating a job. If not passed, falls back to the default inferred from the environment.",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
description="Project ID for the project which the bigquery client acts on behalf of. Will be passed when creating a job. If not passed, falls back to the default inferred from the environment.",
description="[Advanced] The BigQuery project in which queries are executed. If not passed, falls back to the project associated with the service account.",

)

storage_project_id: None = Field(default=None, hidden_from_schema=True)

lineage_use_sql_parser: bool = Field(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
from typing import Any, Dict, Optional

from google.cloud import bigquery
from google.cloud.logging_v2.client import Client as GCPLoggingClient

from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config

BQ_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
BQ_DATE_SHARD_FORMAT = "%Y%m%d"

Expand All @@ -17,3 +20,17 @@ def _make_gcp_logging_client(
return GCPLoggingClient(**client_options, project=project_id)
else:
return GCPLoggingClient(**client_options)


def get_bigquery_client(config: BigQueryV2Config) -> bigquery.Client:
    """Create a BigQuery client from the given ingestion config.

    ``config.project_on_behalf`` is handed to the client as its project, and
    every entry of ``config.extra_client_options`` is forwarded to the
    ``bigquery.Client`` constructor as a keyword argument.
    """
    return bigquery.Client(
        project=config.project_on_behalf,
        **config.extra_client_options,
    )


def get_sql_alchemy_url(config: BigQueryV2Config) -> str:
    """Return the SQLAlchemy connection URL for BigQuery.

    The URL names ``config.project_on_behalf`` when that value is set (truthy);
    otherwise the bare ``bigquery://`` URL is returned.
    """
    on_behalf_project = config.project_on_behalf
    # With no project in the URL, the project ID is detected from the
    # credentials or environment variables.
    # See https://github.com/mxmzdlv/pybigquery#authentication.
    return f"bigquery://{on_behalf_project}" if on_behalf_project else "bigquery://"
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
BigqueryColumn,
BigqueryTable,
)
from datahub.ingestion.source.bigquery_v2.common import get_sql_alchemy_url
from datahub.ingestion.source.ge_data_profiler import (
DatahubGEProfiler,
GEProfilerRequest,
Expand Down Expand Up @@ -329,7 +330,7 @@ def get_inspectors(self) -> Iterable[Inspector]:

def get_profiler_instance(self) -> "DatahubGEProfiler":
logger.debug("Getting profiler instance from bigquery")
url = self.config.get_sql_alchemy_url()
url = get_sql_alchemy_url(self.config)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can't we override the `config.get_sql_alchemy_url` method instead?


logger.debug(f"sql_alchemy_url={url}")

Expand Down