Add scopes into a GCP token #36974

Merged
merged 2 commits into from
Jan 24, 2024

moiseenkov
Contributor

This PR is a small fix that passes the missing scopes to the GCP token.

Relates to #34727 and continues #36849, #36903.

In our use case, we have a DAG (see below) that uses BigQueryInsertJobOperator with the following characteristics:

  • the operator runs in deferrable mode;
  • the impersonation chain contains a service account;
  • the BigQuery job operates on a dataset that belongs to a different GCP project.

In this case, instantiating the newly implemented class _CredentialsToken fails with an exception raised in the base class:

{bigquery.py:111} ERROR - Exception occurred while checking for query completion
Traceback (most recent call last):
  File "/opt/airflow/airflow/providers/google/cloud/triggers/bigquery.py", line 90, in run
    job_status = await hook.get_job_status(job_id=self.job_id, project_id=self.project_id)
  File "/opt/airflow/airflow/providers/google/cloud/hooks/bigquery.py", line 3300, in get_job_status
    job_client = await self.get_job_instance(project_id, job_id, s)
  File "/opt/airflow/airflow/providers/google/cloud/hooks/bigquery.py", line 3290, in get_job_instance
    token = await self.get_token(session=session)
  File "/opt/airflow/airflow/providers/google/common/hooks/base_google.py", line 695, in get_token
    return await _CredentialsToken.from_hook(sync_hook, session=session)
  File "/opt/airflow/airflow/providers/google/common/hooks/base_google.py", line 658, in from_hook
    return cls(
  File "/opt/airflow/airflow/providers/google/common/hooks/base_google.py", line 646, in __init__
    super().__init__(session=cast(Session, session))
  File "/usr/local/lib/python3.8/site-packages/gcloud/aio/auth/token.py", line 157, in __init__
    raise Exception(
Exception: scopes must be provided when token type is service account

This PR simply passes the hook's scopes to _CredentialsToken so that the base class Token receives them and initializes properly.
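
The failure and the fix can be illustrated with a minimal, self-contained sketch. It does not use the real gcloud-aio or Airflow classes: BaseToken, TokenType, CredentialsTokenBefore and CredentialsTokenAfter are hypothetical stand-ins written for this illustration, and only the error message is taken from the traceback above. The point is simply that a subclass which does not forward scopes to its base class trips the base-class check for service-account tokens, while one that forwards them does not.

```python
from __future__ import annotations

from enum import Enum
from typing import Sequence


class TokenType(Enum):
    # Simplified stand-in for the credential kinds a base token class may distinguish.
    AUTHORIZED_USER = "authorized_user"
    SERVICE_ACCOUNT = "service_account"


class BaseToken:
    """Hypothetical stand-in for the third-party base class Token."""

    def __init__(self, token_type: TokenType, scopes: Sequence[str] | None = None) -> None:
        # Mirrors the check reported in the traceback: service-account tokens need scopes.
        if token_type is TokenType.SERVICE_ACCOUNT and not scopes:
            raise Exception("scopes must be provided when token type is service account")
        self.token_type = token_type
        self.scopes = " ".join(scopes or [])


class CredentialsTokenBefore(BaseToken):
    """Assumed shape of the subclass before the fix: scopes are accepted but not forwarded."""

    def __init__(self, token_type: TokenType, scopes: Sequence[str] | None = None) -> None:
        super().__init__(token_type)  # scopes are silently dropped here


class CredentialsTokenAfter(BaseToken):
    """Assumed shape of the subclass after the fix: scopes reach the base class."""

    def __init__(self, token_type: TokenType, scopes: Sequence[str] | None = None) -> None:
        super().__init__(token_type, scopes=scopes)


if __name__ == "__main__":
    hook_scopes = ["https://www.googleapis.com/auth/cloud-platform"]

    try:
        CredentialsTokenBefore(TokenType.SERVICE_ACCOUNT, scopes=hook_scopes)
    except Exception as err:
        print(f"before the fix: {err}")

    token = CredentialsTokenAfter(TokenType.SERVICE_ACCOUNT, scopes=hook_scopes)
    print(f"after the fix: scopes={token.scopes!r}")
```

In the real provider code the scopes come from the synchronous hook, as described above; the exact constructor signature of _CredentialsToken is not shown in this conversation, so the sketch should not be read as the actual change.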


DAG

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryCreateEmptyDatasetOperator,
    BigQueryCreateEmptyTableOperator,
    BigQueryInsertJobOperator,
)

ENV_ID = "TEST_ENV"
PROJECT_ID = "TEST_PROJECT"

DAG_ID = "example_bigquery_queries_async"

DATASET_NAME = f"bq_d_{DAG_ID}_{ENV_ID}".replace("-", "_")
LOCATION = "us"

TABLE_NAME_1 = f"bq_{DAG_ID}_{ENV_ID}_1".replace("-", "_")
TABLE_NAME_2 = f"table_{DAG_ID}_{ENV_ID}_2".replace("-", "_")

SCHEMA = [
    {"name": "value", "type": "INTEGER", "mode": "REQUIRED"},
    {"name": "name", "type": "STRING", "mode": "NULLABLE"},
    {"name": "ds", "type": "STRING", "mode": "NULLABLE"},
]

INSERT_DATE = datetime.now().strftime("%Y-%m-%d")
INSERT_ROWS_QUERY = (
    f"INSERT {DATASET_NAME}.{TABLE_NAME_1} VALUES "
    f"(42, 'monthy python', '{INSERT_DATE}'), "
    f"(42, 'fishy fish', '{INSERT_DATE}');"
)
SA = "[email protected]"


with DAG(
    dag_id=DAG_ID,
    schedule="@once",
    start_date=datetime(2022, 1, 1),
    catchup=False,
    tags=["example", "bigquery", "deferrable"],
    user_defined_macros={"DATASET": DATASET_NAME, "TABLE": TABLE_NAME_1},
) as dag:

    create_dataset = BigQueryCreateEmptyDatasetOperator(
        task_id="create_dataset",
        dataset_id=DATASET_NAME,
        location=LOCATION,
        project_id=PROJECT_ID,
        impersonation_chain=SA,
    )

    create_table_1 = BigQueryCreateEmptyTableOperator(
        task_id="create_table_1",
        dataset_id=DATASET_NAME,
        table_id=TABLE_NAME_1,
        schema_fields=SCHEMA,
        location=LOCATION,
        project_id=PROJECT_ID,
        impersonation_chain=SA,
    )

    insert_query_job_imp = BigQueryInsertJobOperator(
        task_id="insert_query_job_imp",
        project_id=PROJECT_ID,
        configuration={
            "query": {
                "query": INSERT_ROWS_QUERY,
                "useLegacySql": False,
                "priority": "BATCH",
            }
        },
        location=LOCATION,
        impersonation_chain=SA,
        deferrable=True,
    )

    create_dataset >> create_table_1 >> insert_query_job_imp

@boring-cyborg boring-cyborg bot added area:providers provider:google Google (including GCP) related issues labels Jan 23, 2024
@VladaZakharova
Contributor

Hi @m1racoli @Lee-W @pankajastro !
Can you please check changes from this PR? Thanks!

@moiseenkov moiseenkov force-pushed the add_scope_to_gcp_token branch from 98c1983 to 2d6bda5 Compare January 23, 2024 16:21
@moiseenkov moiseenkov force-pushed the add_scope_to_gcp_token branch from 2d6bda5 to a37134a Compare January 23, 2024 16:23
Contributor

@dirrao dirrao left a comment

Existing test cases are failing. I have proposed a change to fix them.

@potiuk potiuk merged commit 241b50a into apache:main Jan 24, 2024
55 checks passed
Contributor

@m1racoli m1racoli left a comment

Thanks for this!

7 participants