diff --git a/providers/src/airflow/providers/microsoft/azure/hooks/asb.py b/providers/src/airflow/providers/microsoft/azure/hooks/asb.py index 504dbe16871e4..4a153f0c82624 100644 --- a/providers/src/airflow/providers/microsoft/azure/hooks/asb.py +++ b/providers/src/airflow/providers/microsoft/azure/hooks/asb.py @@ -28,6 +28,7 @@ ServiceBusSender, ) from azure.servicebus.management import ( + AuthorizationRule, CorrelationRuleFilter, QueueProperties, ServiceBusAdministrationClient, @@ -194,6 +195,93 @@ def delete_queue(self, queue_name: str) -> None: with self.get_conn() as service_mgmt_conn: service_mgmt_conn.delete_queue(queue_name) + def create_topic( + self, + topic_name: str, + azure_service_bus_conn_id: str = "azure_service_bus_default", + default_message_time_to_live: datetime.timedelta | str | None = None, + max_size_in_megabytes: int | None = None, + requires_duplicate_detection: bool | None = None, + duplicate_detection_history_time_window: datetime.timedelta | str | None = None, + enable_batched_operations: bool | None = None, + size_in_bytes: int | None = None, + filtering_messages_before_publishing: bool | None = None, + authorization_rules: list[AuthorizationRule] | None = None, + support_ordering: bool | None = None, + auto_delete_on_idle: datetime.timedelta | str | None = None, + enable_partitioning: bool | None = None, + enable_express: bool | None = None, + user_metadata: str | None = None, + max_message_size_in_kilobytes: int | None = None, + ) -> str: + """ + Create a topic by connecting to service Bus Admin client. + + :param topic_name: Name of the topic. + :param default_message_time_to_live: ISO 8601 default message time span to live value. This is + the duration after which the message expires, starting from when the message is sent to Service + Bus. This is the default value used when TimeToLive is not set on a message itself. + Input value of either type ~datetime.timedelta or string in ISO 8601 duration format + like "PT300S" is accepted. + :param max_size_in_megabytes: The maximum size of the topic in megabytes, which is the size of + memory allocated for the topic. + :param requires_duplicate_detection: A value indicating if this topic requires duplicate + detection. + :param duplicate_detection_history_time_window: ISO 8601 time span structure that defines the + duration of the duplicate detection history. The default value is 10 minutes. + Input value of either type ~datetime.timedelta or string in ISO 8601 duration format + like "PT300S" is accepted. + :param enable_batched_operations: Value that indicates whether server-side batched operations + are enabled. + :param size_in_bytes: The size of the topic, in bytes. + :param filtering_messages_before_publishing: Filter messages before publishing. + :param authorization_rules: List of Authorization rules for resource. + :param support_ordering: A value that indicates whether the topic supports ordering. + :param auto_delete_on_idle: ISO 8601 time span idle interval after which the topic is + automatically deleted. The minimum duration is 5 minutes. + Input value of either type ~datetime.timedelta or string in ISO 8601 duration format + like "PT300S" is accepted. + :param enable_partitioning: A value that indicates whether the topic is to be partitioned + across multiple message brokers. + :param enable_express: A value that indicates whether Express Entities are enabled. An express + queue holds a message in memory temporarily before writing it to persistent storage. + :param user_metadata: Metadata associated with the topic. + :param max_message_size_in_kilobytes: The maximum size in kilobytes of message payload that + can be accepted by the queue. This feature is only available when using a Premium namespace + and Service Bus API version "2021-05" or higher. + The minimum allowed value is 1024 while the maximum allowed value is 102400. Default value is 1024. + """ + if topic_name is None: + raise TypeError("Topic name cannot be None.") + + with self.get_conn() as service_mgmt_conn: + try: + topic_properties = service_mgmt_conn.get_topic(topic_name) + except ResourceNotFoundError: + topic_properties = None + if topic_properties and topic_properties.name == topic_name: + self.log.info("Topic name already exists") + return topic_properties.name + topic = service_mgmt_conn.create_topic( + topic_name=topic_name, + default_message_time_to_live=default_message_time_to_live, + max_size_in_megabytes=max_size_in_megabytes, + requires_duplicate_detection=requires_duplicate_detection, + duplicate_detection_history_time_window=duplicate_detection_history_time_window, + enable_batched_operations=enable_batched_operations, + size_in_bytes=size_in_bytes, + filtering_messages_before_publishing=filtering_messages_before_publishing, + authorization_rules=authorization_rules, + support_ordering=support_ordering, + auto_delete_on_idle=auto_delete_on_idle, + enable_partitioning=enable_partitioning, + enable_express=enable_express, + user_metadata=user_metadata, + max_message_size_in_kilobytes=max_message_size_in_kilobytes, + ) + self.log.info("Created Topic %s", topic.name) + return topic.name + def create_subscription( self, topic_name: str, diff --git a/providers/src/airflow/providers/microsoft/azure/operators/asb.py b/providers/src/airflow/providers/microsoft/azure/operators/asb.py index 1c16a8115d7b4..ba3a3257b940d 100644 --- a/providers/src/airflow/providers/microsoft/azure/operators/asb.py +++ b/providers/src/airflow/providers/microsoft/azure/operators/asb.py @@ -19,8 +19,6 @@ from collections.abc import Sequence from typing import TYPE_CHECKING, Any, Callable -from azure.core.exceptions import ResourceNotFoundError - from airflow.models import BaseOperator from airflow.providers.microsoft.azure.hooks.asb import AdminClientHook, MessageHook @@ -313,33 +311,23 @@ def execute(self, context: Context) -> str: # Create the hook hook = AdminClientHook(azure_service_bus_conn_id=self.azure_service_bus_conn_id) - with hook.get_conn() as service_mgmt_conn: - try: - topic_properties = service_mgmt_conn.get_topic(self.topic_name) - except ResourceNotFoundError: - topic_properties = None - if topic_properties and topic_properties.name == self.topic_name: - self.log.info("Topic name already exists") - return topic_properties.name - topic = service_mgmt_conn.create_topic( - topic_name=self.topic_name, - default_message_time_to_live=self.default_message_time_to_live, - max_size_in_megabytes=self.max_size_in_megabytes, - requires_duplicate_detection=self.requires_duplicate_detection, - duplicate_detection_history_time_window=self.duplicate_detection_history_time_window, - enable_batched_operations=self.enable_batched_operations, - size_in_bytes=self.size_in_bytes, - filtering_messages_before_publishing=self.filtering_messages_before_publishing, - authorization_rules=self.authorization_rules, - support_ordering=self.support_ordering, - auto_delete_on_idle=self.auto_delete_on_idle, - enable_partitioning=self.enable_partitioning, - enable_express=self.enable_express, - user_metadata=self.user_metadata, - max_message_size_in_kilobytes=self.max_message_size_in_kilobytes, - ) - self.log.info("Created Topic %s", topic.name) - return topic.name + return hook.create_topic( + topic_name=self.topic_name, + default_message_time_to_live=self.default_message_time_to_live, + max_size_in_megabytes=self.max_size_in_megabytes, + requires_duplicate_detection=self.requires_duplicate_detection, + duplicate_detection_history_time_window=self.duplicate_detection_history_time_window, + enable_batched_operations=self.enable_batched_operations, + size_in_bytes=self.size_in_bytes, + filtering_messages_before_publishing=self.filtering_messages_before_publishing, + authorization_rules=self.authorization_rules, + support_ordering=self.support_ordering, + auto_delete_on_idle=self.auto_delete_on_idle, + enable_partitioning=self.enable_partitioning, + enable_express=self.enable_express, + user_metadata=self.user_metadata, + max_message_size_in_kilobytes=self.max_message_size_in_kilobytes, + ) class AzureServiceBusSubscriptionCreateOperator(BaseOperator): diff --git a/providers/tests/microsoft/azure/hooks/test_asb.py b/providers/tests/microsoft/azure/hooks/test_asb.py index fc5e433058c72..067e79bf5702d 100644 --- a/providers/tests/microsoft/azure/hooks/test_asb.py +++ b/providers/tests/microsoft/azure/hooks/test_asb.py @@ -119,6 +119,26 @@ def test_delete_queue_exception(self, mock_sb_admin_client): with pytest.raises(TypeError): hook.delete_queue(None) + # Test creating a topic using hook method `create_topic` + @mock.patch("azure.servicebus.management.TopicProperties") + @mock.patch(f"{MODULE}.AdminClientHook.get_conn") + def test_create_topic(self, mock_sb_admin_client, mock_topic_properties): + """ + Test `create_topic` hook function with mocking connection, topic properties value and + the azure service bus `create_topic` function + """ + topic_name = "test_topic_name" + mock_topic_properties.name = topic_name + mock_sb_admin_client.return_value.__enter__.return_value.create_topic.return_value = ( + mock_topic_properties + ) + hook = AdminClientHook(azure_service_bus_conn_id=self.conn_id) + with mock.patch.object(hook.log, "info") as mock_log_info: + hook.create_topic(topic_name) + assert mock_topic_properties.name == topic_name + + mock_log_info.assert_called_with("Created Topic %s", topic_name) + # Test creating subscription with topic name and subscription name using hook method `create_subscription` @mock.patch("azure.servicebus.management.SubscriptionProperties") @mock.patch(f"{MODULE}.AdminClientHook.get_conn") diff --git a/providers/tests/microsoft/azure/operators/test_asb.py b/providers/tests/microsoft/azure/operators/test_asb.py index 7e0c953890c22..145d2e729407a 100644 --- a/providers/tests/microsoft/azure/operators/test_asb.py +++ b/providers/tests/microsoft/azure/operators/test_asb.py @@ -255,19 +255,39 @@ def test_init(self): @mock.patch("azure.servicebus.management.TopicProperties") def test_create_topic(self, mock_topic_properties, mock_get_conn): """ - Test AzureServiceBusTopicCreateOperator passed with the topic name - mocking the connection details, hook create_topic function + Test AzureServiceBusSubscriptionCreateOperator passed with the subscription name, topic name + mocking the connection details, hook create_subscription function """ + print("Wazzup doc") asb_create_topic = AzureServiceBusTopicCreateOperator( task_id="asb_create_topic", topic_name=TOPIC_NAME, ) mock_topic_properties.name = TOPIC_NAME mock_get_conn.return_value.__enter__.return_value.create_topic.return_value = mock_topic_properties - - with mock.patch.object(asb_create_topic.log, "info") as mock_log_info: - asb_create_topic.execute(None) - mock_log_info.assert_called_with("Created Topic %s", TOPIC_NAME) + # create the topic + created_topic_name = asb_create_topic.execute(None) + # ensure the topic name is returned + assert created_topic_name == TOPIC_NAME + # ensure create_subscription is called with the correct arguments on the connection + mock_get_conn.return_value.__enter__.return_value.create_topic.assert_called_once_with( + topic_name=TOPIC_NAME, + default_message_time_to_live=None, + max_size_in_megabytes=None, + requires_duplicate_detection=None, + duplicate_detection_history_time_window=None, + enable_batched_operations=None, + size_in_bytes=None, + filtering_messages_before_publishing=None, + authorization_rules=None, + support_ordering=None, + auto_delete_on_idle=None, + enable_partitioning=None, + enable_express=None, + user_metadata=None, + max_message_size_in_kilobytes=None, + ) + print("Later Gator") @mock.patch("airflow.providers.microsoft.azure.hooks.asb.AdminClientHook") def test_create_subscription_exception(self, mock_sb_admin_client):