
BigQueryValueCheckOperator does not accept a non-default project_id, which causes jobs with impersonation_chain to fail. #32870

Closed
GaryLiuTelus opened this issue Jul 26, 2023 · 19 comments
Assignees
Labels
area:providers Can't Reproduce The problem cannot be reproduced good first issue kind:bug This is a clearly a bug pending-response provider:google Google (including GCP) related issues

Comments

@GaryLiuTelus

Apache Airflow version

2.6.3

What happened

airflow.providers.google.cloud.operators.bigquery.BigQueryValueCheckOperator cannot specify project_id and always uses the default project_id. So when I use an impersonated service account that does not have access to the tables in the default project, it gives a 403 Access Denied error.

What you think should happen instead

BigQueryValueCheckOperator should be like other BigQuery operators, such as BigQueryUpsertTableOperator and BigQueryInsertJobOperator: a non-default project_id can be assigned, so access requests can always be directed to the right project instead of always going to the current/default project.

How to reproduce

Create an Airflow instance in project A, and then use a service account that does not have access to the tables in project A (but can access the tables in project B) to run the code below:

with models.DAG(
    dag_id="test",
    start_date=xxxx,
    schedule="x x x * *",
):
    BigQueryValueCheckOperator(
        task_id="value_check",
        sql=f"select count(1) from `{project_B}.{DATASET}.{TABLE_NAME}`",
        pass_value=102,
        tolerance=0.15,
        use_legacy_sql=False,
        location=location,
        impersonation_chain=[service_email],
    )

Operating System

GCP Cloud

Versions of Apache Airflow Providers

apache-airflow-providers-google==10.4.0

Deployment

Google Cloud Composer

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@GaryLiuTelus GaryLiuTelus added area:core kind:bug This is a clearly a bug needs-triage label for new issues that we didn't triage yet labels Jul 26, 2023
@boring-cyborg

boring-cyborg bot commented Jul 26, 2023

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise a PR to address this issue, please do so; no need to wait for approval.

@hussein-awala hussein-awala added provider:google Google (including GCP) related issues area:providers good first issue and removed area:core needs-triage label for new issues that we didn't triage yet labels Jul 26, 2023
@hussein-awala
Member

Related to #32093, but here we don't use the deferrable mode.

@eladkal
Contributor

eladkal commented Jul 27, 2023

@avinashpandeshwar can you take a look?

@boraberke
Contributor

If you are not able to look at this, @avinashpandeshwar, I am willing to work on this issue.

@avinashpandeshwar
Contributor

@boraberke Please go ahead. I am on vacation for some time, so I would be delayed anyway. Once I am back, I am willing to help out if still required.

cc @eladkal

@rohan472000
Contributor

rohan472000 commented Jul 31, 2023

@GaryLiuTelus, BigQueryValueCheckOperator is primarily used for data validation and quality checks, while BigQueryInsertJobOperator and the other operators mentioned in the issue description are used to execute BigQuery jobs. I'm not sure whether it would be good to add that redirecting functionality to BigQueryValueCheckOperator.

Btw, if we have to make those changes, then I think we need to pass force_rerun: bool = True in __init__, add job_id and project_id to template_fields, add .json to template_ext, and other things as well...

@lwyszomi @turbaszek , need your opinion.

@GaryLiuTelus
Author

@rohan472000
As you can see below, with the BigQueryValueCheckOperator I need to run a query against the target table to get some information (the total record count, in the case below) and then compare it with the target number. If the target table is in a different project, I need the impersonation chain to get the result back.

tbl_check = BigQueryValueCheckOperator(
    task_id="value_check",
    sql=f"select count(1) from `{project_id}.{DATASET}.{TABLE_NAME}`",
    pass_value=1_000_000,
    tolerance=0.05,
    use_legacy_sql=False,
    # project_id=project_id,
    location=location,
    impersonation_chain=[service_email],
)

@dzhigimont
Contributor

I want to work on this, assign me please

@nathadfield
Collaborator

@dzhigimont Are you going to be working on this anytime soon or should we un-assign you?

@vchiapaikeo
Contributor

Happy to take a look at this. Feel free to assign me!

@vchiapaikeo
Contributor

@GaryLiuTelus , I know this is a bit old for you now but did you try changing the gcp_conn_id to one that defaults to project_B by any chance? That should allow the BQ query to run against project_B. I tried that locally on my end and it seems to work as expected. Wondering if I'm missing something else here.

@eladkal eladkal added Can't Reproduce The problem cannot be reproduced pending-response labels Oct 21, 2023
@GaryLiuTelus
Author

@vchiapaikeo Can you please advise how to get gcp_conn_id? I only used default value so far. Thanks!

@GaryLiuTelus
Author

And as I read the source code of other operators, they use a hook to pass the impersonation chain and send the request via the hook, instead of sending the request directly. I guess this might be the reason? Is it possible to use a hook in this operator as well?

@vchiapaikeo
Contributor

Can you please advise how to get gcp_conn_id? I only used default value so far. Thanks!

So here's a simple dag example:

from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryValueCheckOperator


DEFAULT_TASK_ARGS = {
    "owner": "gcp-data-platform",
    "start_date": "2023-03-13",
    "retries": 1,
    "retry_delay": 300,
}


with DAG(
    schedule_interval="@daily",
    max_active_runs=1,
    max_active_tasks=5,
    catchup=False,
    dag_id="test_bigquery_value_check",
    default_args=DEFAULT_TASK_ARGS,
) as dag:
    value_check_on_same_project_without_impersonation = BigQueryValueCheckOperator(
        task_id="value_check_on_same_project_without_impersonation",
        sql="select count(1) from `airflow-vchiapaikeo.test.table1`",
        pass_value=1,
        tolerance=0.15,
        use_legacy_sql=False,
        location="US",
        gcp_conn_id="google_cloud_default",
        # deferrable=True,
        # impersonation_chain=["[email protected]"],
    )

    value_check_on_diff_project_with_impersonation = BigQueryValueCheckOperator(
        task_id="value_check_on_diff_project_without_impersonation_expect_fail",
        sql="select count(1) from `airflow2-vchiapaikeo.test.table1`",
        pass_value=1,
        tolerance=0.15,
        use_legacy_sql=False,
        location="US",
        gcp_conn_id="google_cloud_default2",
        # deferrable=True,
        impersonation_chain=["[email protected]"],
    )

I define two different gcp_conn_ids. One w/ project A and the other w/ project B.
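One way to define that second connection (a sketch of my setup, not something from the operator itself) is through an environment variable: Airflow reads connections from `AIRFLOW_CONN_<CONN_ID>` URIs, and the `project` key in the connection extras sets the default GCP project. The connection id and project names below are just the ones from my example:

```python
import json
import os
from urllib.parse import quote

# Extras for a google-cloud-platform connection; the "project" key sets the
# default GCP project used when an operator does not pass one explicitly.
extras = {"project": "airflow2-vchiapaikeo"}

# Airflow 2.x connection URIs can carry extras as a URL-encoded JSON blob
# under the __extra__ query parameter.
conn_uri = "google-cloud-platform://?__extra__=" + quote(json.dumps(extras))

# Airflow discovers connections named AIRFLOW_CONN_<CONN_ID_UPPERCASED>,
# so this becomes the "google_cloud_default2" connection.
os.environ["AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT2"] = conn_uri
print(conn_uri)
```

With that in place, passing `gcp_conn_id="google_cloud_default2"` to the operator makes the query run against project B without a second deployment-wide default.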


You can see the second operator gets executed in the correct project and with the correct service account here:


And as I read the source code of other operators, they use a hook to pass the impersonation chain and send the request via the hook, instead of sending the request directly. I guess this might be the reason? Is it possible to use a hook in this operator as well?

This is a little complicated, actually, and I don't totally understand all of it. Part of the hook uses the soon-to-be-deprecated discovery API, and the other part uses the BigQuery client. The part that uses the discovery API infers the project id from the gcp_conn_id connection. The common code shared among the DbApiHook implementations probably needs to be refactored away from the discovery API and onto the BigQuery client... but it will be quite difficult 😓. Please correct me if I am wrong, anybody who knows this code better than I do.

@GaryLiuTelus
Author

Thanks for sharing. And it works! Appreciated!

But still, I think this is more of a workaround. It would be better if project_id could be provided as a parameter instead of creating a new gcp_conn_id; creating connections is an external dependency and, in some cases, may require the user to have admin permission to add them.

@vchiapaikeo
Contributor

vchiapaikeo commented Oct 23, 2023

There's actually a bit of inconsistency here, and project_id doesn't always mean where BQ compute occurs. For example, with BigQueryUpsertTableOperator, the project_id field refers to a storage location. But in BigQueryInsertJobOperator, it refers to where BQ compute occurs.

Another part of my hesitancy to do this is that the BigQueryHook constructor itself does not expose project_id as one of its parameters. And if you look through the hook code, the project_id parameter among the hook's methods does not necessarily refer to the compute project; for most methods, it is both the compute and storage project. So by changing this for BigQueryValueCheckOperator, we'd potentially be introducing even more inconsistency.

I think the path forward here is to deprecate many of these calls that are based on the discovery API and use the BigQuery client object, which lets us distinguish these two types of projects (compute and storage) more clearly. It's more of a refactor than a one-time fix for this specific operator, IMO.
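The compute/storage distinction above can be illustrated without touching GCP at all. This is a purely illustrative sketch (the `TableRef` type and `parse_table_id` helper are hypothetical, not Airflow or BigQuery client API): the project embedded in a fully-qualified table id names where the data is *stored*, while the compute (billing) project is a separate, client-level setting:

```python
from typing import NamedTuple


class TableRef(NamedTuple):
    project: str  # storage project: where the table's data lives
    dataset: str
    table: str


def parse_table_id(table_id: str) -> TableRef:
    """Split a fully-qualified `project.dataset.table` id (illustrative helper)."""
    project, dataset, table = table_id.split(".")
    return TableRef(project, dataset, table)


# The compute (billing) project is chosen by the client/job configuration,
# independently of where the referenced table is stored.
compute_project = "airflow-vchiapaikeo"
ref = parse_table_id("airflow2-vchiapaikeo.test.table1")

# The storage project in the SQL and the compute project can legitimately
# differ, which is exactly the situation this issue describes.
assert ref.project != compute_project
print(ref)
```

A single `project_id` operator parameter cannot express both roles at once, which is why the refactor toward the BigQuery client (where they are separate settings) seems like the cleaner path.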

@GaryLiuTelus
Author

OK, this makes sense. Thanks for the clarification.

@eladkal
Contributor

eladkal commented Oct 24, 2023

Is there a task left on this issue?

@GaryLiuTelus
Author

I am OK with this workaround for now. Thanks!

@eladkal eladkal closed this as completed Oct 24, 2023