Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migration of System Tests: Dataplex (AIP-47) #26989

Merged
merged 3 commits into from
Oct 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 113 additions & 2 deletions airflow/providers/google/cloud/hooks/dataplex.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@
from google.api_core.operation import Operation
from google.api_core.retry import Retry
from google.cloud.dataplex_v1 import DataplexServiceClient
from google.cloud.dataplex_v1.types import Task
from google.cloud.dataplex_v1.types import Lake, Task
from googleapiclient.discovery import Resource

from airflow.exceptions import AirflowException
from airflow.providers.google.common.consts import CLIENT_INFO
from airflow.providers.google.common.hooks.base_google import GoogleBaseHook


Expand Down Expand Up @@ -70,7 +71,7 @@ def get_dataplex_client(self) -> DataplexServiceClient:
client_options = ClientOptions(api_endpoint="dataplex.googleapis.com:443")

return DataplexServiceClient(
credentials=self.get_credentials(), client_info=self.client_info, client_options=client_options
credentials=self.get_credentials(), client_info=CLIENT_INFO, client_options=client_options
)

def wait_for_operation(self, timeout: float | None, operation: Operation):
Expand Down Expand Up @@ -248,3 +249,113 @@ def get_task(
metadata=metadata,
)
return result

@GoogleBaseHook.fallback_to_default_project_id
def delete_lake(
    self,
    project_id: str,
    region: str,
    lake_id: str,
    retry: Retry | _MethodDefault = DEFAULT,
    timeout: float | None = None,
    metadata: Sequence[tuple[str, str]] = (),
) -> Any:
    """
    Delete a Dataplex lake.

    The lake is addressed by its full resource name, which is assembled from
    ``project_id``, ``region`` and ``lake_id``.

    :param project_id: Required. The ID of the Google Cloud project that the lake belongs to.
    :param region: Required. The ID of the Google Cloud region that the lake belongs to.
    :param lake_id: Required. The ID of the Google Cloud lake to be deleted.
    :param retry: A retry object used to retry requests. If `None` is specified, requests
        will not be retried.
    :param timeout: The amount of time, in seconds, to wait for the request to complete.
        Note that if `retry` is specified, the timeout applies to each individual attempt.
    :param metadata: Additional metadata that is provided to the method.
    :return: The value returned by the Dataplex client's ``delete_lake`` call, unmodified.
    """
    lake_name = f"projects/{project_id}/locations/{region}/lakes/{lake_id}"
    delete_request = {"name": lake_name}

    # The client result is handed back untouched; callers decide whether to wait on it.
    return self.get_dataplex_client().delete_lake(
        request=delete_request,
        retry=retry,
        timeout=timeout,
        metadata=metadata,
    )

@GoogleBaseHook.fallback_to_default_project_id
def create_lake(
    self,
    project_id: str,
    region: str,
    lake_id: str,
    body: dict[str, Any] | Lake,
    validate_only: bool | None = None,
    retry: Retry | _MethodDefault = DEFAULT,
    timeout: float | None = None,
    metadata: Sequence[tuple[str, str]] = (),
) -> Any:
    """
    Creates a lake resource.

    :param project_id: Required. The ID of the Google Cloud project that the lake belongs to.
    :param region: Required. The ID of the Google Cloud region that the lake belongs to.
    :param lake_id: Required. Lake identifier.
    :param body: Required. The Request body contains an instance of Lake.
    :param validate_only: Optional. Only validate the request, but do not perform mutations.
        The default is false. When left as ``None`` the field is omitted from the request
        so the service default applies.
    :param retry: A retry object used to retry requests. If `None` is specified, requests
        will not be retried.
    :param timeout: The amount of time, in seconds, to wait for the request to complete.
        Note that if `retry` is specified, the timeout applies to each individual attempt.
    :param metadata: Additional metadata that is provided to the method.
    :return: The value returned by the Dataplex client's ``create_lake`` call, unmodified.
    """
    parent = f"projects/{project_id}/locations/{region}"
    request: dict[str, Any] = {
        "parent": parent,
        "lake_id": lake_id,
        "lake": body,
    }
    # Forward validate_only only when the caller set it; previously the argument
    # was silently dropped, so a requested dry run performed a real creation.
    if validate_only is not None:
        request["validate_only"] = validate_only

    client = self.get_dataplex_client()
    result = client.create_lake(
        request=request,
        retry=retry,
        timeout=timeout,
        metadata=metadata,
    )
    return result

@GoogleBaseHook.fallback_to_default_project_id
def get_lake(
    self,
    project_id: str,
    region: str,
    lake_id: str,
    retry: Retry | _MethodDefault = DEFAULT,
    timeout: float | None = None,
    metadata: Sequence[tuple[str, str]] = (),
) -> Any:
    """
    Get lake resource.

    :param project_id: Required. The ID of the Google Cloud project that the lake belongs to.
    :param region: Required. The ID of the Google Cloud region that the lake belongs to.
    :param lake_id: Required. The ID of the Google Cloud lake to be retrieved.
    :param retry: A retry object used to retry requests. If `None` is specified, requests
        will not be retried.
    :param timeout: The amount of time, in seconds, to wait for the request to complete.
        Note that if `retry` is specified, the timeout applies to each individual attempt.
    :param metadata: Additional metadata that is provided to the method.
    :return: The value returned by the Dataplex client's ``get_lake`` call, unmodified.
    """
    # Resource names carry no trailing slash; this keeps the format identical to
    # the one used by delete_lake.
    name = f"projects/{project_id}/locations/{region}/lakes/{lake_id}"
    client = self.get_dataplex_client()
    result = client.get_lake(
        request={
            "name": name,
        },
        retry=retry,
        timeout=timeout,
        metadata=metadata,
    )
    return result
27 changes: 27 additions & 0 deletions airflow/providers/google/cloud/links/dataplex.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
DATAPLEX_TASK_LINK = DATAPLEX_BASE_LINK + "/{lake_id}.{task_id};location={region}/jobs?project={project_id}"
DATAPLEX_TASKS_LINK = DATAPLEX_BASE_LINK + "?project={project_id}&qLake={lake_id}.{region}"

# URL template for a single Dataplex lake in the Google Cloud console.
# Placeholders to be filled in: {lake_id}, {region} and {project_id}.
DATAPLEX_LAKE_LINK = (
"https://console.cloud.google.com/dataplex/lakes/{lake_id};location={region}?project={project_id}"
)


class DataplexTaskLink(BaseGoogleLink):
"""Helper class for constructing Dataplex Task link"""
Expand Down Expand Up @@ -75,3 +79,26 @@ def persist(
"region": task_instance.region,
},
)


class DataplexLakeLink(BaseGoogleLink):
    """Extra link pointing at a Dataplex Lake page, built from ``DATAPLEX_LAKE_LINK``."""

    name = "Dataplex Lake"
    key = "dataplex_lake_key"
    format_str = DATAPLEX_LAKE_LINK

    @staticmethod
    def persist(
        context: Context,
        task_instance,
    ):
        """
        Push the values needed to render the lake link to XCom.

        :param context: Context passed through to ``xcom_push``.
        :param task_instance: Object exposing ``lake_id``, ``region`` and ``project_id``
            attributes and an ``xcom_push`` method.
        """
        # Keys mirror the placeholders of the URL template in ``format_str``.
        link_params = {
            "lake_id": task_instance.lake_id,
            "region": task_instance.region,
            "project_id": task_instance.project_id,
        }
        task_instance.xcom_push(context=context, key=DataplexLakeLink.key, value=link_params)
Loading