Create operators for working with Topics for GCP Apache Kafka (#46865)
MaksYermak authored Feb 20, 2025
1 parent 3f548a0 commit b99cb7c
Showing 10 changed files with 1,240 additions and 2 deletions.
48 changes: 48 additions & 0 deletions providers/google/docs/operators/cloud/managed_kafka.rst
@@ -69,6 +69,54 @@ To update a cluster you can use
:start-after: [START how_to_cloud_managed_kafka_update_cluster_operator]
:end-before: [END how_to_cloud_managed_kafka_update_cluster_operator]

Interacting with Apache Kafka Topics
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

To create an Apache Kafka topic you can use
:class:`~airflow.providers.google.cloud.operators.managed_kafka.ManagedKafkaCreateTopicOperator`.

.. exampleinclude:: /../../providers/google/tests/system/google/cloud/managed_kafka/example_managed_kafka_topic.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_managed_kafka_create_topic_operator]
:end-before: [END how_to_cloud_managed_kafka_create_topic_operator]

To delete a topic you can use
:class:`~airflow.providers.google.cloud.operators.managed_kafka.ManagedKafkaDeleteTopicOperator`.

.. exampleinclude:: /../../providers/google/tests/system/google/cloud/managed_kafka/example_managed_kafka_topic.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_managed_kafka_delete_topic_operator]
:end-before: [END how_to_cloud_managed_kafka_delete_topic_operator]

To get a topic you can use
:class:`~airflow.providers.google.cloud.operators.managed_kafka.ManagedKafkaGetTopicOperator`.

.. exampleinclude:: /../../providers/google/tests/system/google/cloud/managed_kafka/example_managed_kafka_topic.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_managed_kafka_get_topic_operator]
:end-before: [END how_to_cloud_managed_kafka_get_topic_operator]

To get a list of topics you can use
:class:`~airflow.providers.google.cloud.operators.managed_kafka.ManagedKafkaListTopicsOperator`.

.. exampleinclude:: /../../providers/google/tests/system/google/cloud/managed_kafka/example_managed_kafka_topic.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_managed_kafka_list_topic_operator]
:end-before: [END how_to_cloud_managed_kafka_list_topic_operator]

To update a topic you can use
:class:`~airflow.providers.google.cloud.operators.managed_kafka.ManagedKafkaUpdateTopicOperator`.

.. exampleinclude:: /../../providers/google/tests/system/google/cloud/managed_kafka/example_managed_kafka_topic.py
:language: python
:dedent: 4
:start-after: [START how_to_cloud_managed_kafka_update_topic_operator]
:end-before: [END how_to_cloud_managed_kafka_update_topic_operator]
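
Put together, these operators can be chained in a single DAG. The snippet below is a minimal
sketch, not part of the system tests above: the project, region, cluster, and topic IDs are
hypothetical placeholders, and the topic configuration shows only a small subset of the available
fields.

.. code-block:: python

    from airflow.providers.google.cloud.operators.managed_kafka import (
        ManagedKafkaCreateTopicOperator,
        ManagedKafkaDeleteTopicOperator,
    )

    create_topic = ManagedKafkaCreateTopicOperator(
        task_id="create_topic",
        project_id="my-project",  # hypothetical project ID
        location="us-central1",  # hypothetical region
        cluster_id="my-cluster",  # hypothetical cluster ID
        topic_id="my-topic",
        topic={"partition_count": 3, "replication_factor": 3},
    )

    delete_topic = ManagedKafkaDeleteTopicOperator(
        task_id="delete_topic",
        project_id="my-project",
        location="us-central1",
        cluster_id="my-cluster",
        topic_id="my-topic",
    )

    create_topic >> delete_topic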

Reference
^^^^^^^^^

1 change: 1 addition & 0 deletions providers/google/provider.yaml
@@ -1229,6 +1229,7 @@ extra-links:
- airflow.providers.google.cloud.links.translate.TranslationGlossariesListLink
- airflow.providers.google.cloud.links.managed_kafka.ApacheKafkaClusterLink
- airflow.providers.google.cloud.links.managed_kafka.ApacheKafkaClusterListLink
- airflow.providers.google.cloud.links.managed_kafka.ApacheKafkaTopicLink


secrets-backends:
@@ -27,12 +27,12 @@
from airflow.providers.google.common.consts import CLIENT_INFO
from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
-from google.cloud.managedkafka_v1 import Cluster, ManagedKafkaClient, types
+from google.cloud.managedkafka_v1 import Cluster, ManagedKafkaClient, Topic, types

if TYPE_CHECKING:
from google.api_core.operation import Operation
from google.api_core.retry import Retry
-from google.cloud.managedkafka_v1.services.managed_kafka.pagers import ListClustersPager
+from google.cloud.managedkafka_v1.services.managed_kafka.pagers import ListClustersPager, ListTopicsPager
from google.protobuf.field_mask_pb2 import FieldMask


@@ -286,3 +286,197 @@ def delete_cluster(
metadata=metadata,
)
return operation

@GoogleBaseHook.fallback_to_default_project_id
def create_topic(
self,
project_id: str,
location: str,
cluster_id: str,
topic_id: str,
topic: types.Topic | dict,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> types.Topic:
"""
        Create a new topic in a given project and location.

        :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
:param location: Required. The ID of the Google Cloud region that the service belongs to.
:param cluster_id: Required. The ID of the cluster in which to create the topic.
:param topic_id: Required. The ID to use for the topic, which will become the final component of the
topic's name.
:param topic: Required. Configuration of the topic to create.
:param retry: Designation of what errors, if any, should be retried.
:param timeout: The timeout for this request.
:param metadata: Strings which should be sent along with the request as metadata.
"""
client = self.get_managed_kafka_client()
parent = client.cluster_path(project_id, location, cluster_id)

result = client.create_topic(
request={
"parent": parent,
"topic_id": topic_id,
"topic": topic,
},
retry=retry,
timeout=timeout,
metadata=metadata,
)
return result

@GoogleBaseHook.fallback_to_default_project_id
def list_topics(
self,
project_id: str,
location: str,
cluster_id: str,
page_size: int | None = None,
page_token: str | None = None,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> ListTopicsPager:
"""
        List the topics in a given cluster.

        :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
:param location: Required. The ID of the Google Cloud region that the service belongs to.
:param cluster_id: Required. The ID of the cluster whose topics are to be listed.
        :param page_size: Optional. The maximum number of topics to return. The service may return fewer than
            this value. If unset or zero, all topics for the parent are returned.
:param page_token: Optional. A page token, received from a previous ``ListTopics`` call. Provide this
to retrieve the subsequent page. When paginating, all other parameters provided to ``ListTopics``
must match the call that provided the page token.
:param retry: Designation of what errors, if any, should be retried.
:param timeout: The timeout for this request.
:param metadata: Strings which should be sent along with the request as metadata.
"""
client = self.get_managed_kafka_client()
parent = client.cluster_path(project_id, location, cluster_id)

result = client.list_topics(
request={
"parent": parent,
"page_size": page_size,
"page_token": page_token,
},
retry=retry,
timeout=timeout,
metadata=metadata,
)
return result

@GoogleBaseHook.fallback_to_default_project_id
def get_topic(
self,
project_id: str,
location: str,
cluster_id: str,
topic_id: str,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> types.Topic:
"""
        Return the properties of a single topic.

        :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
:param location: Required. The ID of the Google Cloud region that the service belongs to.
:param cluster_id: Required. The ID of the cluster whose topic is to be returned.
:param topic_id: Required. The ID of the topic whose configuration to return.
:param retry: Designation of what errors, if any, should be retried.
:param timeout: The timeout for this request.
:param metadata: Strings which should be sent along with the request as metadata.
"""
client = self.get_managed_kafka_client()
name = client.topic_path(project_id, location, cluster_id, topic_id)

result = client.get_topic(
request={
"name": name,
},
retry=retry,
timeout=timeout,
metadata=metadata,
)
return result

@GoogleBaseHook.fallback_to_default_project_id
def update_topic(
self,
project_id: str,
location: str,
cluster_id: str,
topic_id: str,
topic: types.Topic | dict,
update_mask: FieldMask | dict,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> types.Topic:
"""
        Update the properties of a single topic.

        :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
:param location: Required. The ID of the Google Cloud region that the service belongs to.
:param cluster_id: Required. The ID of the cluster whose topic is to be updated.
:param topic_id: Required. The ID of the topic whose configuration to update.
:param topic: Required. The topic to update. Its ``name`` field must be populated.
:param update_mask: Required. Field mask is used to specify the fields to be overwritten in the Topic
resource by the update. The fields specified in the update_mask are relative to the resource, not
the full request. A field will be overwritten if it is in the mask.
:param retry: Designation of what errors, if any, should be retried.
:param timeout: The timeout for this request.
:param metadata: Strings which should be sent along with the request as metadata.
"""
client = self.get_managed_kafka_client()
_topic = deepcopy(topic) if isinstance(topic, dict) else Topic.to_dict(topic)
_topic["name"] = client.topic_path(project_id, location, cluster_id, topic_id)

result = client.update_topic(
request={
"update_mask": update_mask,
"topic": _topic,
},
retry=retry,
timeout=timeout,
metadata=metadata,
)
return result

@GoogleBaseHook.fallback_to_default_project_id
def delete_topic(
self,
project_id: str,
location: str,
cluster_id: str,
topic_id: str,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> None:
"""
        Delete a single topic.

        :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
:param location: Required. The ID of the Google Cloud region that the service belongs to.
:param cluster_id: Required. The ID of the cluster whose topic is to be deleted.
:param topic_id: Required. The ID of the topic to delete.
:param retry: Designation of what errors, if any, should be retried.
:param timeout: The timeout for this request.
:param metadata: Strings which should be sent along with the request as metadata.
"""
client = self.get_managed_kafka_client()
name = client.topic_path(project_id, location, cluster_id, topic_id)

client.delete_topic(
request={
"name": name,
},
retry=retry,
timeout=timeout,
metadata=metadata,
)
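
The hook methods above can also be exercised directly, outside an operator. The following is a
minimal sketch under assumptions: the project, region, cluster, and topic IDs are hypothetical
placeholders, and only a small subset of the Topic fields is shown.

from airflow.providers.google.cloud.hooks.managed_kafka import ManagedKafkaHook

# Hypothetical IDs used throughout this sketch.
PROJECT_ID = "my-project"
LOCATION = "us-central1"
CLUSTER_ID = "my-cluster"
TOPIC_ID = "my-topic"

hook = ManagedKafkaHook(gcp_conn_id="google_cloud_default")

# Create a topic from a minimal configuration dict.
hook.create_topic(
    project_id=PROJECT_ID,
    location=LOCATION,
    cluster_id=CLUSTER_ID,
    topic_id=TOPIC_ID,
    topic={"partition_count": 3, "replication_factor": 3},
)

# The returned pager fetches additional pages transparently as you iterate.
for topic in hook.list_topics(project_id=PROJECT_ID, location=LOCATION, cluster_id=CLUSTER_ID):
    print(topic.name)

# Only the fields named in update_mask are overwritten on the existing topic.
hook.update_topic(
    project_id=PROJECT_ID,
    location=LOCATION,
    cluster_id=CLUSTER_ID,
    topic_id=TOPIC_ID,
    topic={"partition_count": 6},
    update_mask={"paths": ["partition_count"]},
)

hook.delete_topic(project_id=PROJECT_ID, location=LOCATION, cluster_id=CLUSTER_ID, topic_id=TOPIC_ID)
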
@@ -28,6 +28,9 @@
MANAGED_KAFKA_BASE_LINK + "/{location}/clusters/{cluster_id}?project={project_id}"
)
MANAGED_KAFKA_CLUSTER_LIST_LINK = MANAGED_KAFKA_BASE_LINK + "/clusters?project={project_id}"
MANAGED_KAFKA_TOPIC_LINK = (
MANAGED_KAFKA_BASE_LINK + "/{location}/clusters/{cluster_id}/topics/{topic_id}?project={project_id}"
)


class ApacheKafkaClusterLink(BaseGoogleLink):
@@ -73,3 +76,29 @@ def persist(
"project_id": task_instance.project_id,
},
)


class ApacheKafkaTopicLink(BaseGoogleLink):
"""Helper class for constructing Apache Kafka Topic link."""

name = "Apache Kafka Topic"
key = "topic_conf"
format_str = MANAGED_KAFKA_TOPIC_LINK

@staticmethod
def persist(
context: Context,
task_instance,
cluster_id: str,
topic_id: str,
):
task_instance.xcom_push(
context=context,
key=ApacheKafkaTopicLink.key,
value={
"location": task_instance.location,
"cluster_id": cluster_id,
"topic_id": topic_id,
"project_id": task_instance.project_id,
},
)
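
For context, here is a sketch of how an operator might wire up ApacheKafkaTopicLink. The operator
class below is illustrative and not part of this commit; note that persist() reads location and
project_id off the task instance, so the operator must expose them as attributes.

from __future__ import annotations

from airflow.models import BaseOperator
from airflow.providers.google.cloud.links.managed_kafka import ApacheKafkaTopicLink
from airflow.utils.context import Context


class ExampleTopicOperator(BaseOperator):
    """Hypothetical operator showing how ApacheKafkaTopicLink is persisted."""

    operator_extra_links = (ApacheKafkaTopicLink(),)

    def __init__(self, project_id: str, location: str, cluster_id: str, topic_id: str, **kwargs):
        super().__init__(**kwargs)
        # ApacheKafkaTopicLink.persist() reads `location` and `project_id`
        # from the task instance, so they are stored as attributes here.
        self.project_id = project_id
        self.location = location
        self.cluster_id = cluster_id
        self.topic_id = topic_id

    def execute(self, context: Context):
        ApacheKafkaTopicLink.persist(
            context=context,
            task_instance=self,
            cluster_id=self.cluster_id,
            topic_id=self.topic_id,
        )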