Add operators for AspectType resource (apache#46240)
Co-authored-by: Ulada Zakharava <[email protected]>
2 people authored and niklasr22 committed Feb 8, 2025
1 parent 24cd9e2 commit 14604fc
Showing 10 changed files with 1,137 additions and 5 deletions.
84 changes: 84 additions & 0 deletions docs/apache-airflow-providers-google/operators/cloud/dataplex.rst
@@ -595,3 +595,87 @@ use :class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogUp
    :dedent: 4
    :start-after: [START howto_operator_dataplex_catalog_update_entry_type]
    :end-before: [END howto_operator_dataplex_catalog_update_entry_type]

.. _howto/operator:DataplexCatalogCreateAspectTypeOperator:

Create an AspectType
--------------------

To create an Aspect Type in a specific location in Dataplex Catalog you can
use :class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogCreateAspectTypeOperator`.
For more information about the available fields to pass when creating an Aspect Type, visit
`Aspect Type resource configuration <https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.aspectTypes#AspectType>`__.

A simple Aspect Type configuration can look as follows:

.. exampleinclude:: /../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
    :language: python
    :dedent: 0
    :start-after: [START howto_dataplex_aspect_type_configuration]
    :end-before: [END howto_dataplex_aspect_type_configuration]

With this configuration you can create an Aspect Type resource using
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogCreateAspectTypeOperator`:

.. exampleinclude:: /../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
    :language: python
    :dedent: 4
    :start-after: [START howto_operator_dataplex_catalog_create_aspect_type]
    :end-before: [END howto_operator_dataplex_catalog_create_aspect_type]

.. _howto/operator:DataplexCatalogDeleteAspectTypeOperator:

Delete an AspectType
--------------------

To delete an Aspect Type in a specific location in Dataplex Catalog you can
use :class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogDeleteAspectTypeOperator`.

.. exampleinclude:: /../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
    :language: python
    :dedent: 4
    :start-after: [START howto_operator_dataplex_catalog_delete_aspect_type]
    :end-before: [END howto_operator_dataplex_catalog_delete_aspect_type]

.. _howto/operator:DataplexCatalogListAspectTypesOperator:

List AspectTypes
----------------

To list all Aspect Types in a specific location in Dataplex Catalog you can
use :class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogListAspectTypesOperator`.
This operator also supports filtering and ordering the results.

.. exampleinclude:: /../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
    :language: python
    :dedent: 4
    :start-after: [START howto_operator_dataplex_catalog_list_aspect_types]
    :end-before: [END howto_operator_dataplex_catalog_list_aspect_types]

.. _howto/operator:DataplexCatalogGetAspectTypeOperator:

Get an AspectType
-----------------

To retrieve an Aspect Type in a specific location in Dataplex Catalog you can
use :class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogGetAspectTypeOperator`.

.. exampleinclude:: /../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
    :language: python
    :dedent: 4
    :start-after: [START howto_operator_dataplex_catalog_get_aspect_type]
    :end-before: [END howto_operator_dataplex_catalog_get_aspect_type]

.. _howto/operator:DataplexCatalogUpdateAspectTypeOperator:

Update an AspectType
--------------------

To update an Aspect Type in a specific location in Dataplex Catalog you can
use :class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogUpdateAspectTypeOperator`.

.. exampleinclude:: /../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
    :language: python
    :dedent: 4
    :start-after: [START howto_operator_dataplex_catalog_update_aspect_type]
    :end-before: [END howto_operator_dataplex_catalog_update_aspect_type]
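The `exampleinclude` directives above pull the configuration from the system test, so it is not visible in this diff. As a rough sketch only, a configuration body of that kind might look like the dictionary below. The keys are snake_case spellings of fields from the AspectType REST reference linked above. The values, the `ASPECT_TYPE_BODY` name, and the `missing_fields` helper are illustrative assumptions, not the contents of `example_dataplex_catalog.py`.

```python
# Illustrative AspectType body; every value here is made up for this sketch.
ASPECT_TYPE_BODY = {
    "display_name": "Sample AspectType",
    "description": "A simple AspectType for demonstration purposes.",
    "metadata_template": {
        "name": "sample_field",
        "type": "record",
    },
}


def missing_fields(body: dict, required: tuple = ("metadata_template",)) -> list:
    """Return the required top-level keys that are absent from the body.

    Hypothetical helper: treating ``metadata_template`` as the only required
    key is an assumption of this sketch, not a rule enforced by the operator.
    """
    return [key for key in required if key not in body]
```

A check like `missing_fields(ASPECT_TYPE_BODY)` could run before the body is handed to the create operator, although the service performs its own validation in any case.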
2 changes: 2 additions & 0 deletions docs/spelling_wordlist.txt
@@ -91,6 +91,8 @@ asciiart
asctime
asend
asia
AspectType
AspectTypes
assertEqualIgnoreMultipleSpaces
AssetEvent
AssetEvents
196 changes: 196 additions & 0 deletions providers/src/airflow/providers/google/cloud/hooks/dataplex.py
@@ -32,6 +32,7 @@
)
from google.cloud.dataplex_v1.services.catalog_service import CatalogServiceClient
from google.cloud.dataplex_v1.types import (
    AspectType,
    Asset,
    DataScan,
    DataScanJob,
@@ -56,6 +57,7 @@
    from google.api_core.retry import Retry
    from google.api_core.retry_async import AsyncRetry
    from google.cloud.dataplex_v1.services.catalog_service.pagers import (
        ListAspectTypesPager,
        ListEntryGroupsPager,
        ListEntryTypesPager,
    )
@@ -138,6 +140,78 @@ def wait_for_operation(self, timeout: float | None, operation: Operation):
            error = operation.exception(timeout=timeout)
            raise AirflowException(error)

    @GoogleBaseHook.fallback_to_default_project_id
    def create_aspect_type(
        self,
        location: str,
        aspect_type_id: str,
        aspect_type_configuration: AspectType | dict,
        project_id: str = PROVIDE_PROJECT_ID,
        validate_only: bool = False,
        retry: Retry | _MethodDefault = DEFAULT,
        timeout: float | None = None,
        metadata: Sequence[tuple[str, str]] = (),
    ) -> Operation:
        """
        Create an AspectType resource.

        :param location: Required. The ID of the Google Cloud location that the task belongs to.
        :param aspect_type_id: Required. AspectType identifier.
        :param aspect_type_configuration: Required. AspectType configuration body.
        :param project_id: Optional. The ID of the Google Cloud project that the task belongs to.
        :param validate_only: Optional. If set, performs request validation, but does not actually execute
            the create request.
        :param retry: Optional. A retry object used to retry requests. If `None` is specified, requests
            will not be retried.
        :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
            Note that if `retry` is specified, the timeout applies to each individual attempt.
        :param metadata: Optional. Additional metadata that is provided to the method.
        """
        client = self.get_dataplex_catalog_client()
        return client.create_aspect_type(
            request={
                "parent": client.common_location_path(project_id, location),
                "aspect_type_id": aspect_type_id,
                "aspect_type": aspect_type_configuration,
                "validate_only": validate_only,
            },
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )

    @GoogleBaseHook.fallback_to_default_project_id
    def get_aspect_type(
        self,
        location: str,
        aspect_type_id: str,
        project_id: str = PROVIDE_PROJECT_ID,
        retry: Retry | _MethodDefault = DEFAULT,
        timeout: float | None = None,
        metadata: Sequence[tuple[str, str]] = (),
    ) -> AspectType:
        """
        Get an AspectType resource.

        :param location: Required. The ID of the Google Cloud location that the task belongs to.
        :param aspect_type_id: Required. AspectType identifier.
        :param project_id: Optional. The ID of the Google Cloud project that the task belongs to.
        :param retry: Optional. A retry object used to retry requests. If `None` is specified, requests
            will not be retried.
        :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
            Note that if `retry` is specified, the timeout applies to each individual attempt.
        :param metadata: Optional. Additional metadata that is provided to the method.
        """
        client = self.get_dataplex_catalog_client()
        return client.get_aspect_type(
            request={
                "name": client.aspect_type_path(project_id, location, aspect_type_id),
            },
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )

    @GoogleBaseHook.fallback_to_default_project_id
    def create_entry_type(
        self,
@@ -210,6 +284,128 @@ def get_entry_type(
            metadata=metadata,
        )

    @GoogleBaseHook.fallback_to_default_project_id
    def delete_aspect_type(
        self,
        location: str,
        aspect_type_id: str,
        project_id: str = PROVIDE_PROJECT_ID,
        retry: Retry | _MethodDefault = DEFAULT,
        timeout: float | None = None,
        metadata: Sequence[tuple[str, str]] = (),
    ) -> Operation:
        """
        Delete an AspectType resource.

        :param location: Required. The ID of the Google Cloud location that the task belongs to.
        :param aspect_type_id: Required. AspectType identifier.
        :param project_id: Optional. The ID of the Google Cloud project that the task belongs to.
        :param retry: Optional. A retry object used to retry requests. If `None` is specified, requests
            will not be retried.
        :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
            Note that if `retry` is specified, the timeout applies to each individual attempt.
        :param metadata: Optional. Additional metadata that is provided to the method.
        """
        client = self.get_dataplex_catalog_client()
        return client.delete_aspect_type(
            request={
                "name": client.aspect_type_path(project_id, location, aspect_type_id),
            },
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )

    @GoogleBaseHook.fallback_to_default_project_id
    def list_aspect_types(
        self,
        location: str,
        filter_by: str | None = None,
        order_by: str | None = None,
        page_size: int | None = None,
        page_token: str | None = None,
        project_id: str = PROVIDE_PROJECT_ID,
        retry: Retry | _MethodDefault = DEFAULT,
        timeout: float | None = None,
        metadata: Sequence[tuple[str, str]] = (),
    ) -> ListAspectTypesPager:
        """
        List AspectType resources from a specific location.

        :param location: Required. The ID of the Google Cloud location that the task belongs to.
        :param filter_by: Optional. Filter to apply on the list results.
        :param order_by: Optional. Fields to order the results by.
        :param page_size: Optional. Maximum number of AspectTypes to return on one page.
        :param page_token: Optional. Token to retrieve the next page of results.
        :param project_id: Optional. The ID of the Google Cloud project that the task belongs to.
        :param retry: Optional. A retry object used to retry requests. If `None` is specified, requests
            will not be retried.
        :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
            Note that if `retry` is specified, the timeout applies to each individual attempt.
        :param metadata: Optional. Additional metadata that is provided to the method.
        """
        client = self.get_dataplex_catalog_client()
        return client.list_aspect_types(
            request={
                "parent": client.common_location_path(project_id, location),
                "filter": filter_by,
                "order_by": order_by,
                "page_size": page_size,
                "page_token": page_token,
            },
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )

    @GoogleBaseHook.fallback_to_default_project_id
    def update_aspect_type(
        self,
        location: str,
        aspect_type_id: str,
        aspect_type_configuration: dict | AspectType,
        project_id: str = PROVIDE_PROJECT_ID,
        update_mask: list[str] | FieldMask | None = None,
        validate_only: bool | None = False,
        retry: Retry | _MethodDefault = DEFAULT,
        timeout: float | None = None,
        metadata: Sequence[tuple[str, str]] = (),
    ) -> Operation:
        """
        Update an AspectType resource.

        :param aspect_type_id: Required. ID of the AspectType to update.
        :param aspect_type_configuration: Required. The updated configuration body of the AspectType.
        :param location: Required. The ID of the Google Cloud location that the task belongs to.
        :param update_mask: Optional. Names of fields whose values to overwrite on an AspectType.
            If this parameter is absent or empty, all modifiable fields are overwritten. If such
            fields are non-required and omitted in the request body, their values are emptied.
        :param project_id: Optional. The ID of the Google Cloud project that the task belongs to.
        :param validate_only: Optional. The service validates the request without performing any mutations.
        :param retry: Optional. A retry object used to retry requests. If `None` is specified, requests
            will not be retried.
        :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
            Note that if `retry` is specified, the timeout applies to each individual attempt.
        :param metadata: Optional. Additional metadata that is provided to the method.
        """
        client = self.get_dataplex_catalog_client()
        # Work on a copy so the caller's configuration is not mutated when the name is set.
        _aspect_type = (
            deepcopy(aspect_type_configuration)
            if isinstance(aspect_type_configuration, dict)
            else AspectType.to_dict(aspect_type_configuration)
        )
        _aspect_type["name"] = client.aspect_type_path(project_id, location, aspect_type_id)
        return client.update_aspect_type(
            request={
                "aspect_type": _aspect_type,
                "update_mask": FieldMask(paths=update_mask) if type(update_mask) is list else update_mask,
                "validate_only": validate_only,
            },
            retry=retry,
            timeout=timeout,
            metadata=metadata,
        )

    @GoogleBaseHook.fallback_to_default_project_id
    def delete_entry_type(
        self,
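The way `update_aspect_type` prepares its request can be sketched without the Google client, as below. This is a simplified stand-in, not the hook itself. The `aspect_type_path` function imitates the client helper, and its format is an assumption based on the `projects.locations.aspectTypes` REST resource. A plain dictionary stands in for `FieldMask`, and `build_update_request` is a hypothetical name.

```python
from copy import deepcopy
from typing import Optional


def aspect_type_path(project_id: str, location: str, aspect_type_id: str) -> str:
    """Stand-in for the client's path helper (resource-name format is assumed)."""
    return f"projects/{project_id}/locations/{location}/aspectTypes/{aspect_type_id}"


def build_update_request(
    project_id: str,
    location: str,
    aspect_type_id: str,
    configuration: dict,
    update_mask: Optional[list] = None,
    validate_only: bool = False,
) -> dict:
    """Build an update request without mutating the caller's configuration."""
    # Copy first, then attach the full resource name to the copy only.
    body = deepcopy(configuration)
    body["name"] = aspect_type_path(project_id, location, aspect_type_id)
    return {
        "aspect_type": body,
        # The hook wraps a list in FieldMask(paths=...); a dict stands in for it here.
        "update_mask": {"paths": list(update_mask)} if update_mask is not None else None,
        "validate_only": validate_only,
    }
```

Copying before setting `name` means a configuration dictionary defined once at DAG level can be reused across tasks without picking up a resource name from an earlier call.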
49 changes: 49 additions & 0 deletions providers/src/airflow/providers/google/cloud/links/dataplex.py
@@ -39,6 +39,10 @@
    "/dataplex/projects/{project_id}/locations/{location}/entryTypes/{entry_type_id}?project={project_id}"
)
DATAPLEX_CATALOG_ENTRY_TYPES_LINK = "/dataplex/catalog/entry-types?project={project_id}"
DATAPLEX_CATALOG_ASPECT_TYPE_LINK = (
    "/dataplex/projects/{project_id}/locations/{location}/aspectTypes/{aspect_type_id}?project={project_id}"
)
DATAPLEX_CATALOG_ASPECT_TYPES_LINK = "/dataplex/catalog/aspect-types?project={project_id}"


class DataplexTaskLink(BaseGoogleLink):
@@ -199,3 +203,48 @@ def persist(
                "project_id": task_instance.project_id,
            },
        )


class DataplexCatalogAspectTypeLink(BaseGoogleLink):
    """Helper class for constructing Dataplex Catalog AspectType link."""

    name = "Dataplex Catalog AspectType"
    key = "dataplex_catalog_aspect_type_key"
    format_str = DATAPLEX_CATALOG_ASPECT_TYPE_LINK

    @staticmethod
    def persist(
        context: Context,
        task_instance,
    ):
        task_instance.xcom_push(
            context=context,
            key=DataplexCatalogAspectTypeLink.key,
            value={
                "aspect_type_id": task_instance.aspect_type_id,
                "location": task_instance.location,
                "project_id": task_instance.project_id,
            },
        )


class DataplexCatalogAspectTypesLink(BaseGoogleLink):
    """Helper class for constructing Dataplex Catalog AspectTypes link."""

    name = "Dataplex Catalog AspectTypes"
    key = "dataplex_catalog_aspect_types_key"
    format_str = DATAPLEX_CATALOG_ASPECT_TYPES_LINK

    @staticmethod
    def persist(
        context: Context,
        task_instance,
    ):
        task_instance.xcom_push(
            context=context,
            key=DataplexCatalogAspectTypesLink.key,
            value={
                "location": task_instance.location,
                "project_id": task_instance.project_id,
            },
        )
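How the values pushed by `persist` and the `format_str` templates might combine into a console URL can be sketched as follows. This is a minimal illustration, not the `BaseGoogleLink` implementation. The `BASE_LINK` value and the `build_console_url` helper are assumptions made for the sketch, while the two templates are copied from the diff above.

```python
# Assumed console base URL; the real value lives in the provider's base link class.
BASE_LINK = "https://console.cloud.google.com"

DATAPLEX_CATALOG_ASPECT_TYPE_LINK = (
    "/dataplex/projects/{project_id}/locations/{location}/aspectTypes/{aspect_type_id}?project={project_id}"
)
DATAPLEX_CATALOG_ASPECT_TYPES_LINK = "/dataplex/catalog/aspect-types?project={project_id}"


def build_console_url(format_str: str, xcom_value: dict) -> str:
    """Fill a link template with the dictionary a link class stored in XCom."""
    # str.format ignores unused keyword arguments, so an extra "location" key is harmless.
    return BASE_LINK + format_str.format(**xcom_value)


single = build_console_url(
    DATAPLEX_CATALOG_ASPECT_TYPE_LINK,
    {"aspect_type_id": "my-aspect", "location": "us-central1", "project_id": "my-project"},
)
listing = build_console_url(
    DATAPLEX_CATALOG_ASPECT_TYPES_LINK,
    {"location": "us-central1", "project_id": "my-project"},
)
```

The list link template only uses `project_id`, so the `location` value stored by `DataplexCatalogAspectTypesLink.persist` goes unused when that URL is built.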