Skip to content

Commit

Permalink
fix(ingest): bigquery - setting partition id for profiling data (data…
Browse files Browse the repository at this point in the history
  • Loading branch information
treff7es authored and cccs-Dustin committed Feb 1, 2023
1 parent 666b14e commit 8a2d37c
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -492,12 +492,19 @@ def get_workunits(self) -> Iterable[WorkUnit]:
conn: bigquery.Client = self.get_bigquery_client()
self.add_config_to_report()

projects: List[BigqueryProject] = BigQueryDataDictionary.get_projects(conn)
if len(projects) == 0:
logger.warning(
"Get projects didn't return any project. Maybe resourcemanager.projects.get permission is missing for the service account. You can assign predefined roles/bigquery.metadataViewer role to your service account."
projects: List[BigqueryProject]
if self.config.project_id:
project = BigqueryProject(
id=self.config.project_id, name=self.config.project_id
)
return
projects = [project]
else:
projects = BigQueryDataDictionary.get_projects(conn)
if len(projects) == 0:
logger.warning(
"Get projects didn't return any project. Maybe resourcemanager.projects.get permission is missing for the service account. You can assign predefined roles/bigquery.metadataViewer role to your service account."
)
return

for project_id in projects:
if not self.config.project_id_pattern.allowed(project_id.id):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class BigQueryV2Config(BigQueryConfig, LineageConfig):
# The inheritance hierarchy is wonky here, but these options need modifications.
project_id: Optional[str] = Field(
default=None,
description="[deprecated] Use project_id_pattern instead.",
description="[deprecated] Use project_id_pattern instead. You can use this property if you only want to ingest one project and don't want to give project resourcemanager.projects.list to your service account",
)
storage_project_id: None = Field(default=None, hidden_from_schema=True)

Expand Down Expand Up @@ -97,14 +97,13 @@ def backward_compatibility_configs_set(cls, values: Dict) -> Dict:

if project_id_pattern == AllowDenyPattern.allow_all() and project_id:
logging.warning(
"project_id_pattern is not set but project_id is set, setting project_id as project_id_pattern. project_id will be deprecated, please use project_id_pattern instead."
"project_id_pattern is not set but project_id is set, source will only ingest the project_id project. project_id will be deprecated, please use project_id_pattern instead."
)
values["project_id_pattern"] = AllowDenyPattern(allow=[f"^{project_id}$"])
elif project_id_pattern != AllowDenyPattern.allow_all() and project_id:
logging.warning(
"project_id will be ignored in favour of project_id_pattern. project_id will be deprecated, please use project_id only."
"use project_id_pattern whenever possible. project_id will be deprecated, please use project_id_pattern only if possible."
)
values.pop("project_id")

dataset_pattern = values.get("dataset_pattern")
schema_pattern = values.get("schema_pattern")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,13 @@ def get_workunits(

request = cast(BigqueryProfilerRequest, request)
profile.sizeInBytes = request.table.size_in_bytes
# If table is partitioned we profile only one partition (if nothing set then the last one)
# but for table level we can use the rows_count from the table metadata
# This way even though column statistics only reflects one partition data but the rows count
# shows the proper count.
if profile.partitionSpec and profile.partitionSpec.partition:
profile.rowCount = request.table.rows_count

dataset_name = request.pretty_name
dataset_urn = make_dataset_urn_with_platform_instance(
self.platform,
Expand Down Expand Up @@ -250,7 +257,10 @@ def get_bigquery_profile_request(
profile_request = BigqueryProfilerRequest(
pretty_name=dataset_name,
batch_kwargs=dict(
schema=project, table=f"{dataset}.{table.name}", custom_sql=custom_sql
schema=project,
table=f"{dataset}.{table.name}",
custom_sql=custom_sql,
partition=partition,
),
table=table,
profile_table_level_only=profile_table_level_only,
Expand Down

0 comments on commit 8a2d37c

Please sign in to comment.