From 95e5675714f12c177e30d83a14d28222b06d217b Mon Sep 17 00:00:00 2001 From: Beata Kossakowska <109511937+bkossakowska@users.noreply.github.com> Date: Mon, 31 Oct 2022 04:21:51 +0100 Subject: [PATCH] Migration of System Tests: Dataplex (AIP-47) (#26989) --- .../providers/google/cloud/hooks/dataplex.py | 115 ++++++++- .../providers/google/cloud/links/dataplex.py | 27 +++ .../google/cloud/operators/dataplex.py | 219 +++++++++++++++++- airflow/providers/google/provider.yaml | 1 + .../operators/cloud/dataplex.rst | 53 ++++- .../google/cloud/hooks/test_dataplex.py | 50 ++++ .../google/cloud/operators/test_dataplex.py | 76 ++++++ .../cloud/operators/test_dataplex_system.py | 47 ---- .../google/cloud/dataplex/__init__.py | 16 ++ .../cloud/dataplex}/example_dataplex.py | 119 ++++++++-- .../cloud/dataplex/resources/__init__.py | 16 ++ .../dataplex/resources/spark_example_pi.py | 43 ++++ 12 files changed, 710 insertions(+), 72 deletions(-) delete mode 100644 tests/providers/google/cloud/operators/test_dataplex_system.py create mode 100644 tests/system/providers/google/cloud/dataplex/__init__.py rename {airflow/providers/google/cloud/example_dags => tests/system/providers/google/cloud/dataplex}/example_dataplex.py (53%) create mode 100644 tests/system/providers/google/cloud/dataplex/resources/__init__.py create mode 100644 tests/system/providers/google/cloud/dataplex/resources/spark_example_pi.py diff --git a/airflow/providers/google/cloud/hooks/dataplex.py b/airflow/providers/google/cloud/hooks/dataplex.py index 1dbee03b143ed..e8121b582e08b 100644 --- a/airflow/providers/google/cloud/hooks/dataplex.py +++ b/airflow/providers/google/cloud/hooks/dataplex.py @@ -24,10 +24,11 @@ from google.api_core.operation import Operation from google.api_core.retry import Retry from google.cloud.dataplex_v1 import DataplexServiceClient -from google.cloud.dataplex_v1.types import Task +from google.cloud.dataplex_v1.types import Lake, Task from googleapiclient.discovery import Resource from airflow.exceptions import AirflowException +from airflow.providers.google.common.consts import CLIENT_INFO from airflow.providers.google.common.hooks.base_google import GoogleBaseHook @@ -70,7 +71,7 @@ def get_dataplex_client(self) -> DataplexServiceClient: client_options = ClientOptions(api_endpoint="dataplex.googleapis.com:443") return DataplexServiceClient( - credentials=self.get_credentials(), client_info=self.client_info, client_options=client_options + credentials=self.get_credentials(), client_info=CLIENT_INFO, client_options=client_options ) def wait_for_operation(self, timeout: float | None, operation: Operation): @@ -248,3 +249,113 @@ def get_task( metadata=metadata, ) return result + + @GoogleBaseHook.fallback_to_default_project_id + def delete_lake( + self, + project_id: str, + region: str, + lake_id: str, + retry: Retry | _MethodDefault = DEFAULT, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] = (), + ) -> Any: + """ + Delete the lake resource. + + :param project_id: Required. The ID of the Google Cloud project that the lake belongs to. + :param region: Required. The ID of the Google Cloud region that the lake belongs to. + :param lake_id: Required. The ID of the Google Cloud lake to be deleted. + :param retry: A retry object used to retry requests. If `None` is specified, requests + will not be retried. + :param timeout: The amount of time, in seconds, to wait for the request to complete. + Note that if `retry` is specified, the timeout applies to each individual attempt. 
+ :param metadata: Additional metadata that is provided to the method. + """ + name = f"projects/{project_id}/locations/{region}/lakes/{lake_id}" + + client = self.get_dataplex_client() + result = client.delete_lake( + request={ + "name": name, + }, + retry=retry, + timeout=timeout, + metadata=metadata, + ) + return result + + @GoogleBaseHook.fallback_to_default_project_id + def create_lake( + self, + project_id: str, + region: str, + lake_id: str, + body: dict[str, Any] | Lake, + validate_only: bool | None = None, + retry: Retry | _MethodDefault = DEFAULT, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] = (), + ) -> Any: + """ + Creates a lake resource. + + :param project_id: Required. The ID of the Google Cloud project that the lake belongs to. + :param region: Required. The ID of the Google Cloud region that the lake belongs to. + :param lake_id: Required. Lake identifier. + :param body: Required. The Request body contains an instance of Lake. + :param validate_only: Optional. Only validate the request, but do not perform mutations. + The default is false. + :param retry: A retry object used to retry requests. If `None` is specified, requests + will not be retried. + :param timeout: The amount of time, in seconds, to wait for the request to complete. + Note that if `retry` is specified, the timeout applies to each individual attempt. + :param metadata: Additional metadata that is provided to the method. + """ + parent = f"projects/{project_id}/locations/{region}" + client = self.get_dataplex_client() + result = client.create_lake( + request={ + "parent": parent, + "lake_id": lake_id, + "lake": body, + }, + retry=retry, + timeout=timeout, + metadata=metadata, + ) + return result + + @GoogleBaseHook.fallback_to_default_project_id + def get_lake( + self, + project_id: str, + region: str, + lake_id: str, + retry: Retry | _MethodDefault = DEFAULT, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] = (), + ) -> Any: + """ + Get lake resource. + + :param project_id: Required. The ID of the Google Cloud project that the lake belongs to. + :param region: Required. The ID of the Google Cloud region that the lake belongs to. + :param lake_id: Required. The ID of the Google Cloud lake to be retrieved. + :param retry: A retry object used to retry requests. If `None` is specified, requests + will not be retried. + :param timeout: The amount of time, in seconds, to wait for the request to complete. + Note that if `retry` is specified, the timeout applies to each individual attempt. + :param metadata: Additional metadata that is provided to the method. 
+ """ + name = f"projects/{project_id}/locations/{region}/lakes/{lake_id}/" + client = self.get_dataplex_client() + result = client.get_lake( + request={ + "name": name, + }, + retry=retry, + timeout=timeout, + metadata=metadata, + ) + return result diff --git a/airflow/providers/google/cloud/links/dataplex.py b/airflow/providers/google/cloud/links/dataplex.py index e7381c1417b62..dcf3c8755848a 100644 --- a/airflow/providers/google/cloud/links/dataplex.py +++ b/airflow/providers/google/cloud/links/dataplex.py @@ -29,6 +29,10 @@ DATAPLEX_TASK_LINK = DATAPLEX_BASE_LINK + "/{lake_id}.{task_id};location={region}/jobs?project={project_id}" DATAPLEX_TASKS_LINK = DATAPLEX_BASE_LINK + "?project={project_id}&qLake={lake_id}.{region}" +DATAPLEX_LAKE_LINK = ( + "https://console.cloud.google.com/dataplex/lakes/{lake_id};location={region}?project={project_id}" +) + class DataplexTaskLink(BaseGoogleLink): """Helper class for constructing Dataplex Task link""" @@ -75,3 +79,26 @@ def persist( "region": task_instance.region, }, ) + + +class DataplexLakeLink(BaseGoogleLink): + """Helper class for constructing Dataplex Lake link""" + + name = "Dataplex Lake" + key = "dataplex_lake_key" + format_str = DATAPLEX_LAKE_LINK + + @staticmethod + def persist( + context: Context, + task_instance, + ): + task_instance.xcom_push( + context=context, + key=DataplexLakeLink.key, + value={ + "lake_id": task_instance.lake_id, + "region": task_instance.region, + "project_id": task_instance.project_id, + }, + ) diff --git a/airflow/providers/google/cloud/operators/dataplex.py b/airflow/providers/google/cloud/operators/dataplex.py index e212a134bf132..12c3917690d07 100644 --- a/airflow/providers/google/cloud/operators/dataplex.py +++ b/airflow/providers/google/cloud/operators/dataplex.py @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. """This module contains Google Dataplex operators.""" + from __future__ import annotations from time import sleep @@ -25,12 +26,16 @@ from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault from google.api_core.retry import Retry, exponential_sleep_generator -from google.cloud.dataplex_v1.types import Task +from google.cloud.dataplex_v1.types import Lake, Task from googleapiclient.errors import HttpError from airflow.models import BaseOperator from airflow.providers.google.cloud.hooks.dataplex import DataplexHook -from airflow.providers.google.cloud.links.dataplex import DataplexTaskLink, DataplexTasksLink +from airflow.providers.google.cloud.links.dataplex import ( + DataplexLakeLink, + DataplexTaskLink, + DataplexTasksLink, +) class DataplexCreateTaskOperator(BaseOperator): @@ -427,4 +432,214 @@ def execute(self, context: Context) -> dict: timeout=self.timeout, metadata=self.metadata, ) + DataplexTasksLink.persist(context=context, task_instance=self) return Task.to_dict(task) + + +class DataplexCreateLakeOperator(BaseOperator): + """ + Creates a lake resource within a lake. + + :param project_id: Required. The ID of the Google Cloud project that the lake belongs to. + :param region: Required. The ID of the Google Cloud region that the lake belongs to. + :param lake_id: Required. Lake identifier. + :param body: Required. The Request body contains an instance of Lake. + :param validate_only: Optional. Only validate the request, but do not perform mutations. The default is + false. + :param api_version: The version of the api that will be requested for example 'v1'. + :param retry: A retry object used to retry requests. 
If `None` is specified, requests + will not be retried. + :param timeout: The amount of time, in seconds, to wait for the request to complete. + Note that if `retry` is specified, the timeout applies to each individual attempt. + :param metadata: Additional metadata that is provided to the method. + :param gcp_conn_id: The connection ID to use when fetching connection info. + :param delegate_to: The account to impersonate, if any. For this to work, the service account making the + request must have domain-wide delegation enabled. + :param impersonation_chain: Optional service account to impersonate using short-term + credentials, or chained list of accounts required to get the access_token + of the last account in the list, which will be impersonated in the request. + If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. + If set as a sequence, the identities from the list must grant + Service Account Token Creator IAM role to the directly preceding identity, with first + account from the list granting this role to the originating account (templated). + :param asynchronous: Flag informing should the Dataplex lake be created asynchronously. + This is useful for long running creating lakes and + waiting on them asynchronously using the DataplexLakeSensor + """ + + template_fields = ( + "project_id", + "lake_id", + "body", + "validate_only", + "delegate_to", + "impersonation_chain", + ) + template_fields_renderers = {"body": "json"} + operator_extra_links = (DataplexLakeLink(),) + + def __init__( + self, + project_id: str, + region: str, + lake_id: str, + body: dict[str, Any], + validate_only: bool | None = None, + api_version: str = "v1", + retry: Retry | _MethodDefault = DEFAULT, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] = (), + gcp_conn_id: str = "google_cloud_default", + delegate_to: str | None = None, + impersonation_chain: str | Sequence[str] | None = None, + asynchronous: bool = False, + *args, + **kwargs, + ) -> None: + super().__init__(*args, **kwargs) + self.project_id = project_id + self.region = region + self.lake_id = lake_id + self.body = body + self.validate_only = validate_only + self.api_version = api_version + self.retry = retry + self.timeout = timeout + self.metadata = metadata + self.gcp_conn_id = gcp_conn_id + self.delegate_to = delegate_to + self.impersonation_chain = impersonation_chain + self.asynchronous = asynchronous + + def execute(self, context: Context) -> dict: + hook = DataplexHook( + gcp_conn_id=self.gcp_conn_id, + delegate_to=self.delegate_to, + api_version=self.api_version, + impersonation_chain=self.impersonation_chain, + ) + self.log.info("Creating Dataplex lake %s", self.lake_id) + + try: + operation = hook.create_lake( + project_id=self.project_id, + region=self.region, + lake_id=self.lake_id, + body=self.body, + validate_only=self.validate_only, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + if not self.asynchronous: + self.log.info("Waiting for Dataplex lake %s to be created", self.lake_id) + lake = hook.wait_for_operation(timeout=self.timeout, operation=operation) + self.log.info("Lake %s created successfully", self.lake_id) + else: + is_done = operation.done() + self.log.info("Is operation done already? 
%s", is_done) + return is_done + except HttpError as err: + if err.resp.status not in (409, "409"): + raise + self.log.info("Lake %s already exists", self.lake_id) + # Wait for lake to be ready + for time_to_wait in exponential_sleep_generator(initial=10, maximum=120): + lake = hook.get_lake( + project_id=self.project_id, + region=self.region, + lake_id=self.lake_id, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + if lake["state"] != "CREATING": + break + sleep(time_to_wait) + DataplexLakeLink.persist( + context=context, + task_instance=self, + ) + return Lake.to_dict(lake) + + +class DataplexDeleteLakeOperator(BaseOperator): + """ + Delete the lake resource. + + :param project_id: Required. The ID of the Google Cloud project that the lake belongs to. + :param region: Required. The ID of the Google Cloud region that the lake belongs to. + :param lake_id: Required. Lake identifier. + :param api_version: The version of the api that will be requested for example 'v1'. + :param retry: A retry object used to retry requests. If `None` is specified, requests + will not be retried. + :param timeout: The amount of time, in seconds, to wait for the request to complete. + Note that if `retry` is specified, the timeout applies to each individual attempt. + :param metadata: Additional metadata that is provided to the method. + :param gcp_conn_id: The connection ID to use when fetching connection info. + :param delegate_to: The account to impersonate, if any. For this to work, the service account making the + request must have domain-wide delegation enabled. + :param impersonation_chain: Optional service account to impersonate using short-term + credentials, or chained list of accounts required to get the access_token + of the last account in the list, which will be impersonated in the request. + If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. + If set as a sequence, the identities from the list must grant + Service Account Token Creator IAM role to the directly preceding identity, with first + account from the list granting this role to the originating account (templated). 
+ """ + + template_fields = ("project_id", "lake_id", "delegate_to", "impersonation_chain") + operator_extra_links = (DataplexLakeLink(),) + + def __init__( + self, + project_id: str, + region: str, + lake_id: str, + api_version: str = "v1", + retry: Retry | _MethodDefault = DEFAULT, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] = (), + gcp_conn_id: str = "google_cloud_default", + delegate_to: str | None = None, + impersonation_chain: str | Sequence[str] | None = None, + *args, + **kwargs, + ) -> None: + + super().__init__(*args, **kwargs) + self.project_id = project_id + self.region = region + self.lake_id = lake_id + self.api_version = api_version + self.retry = retry + self.timeout = timeout + self.metadata = metadata + self.gcp_conn_id = gcp_conn_id + self.delegate_to = delegate_to + self.impersonation_chain = impersonation_chain + + def execute(self, context: Context) -> None: + + hook = DataplexHook( + gcp_conn_id=self.gcp_conn_id, + delegate_to=self.delegate_to, + api_version=self.api_version, + impersonation_chain=self.impersonation_chain, + ) + + self.log.info("Deleting Dataplex lake %s", self.lake_id) + + operation = hook.delete_lake( + project_id=self.project_id, + region=self.region, + lake_id=self.lake_id, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + DataplexLakeLink.persist(context=context, task_instance=self) + hook.wait_for_operation(timeout=self.timeout, operation=operation) + self.log.info("Dataplex lake %s deleted successfully!", self.lake_id) diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml index fb0a3d434cfeb..10c47c7d5436d 100644 --- a/airflow/providers/google/provider.yaml +++ b/airflow/providers/google/provider.yaml @@ -964,6 +964,7 @@ extra-links: - airflow.providers.google.cloud.links.cloud_sql.CloudSQLInstanceDatabaseLink - airflow.providers.google.cloud.links.dataplex.DataplexTaskLink - airflow.providers.google.cloud.links.dataplex.DataplexTasksLink + - airflow.providers.google.cloud.links.dataplex.DataplexLakeLink - airflow.providers.google.cloud.links.bigquery.BigQueryDatasetLink - airflow.providers.google.cloud.links.bigquery.BigQueryTableLink - airflow.providers.google.cloud.links.bigquery_dts.BigQueryDataTransferConfigLink diff --git a/docs/apache-airflow-providers-google/operators/cloud/dataplex.rst b/docs/apache-airflow-providers-google/operators/cloud/dataplex.rst index 00b1949a0ab16..c6ed5db7da6ae 100644 --- a/docs/apache-airflow-providers-google/operators/cloud/dataplex.rst +++ b/docs/apache-airflow-providers-google/operators/cloud/dataplex.rst @@ -31,7 +31,7 @@ For more information about the available fields to pass when creating a task, vi A simple task configuration can look as followed: -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataplex.py +.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex.py :language: python :dedent: 0 :start-after: [START howto_dataplex_configuration] @@ -40,13 +40,13 @@ A simple task configuration can look as followed: With this configuration we can create the task both synchronously & asynchronously: :class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCreateTaskOperator` -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataplex.py +.. 
exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex.py :language: python :dedent: 4 :start-after: [START howto_dataplex_create_task_operator] :end-before: [END howto_dataplex_create_task_operator] -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataplex.py +.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex.py :language: python :dedent: 4 :start-after: [START howto_dataplex_async_create_task_operator] @@ -59,7 +59,7 @@ To delete a task you can use: :class:`~airflow.providers.google.cloud.operators.dataplex.DataplexDeleteTaskOperator` -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataplex.py +.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex.py :language: python :dedent: 4 :start-after: [START howto_dataplex_delete_task_operator] @@ -72,7 +72,7 @@ To list tasks you can use: :class:`~airflow.providers.google.cloud.operators.dataplex.DataplexListTasksOperator` -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataplex.py +.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex.py :language: python :dedent: 4 :start-after: [START howto_dataplex_list_tasks_operator] @@ -85,7 +85,7 @@ To get a task you can use: :class:`~airflow.providers.google.cloud.operators.dataplex.DataplexGetTaskOperator` -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataplex.py +.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex.py :language: python :dedent: 4 :start-after: [START howto_dataplex_get_task_operator] @@ -98,8 +98,47 @@ To wait for a task created asynchronously you can use: :class:`~airflow.providers.google.cloud.sensors.dataplex.DataplexTaskStateSensor` -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataplex.py +.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex.py :language: python :dedent: 4 :start-after: [START howto_dataplex_task_state_sensor] :end-before: [END howto_dataplex_task_state_sensor] + +Create a Lake +------------- + +Before you create a dataplex lake you need to define its body. + +For more information about the available fields to pass when creating a lake, visit `Dataplex create lake API. `__ + +A simple task configuration can look as followed: + +.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex.py + :language: python + :dedent: 0 + :start-after: [START howto_dataplex_lake_configuration] + :end-before: [END howto_dataplex_lake_configuration] + +With this configuration we can create the lake: + +:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCreateLakeOperator` + +.. exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex.py + :language: python + :dedent: 4 + :start-after: [START howto_dataplex_create_lake_operator] + :end-before: [END howto_dataplex_create_lake_operator] + + +Delete a lake +------------- + +To delete a lake you can use: + +:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexDeleteLakeOperator` + +.. 
exampleinclude:: /../../tests/system/providers/google/cloud/dataplex/example_dataplex.py + :language: python + :dedent: 4 + :start-after: [START howto_dataplex_delete_lake_operator] + :end-before: [END howto_dataplex_delete_lake_operator] diff --git a/tests/providers/google/cloud/hooks/test_dataplex.py b/tests/providers/google/cloud/hooks/test_dataplex.py index 562b60a0e4cf4..9a11c01badf6a 100644 --- a/tests/providers/google/cloud/hooks/test_dataplex.py +++ b/tests/providers/google/cloud/hooks/test_dataplex.py @@ -121,3 +121,53 @@ def test_get_task(self, mock_client): timeout=None, metadata=(), ) + + @mock.patch(DATAPLEX_STRING.format("DataplexHook.get_dataplex_client")) + def test_create_lake(self, mock_client): + self.hook.create_lake( + project_id=PROJECT_ID, + region=REGION, + lake_id=LAKE_ID, + body=BODY, + validate_only=None, + ) + + parent = f"projects/{PROJECT_ID}/locations/{REGION}" + mock_client.return_value.create_lake.assert_called_once_with( + request=dict( + parent=parent, + lake_id=LAKE_ID, + lake=BODY, + ), + retry=DEFAULT, + timeout=None, + metadata=(), + ) + + @mock.patch(DATAPLEX_STRING.format("DataplexHook.get_dataplex_client")) + def test_delete_lake(self, mock_client): + self.hook.delete_lake(project_id=PROJECT_ID, region=REGION, lake_id=LAKE_ID) + + name = f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}" + mock_client.return_value.delete_lake.assert_called_once_with( + request=dict( + name=name, + ), + retry=DEFAULT, + timeout=None, + metadata=(), + ) + + @mock.patch(DATAPLEX_STRING.format("DataplexHook.get_dataplex_client")) + def test_get_lake(self, mock_client): + self.hook.get_lake(project_id=PROJECT_ID, region=REGION, lake_id=LAKE_ID) + + name = f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}/" + mock_client.return_value.get_lake.assert_called_once_with( + request=dict( + name=name, + ), + retry=DEFAULT, + timeout=None, + metadata=(), + ) diff --git a/tests/providers/google/cloud/operators/test_dataplex.py b/tests/providers/google/cloud/operators/test_dataplex.py index 8ecd39678004a..77115e206605d 100644 --- a/tests/providers/google/cloud/operators/test_dataplex.py +++ b/tests/providers/google/cloud/operators/test_dataplex.py @@ -21,7 +21,9 @@ from google.api_core.gapic_v1.method import DEFAULT from airflow.providers.google.cloud.operators.dataplex import ( + DataplexCreateLakeOperator, DataplexCreateTaskOperator, + DataplexDeleteLakeOperator, DataplexDeleteTaskOperator, DataplexGetTaskOperator, DataplexListTasksOperator, @@ -29,11 +31,18 @@ HOOK_STR = "airflow.providers.google.cloud.operators.dataplex.DataplexHook" TASK_STR = "airflow.providers.google.cloud.operators.dataplex.Task" +LAKE_STR = "airflow.providers.google.cloud.operators.dataplex.Lake" PROJECT_ID = "project-id" REGION = "region" LAKE_ID = "lake-id" BODY = {"body": "test"} +BODY_LAKE = { + "display_name": "test_display_name", + "labels": [], + "description": "test_description", + "metastore": {"service": ""}, +} DATAPLEX_TASK_ID = "testTask001" GCP_CONN_ID = "google_cloud_default" @@ -180,3 +189,70 @@ def test_execute(self, task_mock, hook_mock): timeout=None, metadata=(), ) + + +class TestDataplexDeleteLakeOperator(TestCase): + @mock.patch(HOOK_STR) + def test_execute(self, hook_mock): + op = DataplexDeleteLakeOperator( + project_id=PROJECT_ID, + region=REGION, + lake_id=LAKE_ID, + task_id="delete_dataplex_lake", + api_version=API_VERSION, + gcp_conn_id=GCP_CONN_ID, + delegate_to=DELEGATE_TO, + impersonation_chain=IMPERSONATION_CHAIN, + ) + 
op.execute(context=mock.MagicMock()) + hook_mock.assert_called_once_with( + gcp_conn_id=GCP_CONN_ID, + delegate_to=DELEGATE_TO, + api_version=API_VERSION, + impersonation_chain=IMPERSONATION_CHAIN, + ) + hook_mock.return_value.delete_lake.assert_called_once_with( + project_id=PROJECT_ID, + region=REGION, + lake_id=LAKE_ID, + retry=DEFAULT, + timeout=None, + metadata=(), + ) + + +class TestDataplexCreateLakeOperator(TestCase): + @mock.patch(HOOK_STR) + @mock.patch(LAKE_STR) + def test_execute(self, lake_mock, hook_mock): + op = DataplexCreateLakeOperator( + task_id="create_dataplex_lake", + project_id=PROJECT_ID, + region=REGION, + lake_id=LAKE_ID, + body=BODY_LAKE, + validate_only=None, + api_version=API_VERSION, + gcp_conn_id=GCP_CONN_ID, + delegate_to=DELEGATE_TO, + impersonation_chain=IMPERSONATION_CHAIN, + ) + hook_mock.return_value.wait_for_operation.return_value = None + lake_mock.return_value.to_dict.return_value = None + op.execute(context=mock.MagicMock()) + hook_mock.assert_called_once_with( + gcp_conn_id=GCP_CONN_ID, + delegate_to=DELEGATE_TO, + api_version=API_VERSION, + impersonation_chain=IMPERSONATION_CHAIN, + ) + hook_mock.return_value.create_lake.assert_called_once_with( + project_id=PROJECT_ID, + region=REGION, + lake_id=LAKE_ID, + body=BODY_LAKE, + validate_only=None, + retry=DEFAULT, + timeout=None, + metadata=(), + ) diff --git a/tests/providers/google/cloud/operators/test_dataplex_system.py b/tests/providers/google/cloud/operators/test_dataplex_system.py deleted file mode 100644 index b3b9c709d4e1a..0000000000000 --- a/tests/providers/google/cloud/operators/test_dataplex_system.py +++ /dev/null @@ -1,47 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -from __future__ import annotations - -import pytest - -from airflow.providers.google.cloud.example_dags.example_dataplex import BUCKET, SPARK_FILE_NAME -from tests.providers.google.cloud.utils.gcp_authenticator import GCP_GCS_KEY -from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context - -GCS_URI = f"gs://{BUCKET}" - -spark_file = """ -#!/usr/bin/python -print("### Hello, dataplex! 
###") -""" - - -@pytest.mark.backend("mysql", "postgres") -@pytest.mark.credential_file(GCP_GCS_KEY) -class DataplexExampleDagsTest(GoogleSystemTest): - def setUp(self): - super().setUp() - self.create_gcs_bucket(BUCKET) - self.upload_content_to_gcs(lines=spark_file, bucket=GCS_URI, filename=SPARK_FILE_NAME) - - def tearDown(self): - self.delete_gcs_bucket(BUCKET) - super().tearDown() - - @provide_gcp_context(GCP_GCS_KEY) - def test_run_example_dag(self): - self.run_dag(dag_id="example_dataplex", dag_folder=CLOUD_DAG_FOLDER) diff --git a/tests/system/providers/google/cloud/dataplex/__init__.py b/tests/system/providers/google/cloud/dataplex/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/tests/system/providers/google/cloud/dataplex/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/providers/google/cloud/example_dags/example_dataplex.py b/tests/system/providers/google/cloud/dataplex/example_dataplex.py similarity index 53% rename from airflow/providers/google/cloud/example_dags/example_dataplex.py rename to tests/system/providers/google/cloud/dataplex/example_dataplex.py index b12780176a897..bc2b6462b8e92 100644 --- a/airflow/providers/google/cloud/example_dags/example_dataplex.py +++ b/tests/system/providers/google/cloud/dataplex/example_dataplex.py @@ -21,25 +21,41 @@ import datetime import os +from pathlib import Path from airflow import models from airflow.models.baseoperator import chain from airflow.providers.google.cloud.operators.dataplex import ( + DataplexCreateLakeOperator, DataplexCreateTaskOperator, + DataplexDeleteLakeOperator, DataplexDeleteTaskOperator, DataplexGetTaskOperator, DataplexListTasksOperator, ) +from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator from airflow.providers.google.cloud.sensors.dataplex import DataplexTaskStateSensor +from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator +from airflow.utils.trigger_rule import TriggerRule -PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "INVALID PROJECT ID") -REGION = os.environ.get("GCP_REGION", "INVALID REGION") -LAKE_ID = os.environ.get("GCP_LAKE_ID", "INVALID LAKE ID") -SERVICE_ACC = os.environ.get("GCP_DATAPLEX_SERVICE_ACC", "XYZ@developer.gserviceaccount.com") -BUCKET = os.environ.get("GCP_DATAPLEX_BUCKET", "INVALID BUCKET NAME") -SPARK_FILE_NAME = os.environ.get("SPARK_FILE_NAME", "INVALID FILE NAME") -SPARK_FILE_FULL_PATH = f"gs://{BUCKET}/{SPARK_FILE_NAME}" -DATAPLEX_TASK_ID = "task001" +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") + +DAG_ID = "example_dataplex" + +BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}" + +SPARK_FILE_NAME = 
"spark_example_pi.py" +CURRENT_FOLDER = Path(__file__).parent +FILE_LOCAL_PATH = str(Path(CURRENT_FOLDER) / "resources" / SPARK_FILE_NAME) + +LAKE_ID = f"test-lake-{ENV_ID}" +REGION = "us-central1" + +SERVICE_ACC = os.environ.get("GCP_DATAPLEX_SERVICE_ACC") + +SPARK_FILE_FULL_PATH = f"gs://{BUCKET_NAME}/{SPARK_FILE_NAME}" +DATAPLEX_TASK_ID = f"test-task-{ENV_ID}" TRIGGER_SPEC_TYPE = "ON_DEMAND" # [START howto_dataplex_configuration] @@ -50,10 +66,39 @@ } # [END howto_dataplex_configuration] +# [START howto_dataplex_lake_configuration] +EXAMPLE_LAKE_BODY = { + "display_name": "test_display_name", + "labels": [], + "description": "test_description", + "metastore": {"service": ""}, +} +# [END howto_dataplex_lake_configuration] + + with models.DAG( - "example_dataplex", + DAG_ID, start_date=datetime.datetime(2021, 1, 1), + schedule="@once", + tags=["example", "dataplex"], ) as dag: + + create_bucket = GCSCreateBucketOperator( + task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID + ) + + upload_file = LocalFilesystemToGCSOperator( + task_id="upload_file", + src=FILE_LOCAL_PATH, + dst=SPARK_FILE_NAME, + bucket=BUCKET_NAME, + ) + # [START howto_dataplex_create_lake_operator] + create_lake = DataplexCreateLakeOperator( + project_id=PROJECT_ID, region=REGION, body=EXAMPLE_LAKE_BODY, lake_id=LAKE_ID, task_id="create_lake" + ) + # [END howto_dataplex_create_lake_operator] + # [START howto_dataplex_create_task_operator] create_dataplex_task = DataplexCreateTaskOperator( project_id=PROJECT_ID, @@ -71,19 +116,19 @@ region=REGION, lake_id=LAKE_ID, body=EXAMPLE_TASK_BODY, - dataplex_task_id=DATAPLEX_TASK_ID, + dataplex_task_id=f"{DATAPLEX_TASK_ID}-1", asynchronous=True, task_id="create_dataplex_task_async", ) # [END howto_dataplex_async_create_task_operator] # [START howto_dataplex_delete_task_operator] - delete_dataplex_task = DataplexDeleteTaskOperator( + delete_dataplex_task_async = DataplexDeleteTaskOperator( project_id=PROJECT_ID, region=REGION, lake_id=LAKE_ID, - dataplex_task_id=DATAPLEX_TASK_ID, - task_id="delete_dataplex_task", + dataplex_task_id=f"{DATAPLEX_TASK_ID}-1", + task_id="delete_dataplex_task_async", ) # [END howto_dataplex_delete_task_operator] @@ -113,11 +158,57 @@ ) # [END howto_dataplex_task_state_sensor] + # [START howto_dataplex_delete_task_operator] + delete_dataplex_task = DataplexDeleteTaskOperator( + project_id=PROJECT_ID, + region=REGION, + lake_id=LAKE_ID, + dataplex_task_id=DATAPLEX_TASK_ID, + task_id="delete_dataplex_task", + trigger_rule=TriggerRule.ALL_DONE, + ) + # [END howto_dataplex_delete_task_operator] + + # [START howto_dataplex_delete_lake_operator] + delete_lake = DataplexDeleteLakeOperator( + project_id=PROJECT_ID, + region=REGION, + lake_id=LAKE_ID, + task_id="delete_lake", + trigger_rule=TriggerRule.ALL_DONE, + ) + # [END howto_dataplex_delete_lake_operator] + + delete_bucket = GCSDeleteBucketOperator( + task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE + ) + chain( + # TEST SETUP + create_bucket, + upload_file, + # TEST BODY + create_lake, create_dataplex_task, get_dataplex_task, list_dataplex_task, - delete_dataplex_task, create_dataplex_task_async, + delete_dataplex_task_async, dataplex_task_state, + # TEST TEARDOWN + delete_dataplex_task, + delete_lake, + delete_bucket, ) + + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + + +from 
tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git a/tests/system/providers/google/cloud/dataplex/resources/__init__.py b/tests/system/providers/google/cloud/dataplex/resources/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/tests/system/providers/google/cloud/dataplex/resources/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/system/providers/google/cloud/dataplex/resources/spark_example_pi.py b/tests/system/providers/google/cloud/dataplex/resources/spark_example_pi.py new file mode 100644 index 0000000000000..77a7cb9e6ca41 --- /dev/null +++ b/tests/system/providers/google/cloud/dataplex/resources/spark_example_pi.py @@ -0,0 +1,43 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import sys +from operator import add +from random import random + +from pyspark.sql import SparkSession + +if __name__ == "__main__": + """ + Usage: pi [partitions] + """ + spark = SparkSession.builder.appName("PythonPi").getOrCreate() + + partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2 + n = 100000 * partitions + + def f(_: int) -> float: + x = random() * 2 - 1 + y = random() * 2 - 1 + return 1 if x**2 + y**2 <= 1 else 0 + + count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add) + print(f"Pi is roughly {4.0 * count / n:f}") + + spark.stop()
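
Note on the new hook methods: the create_lake / get_lake / delete_lake calls added to DataplexHook in this patch are only exercised through mocked unit tests above. The following is a minimal, illustrative sketch (not part of the patch) of driving them directly, assuming a working "google_cloud_default" Airflow connection with Dataplex permissions; the project, region and lake identifiers are placeholders, not values from this change.

    from airflow.providers.google.cloud.hooks.dataplex import DataplexHook

    # Placeholder identifiers -- substitute real values before running.
    PROJECT_ID = "my-project"
    REGION = "us-central1"
    LAKE_ID = "my-lake"

    hook = DataplexHook(gcp_conn_id="google_cloud_default", api_version="v1")

    # create_lake returns a long-running operation; wait_for_operation blocks until it completes.
    operation = hook.create_lake(
        project_id=PROJECT_ID,
        region=REGION,
        lake_id=LAKE_ID,
        body={"display_name": "Example lake"},
    )
    hook.wait_for_operation(timeout=None, operation=operation)

    # Read the lake back, then tear it down again.
    lake = hook.get_lake(project_id=PROJECT_ID, region=REGION, lake_id=LAKE_ID)
    print(lake)

    operation = hook.delete_lake(project_id=PROJECT_ID, region=REGION, lake_id=LAKE_ID)
    hook.wait_for_operation(timeout=None, operation=operation)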