From f4825b7b43ad0748077831d6c329e40b9ecc5ab9 Mon Sep 17 00:00:00 2001 From: VladaZakharova Date: Mon, 20 Jan 2025 16:56:34 +0100 Subject: [PATCH] Add new operators for Entry Type (#45799) Co-authored-by: Ulada Zakharava --- .../operators/cloud/dataplex.rst | 84 ++++ docs/spelling_wordlist.txt | 2 + .../providers/google/cloud/hooks/dataplex.py | 200 ++++++++- .../providers/google/cloud/links/dataplex.py | 49 ++ .../google/cloud/operators/dataplex.py | 418 ++++++++++++++++++ .../airflow/providers/google/provider.yaml | 2 + .../tests/google/cloud/hooks/test_dataplex.py | 105 +++++ .../tests/google/cloud/links/test_dataplex.py | 55 +++ .../google/cloud/operators/test_dataplex.py | 150 ++++++- .../dataplex/example_dataplex_catalog.py | 70 ++- 10 files changed, 1131 insertions(+), 4 deletions(-) diff --git a/docs/apache-airflow-providers-google/operators/cloud/dataplex.rst b/docs/apache-airflow-providers-google/operators/cloud/dataplex.rst index 1846f1ad41b5ee..a8bd9acde56b2d 100644 --- a/docs/apache-airflow-providers-google/operators/cloud/dataplex.rst +++ b/docs/apache-airflow-providers-google/operators/cloud/dataplex.rst @@ -511,3 +511,87 @@ use :class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogUp :dedent: 4 :start-after: [START howto_operator_dataplex_catalog_update_entry_group] :end-before: [END howto_operator_dataplex_catalog_update_entry_group] + +.. _howto/operator:DataplexCatalogCreateEntryTypeOperator: + +Create an EntryType +-------------------- + +To create an Entry Type in specific location in Dataplex Catalog you can +use :class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogCreateEntryTypeOperator` +For more information about the available fields to pass when creating an Entry Type, visit `Entry Type resource configuration. `__ + +A simple Entry Group configuration can look as followed: + +.. 
exampleinclude:: /../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py + :language: python + :dedent: 0 + :start-after: [START howto_dataplex_entry_type_configuration] + :end-before: [END howto_dataplex_entry_type_configuration] + +With this configuration you can create an Entry Type resource: + +:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogCreateEntryTypeOperator` + +.. exampleinclude:: /../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_dataplex_catalog_create_entry_type] + :end-before: [END howto_operator_dataplex_catalog_create_entry_type] + +.. _howto/operator:DataplexCatalogDeleteEntryTypeOperator: + +Delete an EntryType +-------------------- + +To delete an Entry Type in specific location in Dataplex Catalog you can +use :class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogDeleteEntryTypeOperator` + +.. exampleinclude:: /../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_dataplex_catalog_delete_entry_type] + :end-before: [END howto_operator_dataplex_catalog_delete_entry_type] + +.. _howto/operator:DataplexCatalogListEntryTypesOperator: + +List EntryTypes +---------------- + +To list all Entry Types in specific location in Dataplex Catalog you can +use :class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogListEntryTypesOperator`. +This operator also supports filtering and ordering the result of the operation. + +.. exampleinclude:: /../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_dataplex_catalog_list_entry_types] + :end-before: [END howto_operator_dataplex_catalog_list_entry_types] + +.. 
_howto/operator:DataplexCatalogGetEntryTypeOperator: + +Get an EntryType +----------------- + +To retrieve an Entry Group in specific location in Dataplex Catalog you can +use :class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogGetEntryTypeOperator` + +.. exampleinclude:: /../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_dataplex_catalog_get_entry_type] + :end-before: [END howto_operator_dataplex_catalog_get_entry_type] + +.. _howto/operator:DataplexCatalogUpdateEntryTypeOperator: + +Update an EntryType +-------------------- + +To update an Entry Type in specific location in Dataplex Catalog you can +use :class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogUpdateEntryTypeOperator` + +.. exampleinclude:: /../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_dataplex_catalog_update_entry_type] + :end-before: [END howto_operator_dataplex_catalog_update_entry_type] diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index f0f7d90518fb5c..2b35ed5bc2dd97 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -578,6 +578,8 @@ EntryGroup EntryGroups entrypoint entrypoints +EntryType +EntryTypes Enum enum enums diff --git a/providers/src/airflow/providers/google/cloud/hooks/dataplex.py b/providers/src/airflow/providers/google/cloud/hooks/dataplex.py index cb2c7e41a20676..df6a4414284c76 100644 --- a/providers/src/airflow/providers/google/cloud/hooks/dataplex.py +++ b/providers/src/airflow/providers/google/cloud/hooks/dataplex.py @@ -36,6 +36,7 @@ DataScan, DataScanJob, EntryGroup, + EntryType, Lake, Task, Zone, @@ -54,7 +55,10 @@ from google.api_core.operation import Operation from google.api_core.retry import Retry from google.api_core.retry_async import AsyncRetry - from 
google.cloud.dataplex_v1.services.catalog_service.pagers import ListEntryGroupsPager + from google.cloud.dataplex_v1.services.catalog_service.pagers import ( + ListEntryGroupsPager, + ListEntryTypesPager, + ) from googleapiclient.discovery import Resource PATH_DATA_SCAN = "projects/{project_id}/locations/{region}/dataScans/{data_scan_id}" @@ -134,6 +138,200 @@ def wait_for_operation(self, timeout: float | None, operation: Operation): error = operation.exception(timeout=timeout) raise AirflowException(error) + @GoogleBaseHook.fallback_to_default_project_id + def create_entry_type( + self, + location: str, + entry_type_id: str, + entry_type_configuration: EntryType | dict, + project_id: str = PROVIDE_PROJECT_ID, + validate_only: bool = False, + retry: Retry | _MethodDefault = DEFAULT, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] = (), + ) -> Operation: + """ + Create an EntryType resource. + + :param location: Required. The ID of the Google Cloud location that the task belongs to. + :param entry_type_id: Required. EntryType identifier. + :param entry_type_configuration: Required. EntryType configuration body. + :param project_id: Optional. The ID of the Google Cloud project that the task belongs to. + :param validate_only: Optional. If set, performs request validation, but does not actually execute + the create request. + :param retry: Optional. A retry object used to retry requests. If `None` is specified, requests + will not be retried. + :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete. + Note that if `retry` is specified, the timeout applies to each individual attempt. + :param metadata: Optional. Additional metadata that is provided to the method. 
+ """ + client = self.get_dataplex_catalog_client() + return client.create_entry_type( + request={ + "parent": client.common_location_path(project_id, location), + "entry_type_id": entry_type_id, + "entry_type": entry_type_configuration, + "validate_only": validate_only, + }, + retry=retry, + timeout=timeout, + metadata=metadata, + ) + + @GoogleBaseHook.fallback_to_default_project_id + def get_entry_type( + self, + location: str, + entry_type_id: str, + project_id: str = PROVIDE_PROJECT_ID, + retry: Retry | _MethodDefault = DEFAULT, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] = (), + ) -> EntryType: + """ + Get an EntryType resource. + + :param location: Required. The ID of the Google Cloud location that the task belongs to. + :param entry_type_id: Required. EntryGroup identifier. + :param project_id: Optional. The ID of the Google Cloud project that the task belongs to. + :param retry: Optional. A retry object used to retry requests. If `None` is specified, requests + will not be retried. + :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete. + Note that if `retry` is specified, the timeout applies to each individual attempt. + :param metadata: Optional. Additional metadata that is provided to the method. + """ + client = self.get_dataplex_catalog_client() + return client.get_entry_type( + request={ + "name": client.entry_type_path(project_id, location, entry_type_id), + }, + retry=retry, + timeout=timeout, + metadata=metadata, + ) + + @GoogleBaseHook.fallback_to_default_project_id + def delete_entry_type( + self, + location: str, + entry_type_id: str, + project_id: str = PROVIDE_PROJECT_ID, + retry: Retry | _MethodDefault = DEFAULT, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] = (), + ) -> Operation: + """ + Delete an EntryType resource. + + :param location: Required. The ID of the Google Cloud location that the task belongs to. + :param entry_type_id: Required. 
EntryType identifier. + :param project_id: Optional. The ID of the Google Cloud project that the task belongs to. + :param retry: Optional. A retry object used to retry requests. If `None` is specified, requests + will not be retried. + :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete. + Note that if `retry` is specified, the timeout applies to each individual attempt. + :param metadata: Optional. Additional metadata that is provided to the method. + """ + client = self.get_dataplex_catalog_client() + return client.delete_entry_type( + request={ + "name": client.entry_type_path(project_id, location, entry_type_id), + }, + retry=retry, + timeout=timeout, + metadata=metadata, + ) + + @GoogleBaseHook.fallback_to_default_project_id + def list_entry_types( + self, + location: str, + filter_by: str | None = None, + order_by: str | None = None, + page_size: int | None = None, + page_token: str | None = None, + project_id: str = PROVIDE_PROJECT_ID, + retry: Retry | _MethodDefault = DEFAULT, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] = (), + ) -> ListEntryTypesPager: + """ + List EntryTypes resources from specific location. + + :param location: Required. The ID of the Google Cloud location that the task belongs to. + :param filter_by: Optional. Filter to apply on the list results. + :param order_by: Optional. Fields to order the results by. + :param page_size: Optional. Maximum number of EntryGroups to return on one page. + :param page_token: Optional. Token to retrieve the next page of results. + :param project_id: Optional. The ID of the Google Cloud project that the task belongs to. + :param retry: Optional. A retry object used to retry requests. If `None` is specified, requests + will not be retried. + :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete. + Note that if `retry` is specified, the timeout applies to each individual attempt. 
+ :param metadata: Optional. Additional metadata that is provided to the method. + """ + client = self.get_dataplex_catalog_client() + return client.list_entry_types( + request={ + "parent": client.common_location_path(project_id, location), + "filter": filter_by, + "order_by": order_by, + "page_size": page_size, + "page_token": page_token, + }, + retry=retry, + timeout=timeout, + metadata=metadata, + ) + + @GoogleBaseHook.fallback_to_default_project_id + def update_entry_type( + self, + location: str, + entry_type_id: str, + entry_type_configuration: dict | EntryType, + project_id: str = PROVIDE_PROJECT_ID, + update_mask: list[str] | FieldMask | None = None, + validate_only: bool | None = False, + retry: Retry | _MethodDefault = DEFAULT, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] = (), + ) -> Operation: + """ + Update an EntryType resource. + + :param entry_type_id: Required. ID of the EntryType to update. + :param entry_type_configuration: Required. The updated configuration body of the EntryType. + :param location: Required. The ID of the Google Cloud location that the task belongs to. + :param update_mask: Optional. Names of fields whose values to overwrite on an entry group. + If this parameter is absent or empty, all modifiable fields are overwritten. If such + fields are non-required and omitted in the request body, their values are emptied. + :param project_id: Optional. The ID of the Google Cloud project that the task belongs to. + :param validate_only: Optional. The service validates the request without performing any mutations. + :param retry: Optional. A retry object used to retry requests. If `None` is specified, requests + will not be retried. + :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete. + Note that if `retry` is specified, the timeout applies to each individual attempt. + :param metadata: Optional. Additional metadata that is provided to the method. 
+ """ + client = self.get_dataplex_catalog_client() + _entry_type = ( + deepcopy(entry_type_configuration) + if isinstance(entry_type_configuration, dict) + else EntryType.to_dict(entry_type_configuration) + ) + _entry_type["name"] = client.entry_type_path(project_id, location, entry_type_id) + return client.update_entry_type( + request={ + "entry_type": _entry_type, + "update_mask": FieldMask(paths=update_mask) if type(update_mask) is list else update_mask, + "validate_only": validate_only, + }, + retry=retry, + timeout=timeout, + metadata=metadata, + ) + @GoogleBaseHook.fallback_to_default_project_id def create_entry_group( self, diff --git a/providers/src/airflow/providers/google/cloud/links/dataplex.py b/providers/src/airflow/providers/google/cloud/links/dataplex.py index e0bd1ae5844790..db9bba40276ce3 100644 --- a/providers/src/airflow/providers/google/cloud/links/dataplex.py +++ b/providers/src/airflow/providers/google/cloud/links/dataplex.py @@ -35,6 +35,10 @@ DATAPLEX_CATALOG_ENTRY_GROUP_LINK = ( "/dataplex/projects/{project_id}/locations/{location}/entryGroups/{entry_group_id}?project={project_id}" ) +DATAPLEX_CATALOG_ENTRY_TYPE_LINK = ( + "/dataplex/projects/{project_id}/locations/{location}/entryTypes/{entry_type_id}?project={project_id}" +) +DATAPLEX_CATALOG_ENTRY_TYPES_LINK = "/dataplex/catalog/entry-types?project={project_id}" class DataplexTaskLink(BaseGoogleLink): @@ -150,3 +154,48 @@ def persist( "project_id": task_instance.project_id, }, ) + + +class DataplexCatalogEntryTypeLink(BaseGoogleLink): + """Helper class for constructing Dataplex Catalog EntryType link.""" + + name = "Dataplex Catalog EntryType" + key = "dataplex_catalog_entry_type_key" + format_str = DATAPLEX_CATALOG_ENTRY_TYPE_LINK + + @staticmethod + def persist( + context: Context, + task_instance, + ): + task_instance.xcom_push( + context=context, + key=DataplexCatalogEntryTypeLink.key, + value={ + "entry_type_id": task_instance.entry_type_id, + "location": task_instance.location, + 
"project_id": task_instance.project_id, + }, + ) + + +class DataplexCatalogEntryTypesLink(BaseGoogleLink): + """Helper class for constructing Dataplex Catalog EntryTypes link.""" + + name = "Dataplex Catalog EntryTypes" + key = "dataplex_catalog_entry_types_key" + format_str = DATAPLEX_CATALOG_ENTRY_TYPES_LINK + + @staticmethod + def persist( + context: Context, + task_instance, + ): + task_instance.xcom_push( + context=context, + key=DataplexCatalogEntryTypesLink.key, + value={ + "location": task_instance.location, + "project_id": task_instance.project_id, + }, + ) diff --git a/providers/src/airflow/providers/google/cloud/operators/dataplex.py b/providers/src/airflow/providers/google/cloud/operators/dataplex.py index 33874063955ff1..ab351b36a28424 100644 --- a/providers/src/airflow/providers/google/cloud/operators/dataplex.py +++ b/providers/src/airflow/providers/google/cloud/operators/dataplex.py @@ -44,8 +44,10 @@ DataScan, DataScanJob, EntryGroup, + EntryType, Lake, ListEntryGroupsResponse, + ListEntryTypesResponse, Task, Zone, ) @@ -56,6 +58,8 @@ from airflow.providers.google.cloud.links.dataplex import ( DataplexCatalogEntryGroupLink, DataplexCatalogEntryGroupsLink, + DataplexCatalogEntryTypeLink, + DataplexCatalogEntryTypesLink, DataplexLakeLink, DataplexTaskLink, DataplexTasksLink, @@ -2579,3 +2583,417 @@ def execute(self, context: Context): if not self.validate_request: self.log.info("EntryGroup %s was successfully updated.", self.entry_group_id) return result + + +class DataplexCatalogCreateEntryTypeOperator(DataplexCatalogBaseOperator): + """ + Create an EntryType resource. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:DataplexCatalogCreateEntryTypeOperator` + + :param entry_type_id: Required. EntryType identifier. + :param entry_type_configuration: Required. EntryType configuration. 
+ For more details please see API documentation: + https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.entryGroups#EntryGroup + :param validate_request: Optional. If set, performs request validation, but does not actually + execute the request. + :param project_id: Required. The ID of the Google Cloud project where the service is used. + :param location: Required. The ID of the Google Cloud region where the service is used. + :param gcp_conn_id: Optional. The connection ID to use to connect to Google Cloud. + :param retry: Optional. A retry object used to retry requests. If `None` is specified, requests will not + be retried. + :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete. + Note that if `retry` is specified, the timeout applies to each individual attempt. + :param metadata: Optional. Additional metadata that is provided to the method. + :param impersonation_chain: Optional. Service account to impersonate using short-term + credentials, or chained list of accounts required to get the access_token + of the last account in the list, which will be impersonated in the request. + If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. + If set as a sequence, the identities from the list must grant + Service Account Token Creator IAM role to the directly preceding identity, with first + account from the list granting this role to the originating account (templated). 
+ """ + + template_fields: Sequence[str] = tuple( + {"entry_type_id", "entry_type_configuration"} | set(DataplexCatalogBaseOperator.template_fields) + ) + operator_extra_links = (DataplexCatalogEntryTypeLink(),) + + def __init__( + self, + entry_type_id: str, + entry_type_configuration: EntryType | dict, + validate_request: bool = False, + *args, + **kwargs, + ) -> None: + super().__init__(*args, **kwargs) + self.entry_type_id = entry_type_id + self.entry_type_configuration = entry_type_configuration + self.validate_request = validate_request + + def execute(self, context: Context): + DataplexCatalogEntryTypeLink.persist( + context=context, + task_instance=self, + ) + + if self.validate_request: + self.log.info("Validating a Create Dataplex Catalog EntryType request.") + else: + self.log.info("Creating a Dataplex Catalog EntryType.") + + try: + operation = self.hook.create_entry_type( + entry_type_id=self.entry_type_id, + entry_type_configuration=self.entry_type_configuration, + location=self.location, + project_id=self.project_id, + validate_only=self.validate_request, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + entry_type = self.hook.wait_for_operation(timeout=self.timeout, operation=operation) + except AlreadyExists: + entry_type = self.hook.get_entry_type( + entry_type_id=self.entry_type_id, + location=self.location, + project_id=self.project_id, + ) + self.log.info( + "Dataplex Catalog EntryType %s already exists.", + self.entry_type_id, + ) + result = EntryType.to_dict(entry_type) + return result + except Exception as ex: + raise AirflowException(ex) + else: + result = EntryType.to_dict(entry_type) if not self.validate_request else None + + if not self.validate_request: + self.log.info("Dataplex Catalog EntryType %s was successfully created.", self.entry_type_id) + return result + + +class DataplexCatalogGetEntryTypeOperator(DataplexCatalogBaseOperator): + """ + Get an EntryType resource. + + .. 
seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:DataplexCatalogGetEntryTypeOperator` + + :param entry_type_id: Required. EntryType identifier. + :param project_id: Required. The ID of the Google Cloud project where the service is used. + :param location: Required. The ID of the Google Cloud region where the service is used. + :param gcp_conn_id: Optional. The connection ID to use to connect to Google Cloud. + :param retry: Optional. A retry object used to retry requests. If `None` is specified, requests will not + be retried. + :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete. + Note that if `retry` is specified, the timeout applies to each individual attempt. + :param metadata: Optional. Additional metadata that is provided to the method. + :param impersonation_chain: Optional. Service account to impersonate using short-term + credentials, or chained list of accounts required to get the access_token + of the last account in the list, which will be impersonated in the request. + If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. + If set as a sequence, the identities from the list must grant + Service Account Token Creator IAM role to the directly preceding identity, with first + account from the list granting this role to the originating account (templated). 
+ """ + + template_fields: Sequence[str] = tuple( + {"entry_type_id"} | set(DataplexCatalogBaseOperator.template_fields) + ) + operator_extra_links = (DataplexCatalogEntryTypeLink(),) + + def __init__( + self, + entry_type_id: str, + *args, + **kwargs, + ) -> None: + super().__init__(*args, **kwargs) + self.entry_type_id = entry_type_id + + def execute(self, context: Context): + DataplexCatalogEntryTypeLink.persist( + context=context, + task_instance=self, + ) + self.log.info( + "Retrieving Dataplex Catalog EntryType %s.", + self.entry_type_id, + ) + try: + entry_type = self.hook.get_entry_type( + entry_type_id=self.entry_type_id, + location=self.location, + project_id=self.project_id, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + except NotFound: + self.log.info( + "Dataplex Catalog EntryType %s not found.", + self.entry_type_id, + ) + raise AirflowException(NotFound) + except Exception as ex: + raise AirflowException(ex) + + return EntryType.to_dict(entry_type) + + +class DataplexCatalogDeleteEntryTypeOperator(DataplexCatalogBaseOperator): + """ + Delete an EntryType resource. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:DataplexCatalogDeleteEntryTypeOperator` + + :param entry_type_id: Required. EntryType identifier. + :param project_id: Required. The ID of the Google Cloud project where the service is used. + :param location: Required. The ID of the Google Cloud region where the service is used. + :param gcp_conn_id: Optional. The connection ID to use to connect to Google Cloud. + :param retry: Optional. A retry object used to retry requests. If `None` is specified, requests will not + be retried. + :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete. + Note that if `retry` is specified, the timeout applies to each individual attempt. + :param metadata: Optional. Additional metadata that is provided to the method. 
+ :param impersonation_chain: Optional. Service account to impersonate using short-term + credentials, or chained list of accounts required to get the access_token + of the last account in the list, which will be impersonated in the request. + If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. + If set as a sequence, the identities from the list must grant + Service Account Token Creator IAM role to the directly preceding identity, with first + account from the list granting this role to the originating account (templated). + """ + + template_fields: Sequence[str] = tuple( + {"entry_type_id"} | set(DataplexCatalogBaseOperator.template_fields) + ) + + def __init__( + self, + entry_type_id: str, + *args, + **kwargs, + ) -> None: + super().__init__(*args, **kwargs) + self.entry_type_id = entry_type_id + + def execute(self, context: Context): + self.log.info( + "Deleting Dataplex Catalog EntryType %s.", + self.entry_type_id, + ) + try: + operation = self.hook.delete_entry_type( + entry_type_id=self.entry_type_id, + location=self.location, + project_id=self.project_id, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + self.hook.wait_for_operation(timeout=self.timeout, operation=operation) + + except NotFound: + self.log.info( + "Dataplex Catalog EntryType %s not found.", + self.entry_type_id, + ) + raise AirflowException(NotFound) + except Exception as ex: + raise AirflowException(ex) + return None + + +class DataplexCatalogListEntryTypesOperator(DataplexCatalogBaseOperator): + """ + List EntryType resources. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:DataplexCatalogListEntryTypesOperator` + + :param filter_by: Optional. Filter to apply on the list results. + :param order_by: Optional. Fields to order the results by. + :param page_size: Optional. Maximum number of EntryGroups to return on the page. 
+ :param page_token: Optional. Token to retrieve the next page of results. + :param project_id: Required. The ID of the Google Cloud project where the service is used. + :param location: Required. The ID of the Google Cloud region where the service is used. + :param gcp_conn_id: Optional. The connection ID to use to connect to Google Cloud. + :param retry: Optional. A retry object used to retry requests. If `None` is specified, requests will not + be retried. + :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete. + Note that if `retry` is specified, the timeout applies to each individual attempt. + :param metadata: Optional. Additional metadata that is provided to the method. + :param impersonation_chain: Optional. Service account to impersonate using short-term + credentials, or chained list of accounts required to get the access_token + of the last account in the list, which will be impersonated in the request. + If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. + If set as a sequence, the identities from the list must grant + Service Account Token Creator IAM role to the directly preceding identity, with first + account from the list granting this role to the originating account (templated). 
+ """ + + template_fields: Sequence[str] = tuple(DataplexCatalogBaseOperator.template_fields) + operator_extra_links = (DataplexCatalogEntryTypesLink(),) + + def __init__( + self, + page_size: int | None = None, + page_token: str | None = None, + filter_by: str | None = None, + order_by: str | None = None, + *args, + **kwargs, + ) -> None: + super().__init__(*args, **kwargs) + self.page_size = page_size + self.page_token = page_token + self.filter_by = filter_by + self.order_by = order_by + + def execute(self, context: Context): + DataplexCatalogEntryTypesLink.persist( + context=context, + task_instance=self, + ) + self.log.info( + "Listing Dataplex Catalog EntryType from location %s.", + self.location, + ) + try: + entry_type_on_page = self.hook.list_entry_types( + location=self.location, + project_id=self.project_id, + page_size=self.page_size, + page_token=self.page_token, + filter_by=self.filter_by, + order_by=self.order_by, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + self.log.info("EntryGroup on page: %s", entry_type_on_page) + self.xcom_push( + context=context, + key="entry_type_page", + value=ListEntryTypesResponse.to_dict(entry_type_on_page._response), + ) + except Exception as ex: + raise AirflowException(ex) + + # Constructing list to return EntryGroups in readable format + entry_types_list = [ + MessageToDict(entry_type._pb, preserving_proto_field_name=True) + for entry_type in next(iter(entry_type_on_page.pages)).entry_types + ] + return entry_types_list + + +class DataplexCatalogUpdateEntryTypeOperator(DataplexCatalogBaseOperator): + """ + Update an EntryType resource. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:DataplexCatalogUpdateEntryTypeOperator` + + :param project_id: Required. The ID of the Google Cloud project that the task belongs to. + :param location: Required. The ID of the Google Cloud region that the task belongs to. 
+ :param update_mask: Optional. Names of fields whose values to overwrite on an entry type.
+ """ + + template_fields: Sequence[str] = tuple( + {"entry_type_id", "entry_type_configuration", "update_mask"} + | set(DataplexCatalogBaseOperator.template_fields) + ) + operator_extra_links = (DataplexCatalogEntryTypeLink(),) + + def __init__( + self, + entry_type_id: str, + entry_type_configuration: dict | EntryType, + update_mask: list[str] | FieldMask | None = None, + validate_request: bool | None = False, + *args, + **kwargs, + ) -> None: + super().__init__(*args, **kwargs) + self.entry_type_id = entry_type_id + self.entry_type_configuration = entry_type_configuration + self.update_mask = update_mask + self.validate_request = validate_request + + def execute(self, context: Context): + DataplexCatalogEntryTypeLink.persist( + context=context, + task_instance=self, + ) + + if self.validate_request: + self.log.info("Validating an Update Dataplex Catalog EntryType request.") + else: + self.log.info( + "Updating Dataplex Catalog EntryType %s.", + self.entry_type_id, + ) + try: + operation = self.hook.update_entry_type( + location=self.location, + project_id=self.project_id, + entry_type_id=self.entry_type_id, + entry_type_configuration=self.entry_type_configuration, + update_mask=self.update_mask, + validate_only=self.validate_request, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + entry_type = self.hook.wait_for_operation(timeout=self.timeout, operation=operation) + + except NotFound as ex: + self.log.info("Specified EntryType was not found.") + raise AirflowException(ex) + except Exception as exc: + raise AirflowException(exc) + else: + result = EntryType.to_dict(entry_type) if not self.validate_request else None + + if not self.validate_request: + self.log.info("EntryType %s was successfully updated.", self.entry_type_id) + return result diff --git a/providers/src/airflow/providers/google/provider.yaml b/providers/src/airflow/providers/google/provider.yaml index 97277806b855c8..ca086d8a6d2749 100644 --- 
a/providers/src/airflow/providers/google/provider.yaml +++ b/providers/src/airflow/providers/google/provider.yaml @@ -1205,6 +1205,8 @@ extra-links: - airflow.providers.google.cloud.links.dataplex.DataplexLakeLink - airflow.providers.google.cloud.links.dataplex.DataplexCatalogEntryGroupLink - airflow.providers.google.cloud.links.dataplex.DataplexCatalogEntryGroupsLink + - airflow.providers.google.cloud.links.dataplex.DataplexCatalogEntryTypeLink + - airflow.providers.google.cloud.links.dataplex.DataplexCatalogEntryTypesLink - airflow.providers.google.cloud.links.bigquery.BigQueryDatasetLink - airflow.providers.google.cloud.links.bigquery.BigQueryTableLink - airflow.providers.google.cloud.links.bigquery.BigQueryJobDetailLink diff --git a/providers/tests/google/cloud/hooks/test_dataplex.py b/providers/tests/google/cloud/hooks/test_dataplex.py index 4a4f550eca67b5..39f447e591b40d 100644 --- a/providers/tests/google/cloud/hooks/test_dataplex.py +++ b/providers/tests/google/cloud/hooks/test_dataplex.py @@ -53,6 +53,9 @@ ENTRY_GROUP_ID = "entry-group-id" ENTRY_GROUP_BODY = {"description": "Some descr"} ENTRY_GROUP_UPDATED_BODY = {"description": "Some new descr"} +ENTRY_TYPE_ID = "entry-type-id" +ENTRY_TYPE_BODY = {"description": "Some descr"} +ENTRY_TYPE_UPDATED_BODY = {"description": "Some new descr"} UPDATE_MASK = ["description"] COMMON_PARENT = f"projects/{PROJECT_ID}/locations/{LOCATION}" @@ -63,6 +66,7 @@ ASSET_PARENT = f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}/zones/{ZONE_ID}/assets/{ASSET_ID}" DATASCAN_PARENT = f"projects/{PROJECT_ID}/locations/{REGION}" ENTRY_GROUP_PARENT = f"projects/{PROJECT_ID}/locations/{LOCATION}/entryGroup/{ENTRY_GROUP_ID}" +ENTRY_TYPE_PARENT = f"projects/{PROJECT_ID}/locations/{LOCATION}/entryType/{ENTRY_TYPE_ID}" class TestDataplexHook: @@ -425,3 +429,104 @@ def test_update_entry_group(self, mock_client): timeout=None, metadata=(), ) + + @mock.patch(DATAPLEX_CATALOG_HOOK_CLIENT) + def test_create_entry_type(self, 
mock_client): + mock_common_location_path = mock_client.return_value.common_location_path + mock_common_location_path.return_value = COMMON_PARENT + self.hook.create_entry_type( + project_id=PROJECT_ID, + location=LOCATION, + entry_type_id=ENTRY_TYPE_ID, + entry_type_configuration=ENTRY_TYPE_BODY, + validate_only=False, + ) + mock_client.return_value.create_entry_type.assert_called_once_with( + request=dict( + parent=COMMON_PARENT, + entry_type_id=ENTRY_TYPE_ID, + entry_type=ENTRY_TYPE_BODY, + validate_only=False, + ), + retry=DEFAULT, + timeout=None, + metadata=(), + ) + + @mock.patch(DATAPLEX_CATALOG_HOOK_CLIENT) + def test_delete_entry_type(self, mock_client): + mock_common_location_path = mock_client.return_value.entry_type_path + mock_common_location_path.return_value = ENTRY_TYPE_PARENT + self.hook.delete_entry_type(project_id=PROJECT_ID, location=LOCATION, entry_type_id=ENTRY_TYPE_ID) + + mock_client.return_value.delete_entry_type.assert_called_once_with( + request=dict( + name=ENTRY_TYPE_PARENT, + ), + retry=DEFAULT, + timeout=None, + metadata=(), + ) + + @mock.patch(DATAPLEX_CATALOG_HOOK_CLIENT) + def test_list_entry_types(self, mock_client): + mock_common_location_path = mock_client.return_value.common_location_path + mock_common_location_path.return_value = COMMON_PARENT + self.hook.list_entry_types( + project_id=PROJECT_ID, + location=LOCATION, + order_by="name", + page_size=2, + filter_by="'description' = 'Some descr'", + ) + mock_client.return_value.list_entry_types.assert_called_once_with( + request=dict( + parent=COMMON_PARENT, + page_size=2, + page_token=None, + filter="'description' = 'Some descr'", + order_by="name", + ), + retry=DEFAULT, + timeout=None, + metadata=(), + ) + + @mock.patch(DATAPLEX_CATALOG_HOOK_CLIENT) + def test_get_entry_type(self, mock_client): + mock_common_location_path = mock_client.return_value.entry_type_path + mock_common_location_path.return_value = ENTRY_TYPE_PARENT + self.hook.get_entry_type(project_id=PROJECT_ID, 
location=LOCATION, entry_type_id=ENTRY_TYPE_ID) + + mock_client.return_value.get_entry_type.assert_called_once_with( + request=dict( + name=ENTRY_TYPE_PARENT, + ), + retry=DEFAULT, + timeout=None, + metadata=(), + ) + + @mock.patch(DATAPLEX_CATALOG_HOOK_CLIENT) + def test_update_entry_type(self, mock_client): + mock_common_location_path = mock_client.return_value.entry_type_path + mock_common_location_path.return_value = ENTRY_TYPE_PARENT + self.hook.update_entry_type( + project_id=PROJECT_ID, + location=LOCATION, + entry_type_id=ENTRY_TYPE_ID, + entry_type_configuration=ENTRY_TYPE_UPDATED_BODY, + update_mask=UPDATE_MASK, + validate_only=False, + ) + + mock_client.return_value.update_entry_type.assert_called_once_with( + request=dict( + entry_type={**ENTRY_TYPE_UPDATED_BODY, "name": ENTRY_TYPE_PARENT}, + update_mask=FieldMask(paths=UPDATE_MASK), + validate_only=False, + ), + retry=DEFAULT, + timeout=None, + metadata=(), + ) diff --git a/providers/tests/google/cloud/links/test_dataplex.py b/providers/tests/google/cloud/links/test_dataplex.py index 05661c84bd3eee..7b5fc9486aaba9 100644 --- a/providers/tests/google/cloud/links/test_dataplex.py +++ b/providers/tests/google/cloud/links/test_dataplex.py @@ -22,13 +22,17 @@ from airflow.providers.google.cloud.links.dataplex import ( DataplexCatalogEntryGroupLink, DataplexCatalogEntryGroupsLink, + DataplexCatalogEntryTypeLink, + DataplexCatalogEntryTypesLink, DataplexLakeLink, DataplexTaskLink, DataplexTasksLink, ) from airflow.providers.google.cloud.operators.dataplex import ( DataplexCatalogCreateEntryGroupOperator, + DataplexCatalogCreateEntryTypeOperator, DataplexCatalogGetEntryGroupOperator, + DataplexCatalogGetEntryTypeOperator, DataplexCreateLakeOperator, DataplexCreateTaskOperator, DataplexListTasksOperator, @@ -39,6 +43,9 @@ TEST_ENTRY_GROUP_ID = "test-entry-group-id" TEST_ENTRY_GROUP_ID_BODY = {"description": "some description"} TEST_ENTRY_GROUPS_ID = "test-entry-groups-id" +TEST_ENTRY_TYPE_ID = 
"test-entry-type-id" +TEST_ENTRY_TYPE_ID_BODY = {"description": "some description"} +TEST_ENTRY_TYPES_ID = "test-entry-groups-id" TEST_TASK_ID = "test-task-id" TEST_TASKS_ID = "test-tasks-id" TEST_LAKE_ID = "test-lake-id" @@ -52,6 +59,13 @@ EXPECTED_DATAPLEX_CATALOG_ENTRY_GROUPS_LINK = ( DATAPLEX_BASE_LINK + f"catalog/entry-groups?project={TEST_PROJECT_ID}" ) +EXPECTED_DATAPLEX_CATALOG_ENTRY_TYPE_LINK = ( + DATAPLEX_BASE_LINK + + f"projects/{TEST_PROJECT_ID}/locations/{TEST_LOCATION}/entryTypes/{TEST_ENTRY_TYPE_ID}?project={TEST_PROJECT_ID}" +) +EXPECTED_DATAPLEX_CATALOG_ENTRY_TYPES_LINK = ( + DATAPLEX_BASE_LINK + f"catalog/entry-types?project={TEST_PROJECT_ID}" +) DATAPLEX_LAKE_LINK = ( DATAPLEX_BASE_LINK + f"lakes/{TEST_LAKE_ID};location={TEST_LOCATION}?project={TEST_PROJECT_ID}" ) @@ -166,3 +180,44 @@ def test_get_link(self, create_task_instance_of_operator, session): link.persist(context={"ti": ti}, task_instance=ti.task) actual_url = link.get_link(operator=ti.task, ti_key=ti.key) assert actual_url == expected_url + + +class TestDataplexCatalogEntryTypeLink: + @pytest.mark.db_test + def test_get_link(self, create_task_instance_of_operator, session): + expected_url = EXPECTED_DATAPLEX_CATALOG_ENTRY_TYPE_LINK + link = DataplexCatalogEntryTypeLink() + ti = create_task_instance_of_operator( + DataplexCatalogGetEntryTypeOperator, + dag_id="test_link_dag", + task_id="test_link_task", + location=TEST_LOCATION, + entry_type_id=TEST_ENTRY_TYPE_ID, + project_id=TEST_PROJECT_ID, + ) + session.add(ti) + session.commit() + link.persist(context={"ti": ti}, task_instance=ti.task) + actual_url = link.get_link(operator=ti.task, ti_key=ti.key) + assert actual_url == expected_url + + +class TestDataplexCatalogEntryTypesLink: + @pytest.mark.db_test + def test_get_link(self, create_task_instance_of_operator, session): + expected_url = EXPECTED_DATAPLEX_CATALOG_ENTRY_TYPES_LINK + link = DataplexCatalogEntryTypesLink() + ti = create_task_instance_of_operator( + 
DataplexCatalogCreateEntryTypeOperator, + dag_id="test_link_dag", + task_id="test_link_task", + location=TEST_LOCATION, + entry_type_id=TEST_ENTRY_TYPE_ID, + entry_type_configuration=TEST_ENTRY_TYPE_ID_BODY, + project_id=TEST_PROJECT_ID, + ) + session.add(ti) + session.commit() + link.persist(context={"ti": ti}, task_instance=ti.task) + actual_url = link.get_link(operator=ti.task, ti_key=ti.key) + assert actual_url == expected_url diff --git a/providers/tests/google/cloud/operators/test_dataplex.py b/providers/tests/google/cloud/operators/test_dataplex.py index 2aff961623bb36..ef5faa4637e6bb 100644 --- a/providers/tests/google/cloud/operators/test_dataplex.py +++ b/providers/tests/google/cloud/operators/test_dataplex.py @@ -20,15 +20,24 @@ import pytest from google.api_core.gapic_v1.method import DEFAULT -from google.cloud.dataplex_v1.services.catalog_service.pagers import ListEntryGroupsPager -from google.cloud.dataplex_v1.types import ListEntryGroupsRequest, ListEntryGroupsResponse +from google.cloud.dataplex_v1.services.catalog_service.pagers import ListEntryGroupsPager, ListEntryTypesPager +from google.cloud.dataplex_v1.types import ( + ListEntryGroupsRequest, + ListEntryGroupsResponse, + ListEntryTypesRequest, + ListEntryTypesResponse, +) from airflow.exceptions import TaskDeferred from airflow.providers.google.cloud.operators.dataplex import ( DataplexCatalogCreateEntryGroupOperator, + DataplexCatalogCreateEntryTypeOperator, DataplexCatalogDeleteEntryGroupOperator, + DataplexCatalogDeleteEntryTypeOperator, DataplexCatalogGetEntryGroupOperator, + DataplexCatalogGetEntryTypeOperator, DataplexCatalogListEntryGroupsOperator, + DataplexCatalogListEntryTypesOperator, DataplexCreateAssetOperator, DataplexCreateLakeOperator, DataplexCreateOrUpdateDataProfileScanOperator, @@ -58,6 +67,7 @@ ZONE_STR = "airflow.providers.google.cloud.operators.dataplex.Zone" ASSET_STR = "airflow.providers.google.cloud.operators.dataplex.Asset" ENTRY_GROUP_STR = 
"airflow.providers.google.cloud.operators.dataplex.EntryGroup" +ENTRY_TYPE_STR = "airflow.providers.google.cloud.operators.dataplex.EntryType" PROJECT_ID = "project-id" REGION = "region" @@ -80,6 +90,7 @@ ZONE_ID = "test_zone_id" JOB_ID = "test_job_id" ENTRY_GROUP_NAME = "test_entry_group" +ENTRY_TYPE_NAME = "test_entry_type" class TestDataplexCreateTaskOperator: @@ -877,3 +888,138 @@ def test_execute(self, hook_mock, entry_group_mock): timeout=None, metadata=(), ) + + +class TestDataplexCatalogCreateEntryTypeOperator: + @mock.patch(ENTRY_TYPE_STR) + @mock.patch(HOOK_STR) + def test_execute(self, hook_mock, entry_group_mock): + op = DataplexCatalogCreateEntryTypeOperator( + task_id="create_task", + project_id=PROJECT_ID, + location=REGION, + entry_type_id=ENTRY_TYPE_NAME, + entry_type_configuration=BODY, + validate_request=None, + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + entry_group_mock.return_value.to_dict.return_value = None + hook_mock.return_value.wait_for_operation.return_value = None + op.execute(context=mock.MagicMock()) + hook_mock.assert_called_once_with( + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + hook_mock.return_value.create_entry_type.assert_called_once_with( + entry_type_id=ENTRY_TYPE_NAME, + entry_type_configuration=BODY, + location=REGION, + project_id=PROJECT_ID, + validate_only=None, + retry=DEFAULT, + timeout=None, + metadata=(), + ) + + +class TestDataplexCatalogGetEntryTypeOperator: + @mock.patch(ENTRY_TYPE_STR) + @mock.patch(HOOK_STR) + def test_execute(self, hook_mock, entry_type_mock): + op = DataplexCatalogGetEntryTypeOperator( + project_id=PROJECT_ID, + location=REGION, + entry_type_id=ENTRY_TYPE_NAME, + task_id="get_task", + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + op.execute(context=mock.MagicMock()) + entry_type_mock.return_value.to_dict.return_value = None + hook_mock.assert_called_once_with( + gcp_conn_id=GCP_CONN_ID, + 
impersonation_chain=IMPERSONATION_CHAIN, + ) + hook_mock.return_value.get_entry_type.assert_called_once_with( + project_id=PROJECT_ID, + location=REGION, + entry_type_id=ENTRY_TYPE_NAME, + retry=DEFAULT, + timeout=None, + metadata=(), + ) + + +class TestDataplexCatalogDeleteEntryTypeOperator: + @mock.patch(HOOK_STR) + def test_execute(self, hook_mock): + op = DataplexCatalogDeleteEntryTypeOperator( + project_id=PROJECT_ID, + location=REGION, + entry_type_id=ENTRY_TYPE_NAME, + task_id="delete_task", + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + hook_mock.return_value.wait_for_operation.return_value = None + op.execute(context=mock.MagicMock()) + hook_mock.assert_called_once_with( + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + hook_mock.return_value.delete_entry_type.assert_called_once_with( + project_id=PROJECT_ID, + location=REGION, + entry_type_id=ENTRY_TYPE_NAME, + retry=DEFAULT, + timeout=None, + metadata=(), + ) + + +class TestDataplexCatalogListEntryTypesOperator: + @mock.patch(ENTRY_TYPE_STR) + @mock.patch(HOOK_STR) + def test_execute(self, hook_mock, entry_type_mock): + op = DataplexCatalogListEntryTypesOperator( + project_id=PROJECT_ID, + location=REGION, + task_id="list_task", + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + hook_mock.return_value.list_entry_types.return_value = ListEntryTypesPager( + response=( + ListEntryTypesResponse( + entry_types=[ + { + "name": "aaa", + "description": "Test Entry Type 1", + "display_name": "Entry Type One", + } + ] + ) + ), + method=mock.MagicMock(), + request=ListEntryTypesRequest(parent=""), + ) + + entry_type_mock.return_value.to_dict.return_value = None + op.execute(context=mock.MagicMock()) + hook_mock.assert_called_once_with( + gcp_conn_id=GCP_CONN_ID, + impersonation_chain=IMPERSONATION_CHAIN, + ) + + hook_mock.return_value.list_entry_types.assert_called_once_with( + project_id=PROJECT_ID, + location=REGION, + page_size=None, 
+ page_token=None, + filter_by=None, + order_by=None, + retry=DEFAULT, + timeout=None, + metadata=(), + ) diff --git a/providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py b/providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py index 8eec8a317d640d..6f511d0f8e9f91 100644 --- a/providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py +++ b/providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py @@ -26,10 +26,15 @@ from airflow.models.dag import DAG from airflow.providers.google.cloud.operators.dataplex import ( DataplexCatalogCreateEntryGroupOperator, + DataplexCatalogCreateEntryTypeOperator, DataplexCatalogDeleteEntryGroupOperator, + DataplexCatalogDeleteEntryTypeOperator, DataplexCatalogGetEntryGroupOperator, + DataplexCatalogGetEntryTypeOperator, DataplexCatalogListEntryGroupsOperator, + DataplexCatalogListEntryTypesOperator, DataplexCatalogUpdateEntryGroupOperator, + DataplexCatalogUpdateEntryTypeOperator, ) from airflow.utils.trigger_rule import TriggerRule @@ -46,6 +51,11 @@ ENTRY_GROUP_BODY = {"display_name": "Display Name", "description": "Some description"} # [END howto_dataplex_entry_group_configuration] +ENTRY_TYPE_NAME = f"{DAG_ID}_entry_type_{ENV_ID}".replace("_", "-") +# [START howto_dataplex_entry_type_configuration] +ENTRY_TYPE_BODY = {"display_name": "Display Name", "description": "Some description"} +# [END howto_dataplex_entry_type_configuration] + with DAG( DAG_ID, start_date=datetime.datetime(2021, 1, 1), @@ -63,6 +73,17 @@ ) # [END howto_operator_dataplex_catalog_create_entry_group] + # [START howto_operator_dataplex_catalog_create_entry_type] + create_entry_type = DataplexCatalogCreateEntryTypeOperator( + task_id="create_entry_type", + project_id=PROJECT_ID, + location=GCP_LOCATION, + entry_type_id=ENTRY_TYPE_NAME, + entry_type_configuration=ENTRY_TYPE_BODY, + validate_request=False, + ) + # [END howto_operator_dataplex_catalog_create_entry_type] + # [START 
howto_operator_dataplex_catalog_get_entry_group] get_entry_group = DataplexCatalogGetEntryGroupOperator( task_id="get_entry_group", @@ -72,6 +93,15 @@ ) # [END howto_operator_dataplex_catalog_get_entry_group] + # [START howto_operator_dataplex_catalog_get_entry_type] + get_entry_type = DataplexCatalogGetEntryTypeOperator( + task_id="get_entry_type", + project_id=PROJECT_ID, + location=GCP_LOCATION, + entry_type_id=ENTRY_TYPE_NAME, + ) + # [END howto_operator_dataplex_catalog_get_entry_type] + # [START howto_operator_dataplex_catalog_list_entry_groups] list_entry_group = DataplexCatalogListEntryGroupsOperator( task_id="list_entry_group", @@ -82,6 +112,16 @@ ) # [END howto_operator_dataplex_catalog_list_entry_groups] + # [START howto_operator_dataplex_catalog_list_entry_types] + list_entry_type = DataplexCatalogListEntryTypesOperator( + task_id="list_entry_type", + project_id=PROJECT_ID, + location=GCP_LOCATION, + order_by="name", + filter_by='display_name = "Display Name"', + ) + # [END howto_operator_dataplex_catalog_list_entry_types] + # [START howto_operator_dataplex_catalog_update_entry_group] update_entry_group = DataplexCatalogUpdateEntryGroupOperator( task_id="update_entry_group", @@ -93,6 +133,17 @@ ) # [END howto_operator_dataplex_catalog_update_entry_group] + # [START howto_operator_dataplex_catalog_update_entry_type] + update_entry_type = DataplexCatalogUpdateEntryTypeOperator( + task_id="update_entry_type", + project_id=PROJECT_ID, + location=GCP_LOCATION, + entry_type_id=ENTRY_TYPE_NAME, + entry_type_configuration={"display_name": "Updated Display Name"}, + update_mask=["display_name"], + ) + # [END howto_operator_dataplex_catalog_update_entry_type] + # [START howto_operator_dataplex_catalog_delete_entry_group] delete_entry_group = DataplexCatalogDeleteEntryGroupOperator( task_id="delete_entry_group", @@ -103,7 +154,24 @@ ) # [END howto_operator_dataplex_catalog_delete_entry_group] - create_entry_group >> get_entry_group >> list_entry_group >> 
update_entry_group >> delete_entry_group + # [START howto_operator_dataplex_catalog_delete_entry_type] + delete_entry_type = DataplexCatalogDeleteEntryTypeOperator( + task_id="delete_entry_type", + project_id=PROJECT_ID, + location=GCP_LOCATION, + entry_type_id=ENTRY_TYPE_NAME, + trigger_rule=TriggerRule.ALL_DONE, + ) + # [END howto_operator_dataplex_catalog_delete_entry_type] + + ( + [ + create_entry_group >> get_entry_group >> update_entry_group >> delete_entry_group, + list_entry_group, + create_entry_type >> get_entry_type >> update_entry_type >> delete_entry_type, + list_entry_type, + ] + ) from tests_common.test_utils.watcher import watcher