Add new operators for Entry Type (apache#45799)
Co-authored-by: Ulada Zakharava <[email protected]>
2 people authored and niklasr22 committed Feb 8, 2025
1 parent a45ffc0 commit f4825b7
Showing 10 changed files with 1,131 additions and 4 deletions.
84 changes: 84 additions & 0 deletions docs/apache-airflow-providers-google/operators/cloud/dataplex.rst
@@ -511,3 +511,87 @@ use :class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogUp
:dedent: 4
:start-after: [START howto_operator_dataplex_catalog_update_entry_group]
:end-before: [END howto_operator_dataplex_catalog_update_entry_group]

.. _howto/operator:DataplexCatalogCreateEntryTypeOperator:

Create an EntryType
--------------------

To create an Entry Type in a specific location in Dataplex Catalog you can
use :class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogCreateEntryTypeOperator`.
For more information about the available fields to pass when creating an Entry Type, visit `Entry Type resource configuration <https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.entryTypes#EntryType>`__.

A simple Entry Type configuration can look as follows:

.. exampleinclude:: /../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
:language: python
:dedent: 0
:start-after: [START howto_dataplex_entry_type_configuration]
:end-before: [END howto_dataplex_entry_type_configuration]
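
The included snippet is not reproduced on this page. As a rough sketch only, a configuration body might look like the dictionary below. The field names follow the EntryType resource linked above; every value is a hypothetical placeholder, and the constant name ``ENTRY_TYPE_BODY`` is an assumption made for illustration.

```python
# Hypothetical EntryType configuration body. Field names come from the
# EntryType REST resource; all values are illustrative placeholders.
ENTRY_TYPE_BODY = {
    "description": "Entry type for curated sales tables",
    "display_name": "Sales table",
    "labels": {"team": "analytics"},
    # Classes this Entry Type belongs to, for example TABLE or DATABASE.
    "type_aliases": ["TABLE"],
    # Platform and system that entries of this type belong to.
    "platform": "bigquery",
    "system": "warehouse",
}
```

A dictionary like this would be passed as the configuration argument of the create operator; an ``EntryType`` message is accepted by the hook as well.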

With this configuration you can create an Entry Type resource:

:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogCreateEntryTypeOperator`

.. exampleinclude:: /../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_dataplex_catalog_create_entry_type]
:end-before: [END howto_operator_dataplex_catalog_create_entry_type]

.. _howto/operator:DataplexCatalogDeleteEntryTypeOperator:

Delete an EntryType
--------------------

To delete an Entry Type in a specific location in Dataplex Catalog you can
use :class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogDeleteEntryTypeOperator`.

.. exampleinclude:: /../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_dataplex_catalog_delete_entry_type]
:end-before: [END howto_operator_dataplex_catalog_delete_entry_type]

.. _howto/operator:DataplexCatalogListEntryTypesOperator:

List EntryTypes
----------------

To list all Entry Types in a specific location in Dataplex Catalog you can
use :class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogListEntryTypesOperator`.
This operator also supports filtering and ordering the result of the operation.

.. exampleinclude:: /../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_dataplex_catalog_list_entry_types]
:end-before: [END howto_operator_dataplex_catalog_list_entry_types]
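
The hook added in this commit forwards the filtering, ordering, and paging options to the API in a single request dictionary. The standalone sketch below mirrors that mapping without any Google client. The helper name ``build_list_entry_types_request`` is hypothetical, the parent path format is assumed to match what ``common_location_path`` returns, and the filter and ordering strings are placeholders rather than verified syntax.

```python
from __future__ import annotations


def build_list_entry_types_request(
    project_id: str,
    location: str,
    filter_by: str | None = None,
    order_by: str | None = None,
    page_size: int | None = None,
    page_token: str | None = None,
) -> dict:
    """Map hook-style arguments onto the request keys used by list_entry_types."""
    return {
        # Assumed format of the location resource path.
        "parent": f"projects/{project_id}/locations/{location}",
        # The hook argument is called filter_by, but the request key is "filter".
        "filter": filter_by,
        "order_by": order_by,
        "page_size": page_size,
        "page_token": page_token,
    }


list_request = build_list_entry_types_request(
    project_id="my-project",
    location="us-central1",
    filter_by="<filter expression>",  # placeholder, not real filter syntax
    order_by="<field name>",  # placeholder
    page_size=50,
)
```

Options left as ``None`` are still present in the dictionary, which matches how the hook builds its request.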

.. _howto/operator:DataplexCatalogGetEntryTypeOperator:

Get an EntryType
-----------------

To retrieve an Entry Type in a specific location in Dataplex Catalog you can
use :class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogGetEntryTypeOperator`.

.. exampleinclude:: /../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_dataplex_catalog_get_entry_type]
:end-before: [END howto_operator_dataplex_catalog_get_entry_type]

.. _howto/operator:DataplexCatalogUpdateEntryTypeOperator:

Update an EntryType
--------------------

To update an Entry Type in a specific location in Dataplex Catalog you can
use :class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogUpdateEntryTypeOperator`.

.. exampleinclude:: /../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_dataplex_catalog_update_entry_type]
:end-before: [END howto_operator_dataplex_catalog_update_entry_type]
2 changes: 2 additions & 0 deletions docs/spelling_wordlist.txt
@@ -578,6 +578,8 @@ EntryGroup
EntryGroups
entrypoint
entrypoints
EntryType
EntryTypes
Enum
enum
enums
200 changes: 199 additions & 1 deletion providers/src/airflow/providers/google/cloud/hooks/dataplex.py
@@ -36,6 +36,7 @@
DataScan,
DataScanJob,
EntryGroup,
EntryType,
Lake,
Task,
Zone,
@@ -54,7 +55,10 @@
from google.api_core.operation import Operation
from google.api_core.retry import Retry
from google.api_core.retry_async import AsyncRetry
from google.cloud.dataplex_v1.services.catalog_service.pagers import ListEntryGroupsPager
from google.cloud.dataplex_v1.services.catalog_service.pagers import (
ListEntryGroupsPager,
ListEntryTypesPager,
)
from googleapiclient.discovery import Resource

PATH_DATA_SCAN = "projects/{project_id}/locations/{region}/dataScans/{data_scan_id}"
@@ -134,6 +138,200 @@ def wait_for_operation(self, timeout: float | None, operation: Operation):
error = operation.exception(timeout=timeout)
raise AirflowException(error)

@GoogleBaseHook.fallback_to_default_project_id
def create_entry_type(
self,
location: str,
entry_type_id: str,
entry_type_configuration: EntryType | dict,
project_id: str = PROVIDE_PROJECT_ID,
validate_only: bool = False,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> Operation:
"""
Create an EntryType resource.
:param location: Required. The ID of the Google Cloud location that the task belongs to.
:param entry_type_id: Required. EntryType identifier.
:param entry_type_configuration: Required. EntryType configuration body.
:param project_id: Optional. The ID of the Google Cloud project that the task belongs to.
:param validate_only: Optional. If set, performs request validation, but does not actually execute
the create request.
:param retry: Optional. A retry object used to retry requests. If `None` is specified, requests
will not be retried.
:param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
Note that if `retry` is specified, the timeout applies to each individual attempt.
:param metadata: Optional. Additional metadata that is provided to the method.
"""
client = self.get_dataplex_catalog_client()
return client.create_entry_type(
request={
"parent": client.common_location_path(project_id, location),
"entry_type_id": entry_type_id,
"entry_type": entry_type_configuration,
"validate_only": validate_only,
},
retry=retry,
timeout=timeout,
metadata=metadata,
)

@GoogleBaseHook.fallback_to_default_project_id
def get_entry_type(
self,
location: str,
entry_type_id: str,
project_id: str = PROVIDE_PROJECT_ID,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> EntryType:
"""
Get an EntryType resource.
:param location: Required. The ID of the Google Cloud location that the task belongs to.
:param entry_type_id: Required. EntryType identifier.
:param project_id: Optional. The ID of the Google Cloud project that the task belongs to.
:param retry: Optional. A retry object used to retry requests. If `None` is specified, requests
will not be retried.
:param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
Note that if `retry` is specified, the timeout applies to each individual attempt.
:param metadata: Optional. Additional metadata that is provided to the method.
"""
client = self.get_dataplex_catalog_client()
return client.get_entry_type(
request={
"name": client.entry_type_path(project_id, location, entry_type_id),
},
retry=retry,
timeout=timeout,
metadata=metadata,
)

@GoogleBaseHook.fallback_to_default_project_id
def delete_entry_type(
self,
location: str,
entry_type_id: str,
project_id: str = PROVIDE_PROJECT_ID,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> Operation:
"""
Delete an EntryType resource.
:param location: Required. The ID of the Google Cloud location that the task belongs to.
:param entry_type_id: Required. EntryType identifier.
:param project_id: Optional. The ID of the Google Cloud project that the task belongs to.
:param retry: Optional. A retry object used to retry requests. If `None` is specified, requests
will not be retried.
:param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
Note that if `retry` is specified, the timeout applies to each individual attempt.
:param metadata: Optional. Additional metadata that is provided to the method.
"""
client = self.get_dataplex_catalog_client()
return client.delete_entry_type(
request={
"name": client.entry_type_path(project_id, location, entry_type_id),
},
retry=retry,
timeout=timeout,
metadata=metadata,
)

@GoogleBaseHook.fallback_to_default_project_id
def list_entry_types(
self,
location: str,
filter_by: str | None = None,
order_by: str | None = None,
page_size: int | None = None,
page_token: str | None = None,
project_id: str = PROVIDE_PROJECT_ID,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> ListEntryTypesPager:
"""
List EntryType resources from a specific location.
:param location: Required. The ID of the Google Cloud location that the task belongs to.
:param filter_by: Optional. Filter to apply on the list results.
:param order_by: Optional. Fields to order the results by.
:param page_size: Optional. Maximum number of EntryTypes to return on one page.
:param page_token: Optional. Token to retrieve the next page of results.
:param project_id: Optional. The ID of the Google Cloud project that the task belongs to.
:param retry: Optional. A retry object used to retry requests. If `None` is specified, requests
will not be retried.
:param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
Note that if `retry` is specified, the timeout applies to each individual attempt.
:param metadata: Optional. Additional metadata that is provided to the method.
"""
client = self.get_dataplex_catalog_client()
return client.list_entry_types(
request={
"parent": client.common_location_path(project_id, location),
"filter": filter_by,
"order_by": order_by,
"page_size": page_size,
"page_token": page_token,
},
retry=retry,
timeout=timeout,
metadata=metadata,
)

@GoogleBaseHook.fallback_to_default_project_id
def update_entry_type(
self,
location: str,
entry_type_id: str,
entry_type_configuration: dict | EntryType,
project_id: str = PROVIDE_PROJECT_ID,
update_mask: list[str] | FieldMask | None = None,
validate_only: bool | None = False,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> Operation:
"""
Update an EntryType resource.
:param entry_type_id: Required. ID of the EntryType to update.
:param entry_type_configuration: Required. The updated configuration body of the EntryType.
:param location: Required. The ID of the Google Cloud location that the task belongs to.
:param update_mask: Optional. Names of fields whose values to overwrite on an entry type.
If this parameter is absent or empty, all modifiable fields are overwritten. If such
fields are non-required and omitted in the request body, their values are emptied.
:param project_id: Optional. The ID of the Google Cloud project that the task belongs to.
:param validate_only: Optional. The service validates the request without performing any mutations.
:param retry: Optional. A retry object used to retry requests. If `None` is specified, requests
will not be retried.
:param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
Note that if `retry` is specified, the timeout applies to each individual attempt.
:param metadata: Optional. Additional metadata that is provided to the method.
"""
client = self.get_dataplex_catalog_client()
_entry_type = (
deepcopy(entry_type_configuration)
if isinstance(entry_type_configuration, dict)
else EntryType.to_dict(entry_type_configuration)
)
_entry_type["name"] = client.entry_type_path(project_id, location, entry_type_id)
return client.update_entry_type(
request={
"entry_type": _entry_type,
"update_mask": FieldMask(paths=update_mask) if isinstance(update_mask, list) else update_mask,
"validate_only": validate_only,
},
retry=retry,
timeout=timeout,
metadata=metadata,
)
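
The request preparation in `update_entry_type` can be sketched in isolation as follows. This is a simplified stand-in, not the hook itself: `FieldMaskStandIn` is a hypothetical replacement for the protobuf `FieldMask`, `build_update_request` is a hypothetical helper, the resource path format is assumed to match what `entry_type_path` returns, and only the dict form of the configuration is handled.

```python
from __future__ import annotations

from copy import deepcopy
from dataclasses import dataclass, field


@dataclass
class FieldMaskStandIn:
    """Hypothetical stand-in for the protobuf FieldMask message."""

    paths: list[str] = field(default_factory=list)


def entry_type_path(project_id: str, location: str, entry_type_id: str) -> str:
    # Assumed resource name format for an EntryType.
    return f"projects/{project_id}/locations/{location}/entryTypes/{entry_type_id}"


def build_update_request(
    project_id: str,
    location: str,
    entry_type_id: str,
    configuration: dict,
    update_mask: list[str] | FieldMaskStandIn | None = None,
    validate_only: bool = False,
) -> dict:
    # Copy first so the caller's dictionary is never mutated.
    entry_type = deepcopy(configuration)
    # The update call identifies its target through the "name" field.
    entry_type["name"] = entry_type_path(project_id, location, entry_type_id)
    # A plain list of field names is wrapped; a ready-made mask passes through.
    mask = FieldMaskStandIn(paths=update_mask) if isinstance(update_mask, list) else update_mask
    return {"entry_type": entry_type, "update_mask": mask, "validate_only": validate_only}


config = {"description": "new description"}
update_request = build_update_request(
    "my-project", "us-central1", "my-entry-type", config, update_mask=["description"]
)
```

The deep copy is the reason a configuration dictionary can be reused across tasks: the injected `name` lives only in the request.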

@GoogleBaseHook.fallback_to_default_project_id
def create_entry_group(
self,
49 changes: 49 additions & 0 deletions providers/src/airflow/providers/google/cloud/links/dataplex.py
@@ -35,6 +35,10 @@
DATAPLEX_CATALOG_ENTRY_GROUP_LINK = (
"/dataplex/projects/{project_id}/locations/{location}/entryGroups/{entry_group_id}?project={project_id}"
)
DATAPLEX_CATALOG_ENTRY_TYPE_LINK = (
"/dataplex/projects/{project_id}/locations/{location}/entryTypes/{entry_type_id}?project={project_id}"
)
DATAPLEX_CATALOG_ENTRY_TYPES_LINK = "/dataplex/catalog/entry-types?project={project_id}"


class DataplexTaskLink(BaseGoogleLink):
@@ -150,3 +154,48 @@ def persist(
"project_id": task_instance.project_id,
},
)


class DataplexCatalogEntryTypeLink(BaseGoogleLink):
"""Helper class for constructing Dataplex Catalog EntryType link."""

name = "Dataplex Catalog EntryType"
key = "dataplex_catalog_entry_type_key"
format_str = DATAPLEX_CATALOG_ENTRY_TYPE_LINK

@staticmethod
def persist(
context: Context,
task_instance,
):
task_instance.xcom_push(
context=context,
key=DataplexCatalogEntryTypeLink.key,
value={
"entry_type_id": task_instance.entry_type_id,
"location": task_instance.location,
"project_id": task_instance.project_id,
},
)


class DataplexCatalogEntryTypesLink(BaseGoogleLink):
"""Helper class for constructing Dataplex Catalog EntryTypes link."""

name = "Dataplex Catalog EntryTypes"
key = "dataplex_catalog_entry_types_key"
format_str = DATAPLEX_CATALOG_ENTRY_TYPES_LINK

@staticmethod
def persist(
context: Context,
task_instance,
):
task_instance.xcom_push(
context=context,
key=DataplexCatalogEntryTypesLink.key,
value={
"location": task_instance.location,
"project_id": task_instance.project_id,
},
)
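
To show how the persisted values relate to the link templates, the sketch below fills both format strings with a dictionary shaped like the one pushed to XCom. It assumes that the base link class prepends a console base URL and calls `str.format` on `format_str`; the `BASE_LINK` value and the `build_link` helper are assumptions for illustration, not code from this commit.

```python
# Assumed console base URL; the real value lives in the base link class.
BASE_LINK = "https://console.cloud.google.com"

DATAPLEX_CATALOG_ENTRY_TYPE_LINK = (
    "/dataplex/projects/{project_id}/locations/{location}/entryTypes/{entry_type_id}?project={project_id}"
)
DATAPLEX_CATALOG_ENTRY_TYPES_LINK = "/dataplex/catalog/entry-types?project={project_id}"


def build_link(format_str: str, values: dict) -> str:
    """Hypothetical helper: fill a link template with persisted values."""
    # str.format ignores keyword arguments the template does not use,
    # so the extra "location" key is harmless for the list link.
    return BASE_LINK + format_str.format(**values)


entry_type_url = build_link(
    DATAPLEX_CATALOG_ENTRY_TYPE_LINK,
    {"entry_type_id": "my-entry-type", "location": "us-central1", "project_id": "my-project"},
)
entry_types_url = build_link(
    DATAPLEX_CATALOG_ENTRY_TYPES_LINK,
    {"location": "us-central1", "project_id": "my-project"},
)
```

The list link template uses only `project_id`, even though `location` is persisted alongside it.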