Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Fix BIGQUERY_JOB_DETAILS_LINK_FMT in BigQueryConsoleLink #31935

Merged
merged 1 commit into from
Jun 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 8 additions & 30 deletions airflow/providers/google/cloud/operators/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.utils.context import Context

BIGQUERY_JOB_DETAILS_LINK_FMT = "https://console.cloud.google.com/bigquery?j={project_id}:{location}:{job_id}"
BIGQUERY_JOB_DETAILS_LINK_FMT = "https://console.cloud.google.com/bigquery?j={job_id}"


class BigQueryUIColors(enum.Enum):
Expand Down Expand Up @@ -90,17 +90,8 @@ def get_link(
*,
ti_key: TaskInstanceKey,
):
job_id_params = XCom.get_value(key="job_id_params", ti_key=ti_key)

return (
BIGQUERY_JOB_DETAILS_LINK_FMT.format(
job_id=job_id_params["job_id"],
project_id=job_id_params["project_id"],
location=job_id_params["location"],
)
if job_id_params
else ""
)
job_id = XCom.get_value(key="job_id", ti_key=ti_key)
return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id) if job_id else ""


@attr.s(auto_attribs=True)
Expand All @@ -119,16 +110,13 @@ def get_link(
*,
ti_key: TaskInstanceKey,
):
job_ids_params = XCom.get_value(key="job_id_params", ti_key=ti_key)
job_ids = job_ids_params["job_id"]
job_ids = XCom.get_value(key="job_id", ti_key=ti_key)
if not job_ids:
return None
if len(job_ids) < self.index:
return None
job_id = job_ids[self.index]
return BIGQUERY_JOB_DETAILS_LINK_FMT.format(
job_id=job_id, project_id=job_ids_params["project_id"], location=job_ids_params["location"]
)
return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id)


class _BigQueryDbHookMixin:
Expand Down Expand Up @@ -1196,13 +1184,7 @@ def execute(self, context: Context):
]
else:
raise AirflowException(f"argument 'sql' of type {type(str)} is neither a string nor an iterable")
job_id_params = {
"job_id": job_id,
"project_id": self.hook.project_id,
"location": self.location if self.location else "US",
}
context["task_instance"].xcom_push(key="job_id_params", value=job_id_params)
return job_id
context["task_instance"].xcom_push(key="job_id", value=job_id)

def on_kill(self) -> None:
super().on_kill()
Expand Down Expand Up @@ -2745,13 +2727,9 @@ def execute(self, context: Any):
persist_kwargs["dataset_id"] = table["datasetId"]
persist_kwargs["project_id"] = table["projectId"]
BigQueryTableLink.persist(**persist_kwargs)

self.job_id = job.job_id
job_id_params = {
"job_id": job_id,
"project_id": self.project_id or self.hook.project_id,
"location": self.location if self.location else "US",
}
context["ti"].xcom_push(key="job_id_params", value=job_id_params)
context["ti"].xcom_push(key="job_id", value=self.job_id)
# Wait for the job to complete
if not self.deferrable:
job.result(timeout=self.result_timeout, retry=self.result_retry)
Expand Down
69 changes: 22 additions & 47 deletions tests/providers/google/cloud/operators/test_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,6 @@
}
TEST_TABLE = "test-table"
GCP_CONN_ID = "google_cloud_default"
TEST_JOB_ID_1 = "test-job-id"
TEST_JOB_ID_2 = "test-123"
TEST_FULL_JOB_ID = f"{TEST_GCP_PROJECT_ID}:{TEST_DATASET_LOCATION}:{TEST_JOB_ID_1}"
TEST_FULL_JOB_ID_2 = f"{TEST_GCP_PROJECT_ID}:{TEST_DATASET_LOCATION}:{TEST_JOB_ID_2}"


class TestBigQueryCreateEmptyTableOperator:
Expand Down Expand Up @@ -676,15 +672,11 @@ def test_bigquery_operator_extra_serialized_field_when_single_query(

# Check DeSerialized version of operator link
assert isinstance(list(simple_task.operator_extra_links)[0], BigQueryConsoleLink)
test_job_id_params = {
"job_id": TEST_JOB_ID_1,
"project_id": TEST_GCP_PROJECT_ID,
"location": TEST_DATASET_LOCATION,
}
ti.xcom_push("job_id_params", test_job_id_params)

ti.xcom_push("job_id", 12345)

url = simple_task.get_extra_links(ti, BigQueryConsoleLink.name)
assert url == f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"
assert url == "https://console.cloud.google.com/bigquery?j=12345"

@pytest.mark.need_serialized_dag
def test_bigquery_operator_extra_serialized_field_when_multiple_queries(
Expand Down Expand Up @@ -719,23 +711,17 @@ def test_bigquery_operator_extra_serialized_field_when_multiple_queries(
# Check DeSerialized version of operator link
assert isinstance(list(simple_task.operator_extra_links)[0], BigQueryConsoleIndexableLink)

test_job_id_params = {
"job_id": [TEST_JOB_ID_1, TEST_JOB_ID_2],
"project_id": TEST_GCP_PROJECT_ID,
"location": TEST_DATASET_LOCATION,
}
ti.xcom_push(key="job_id_params", value=test_job_id_params)
job_id = ["123", "45"]
ti.xcom_push(key="job_id", value=job_id)

assert {"BigQuery Console #1", "BigQuery Console #2"} == simple_task.operator_extra_link_dict.keys()

assert (
f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"
== simple_task.get_extra_links(ti, "BigQuery Console #1")
assert "https://console.cloud.google.com/bigquery?j=123" == simple_task.get_extra_links(
ti, "BigQuery Console #1"
)

assert (
f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID_2}"
== simple_task.get_extra_links(ti, "BigQuery Console #2")
assert "https://console.cloud.google.com/bigquery?j=45" == simple_task.get_extra_links(
ti, "BigQuery Console #2"
)

@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
Expand All @@ -754,9 +740,7 @@ def test_bigquery_operator_extra_link_when_missing_job_id(

@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
def test_bigquery_operator_extra_link_when_single_query(
self,
mock_hook,
create_task_instance_of_operator,
self, mock_hook, create_task_instance_of_operator
):
ti = create_task_instance_of_operator(
BigQueryExecuteQueryOperator,
Expand All @@ -767,15 +751,11 @@ def test_bigquery_operator_extra_link_when_single_query(
)
bigquery_task = ti.task

test_job_id_params = {
"job_id": TEST_JOB_ID_1,
"project_id": TEST_GCP_PROJECT_ID,
"location": TEST_DATASET_LOCATION,
}
ti.xcom_push(key="job_id_params", value=test_job_id_params)
assert (
f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"
== bigquery_task.get_extra_links(ti, BigQueryConsoleLink.name)
job_id = "12345"
ti.xcom_push(key="job_id", value=job_id)

assert f"https://console.cloud.google.com/bigquery?j={job_id}" == bigquery_task.get_extra_links(
ti, BigQueryConsoleLink.name
)

@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
Expand All @@ -791,22 +771,17 @@ def test_bigquery_operator_extra_link_when_multiple_query(
)
bigquery_task = ti.task

test_job_id_params = {
"job_id": [TEST_JOB_ID_1, TEST_JOB_ID_2],
"project_id": TEST_GCP_PROJECT_ID,
"location": TEST_DATASET_LOCATION,
}
ti.xcom_push(key="job_id_params", value=test_job_id_params)
job_id = ["123", "45"]
ti.xcom_push(key="job_id", value=job_id)

assert {"BigQuery Console #1", "BigQuery Console #2"} == bigquery_task.operator_extra_link_dict.keys()

assert (
f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"
== bigquery_task.get_extra_links(ti, "BigQuery Console #1")
assert "https://console.cloud.google.com/bigquery?j=123" == bigquery_task.get_extra_links(
ti, "BigQuery Console #1"
)

assert (
f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID_2}"
== bigquery_task.get_extra_links(ti, "BigQuery Console #2")
assert "https://console.cloud.google.com/bigquery?j=45" == bigquery_task.get_extra_links(
ti, "BigQuery Console #2"
)


Expand Down