From 4daf6c94db3fa12b677218e331cb2724e3a79d76 Mon Sep 17 00:00:00 2001 From: swathipil Date: Tue, 14 May 2024 15:16:07 -0700 Subject: [PATCH 1/5] add consumer group to uri used for consumer auth --- sdk/eventhub/azure-eventhub/azure/eventhub/_client_base.py | 2 +- sdk/eventhub/azure-eventhub/azure/eventhub/_consumer_client.py | 1 + sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py | 2 +- .../azure-eventhub/azure/eventhub/aio/_consumer_client_async.py | 1 + .../azure-eventhub/azure/eventhub/aio/_producer_client_async.py | 1 + 5 files changed, 5 insertions(+), 2 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_client_base.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_client_base.py index 3a64c9676c1c..325b0ccc3c0f 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_client_base.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_client_base.py @@ -329,7 +329,7 @@ def __init__( else: self._credential = credential # type: ignore self._auto_reconnect = kwargs.get("auto_reconnect", True) - self._auth_uri = f"sb://{self._address.hostname}{self._address.path}" + self._auth_uri: str self._config = Configuration( amqp_transport=self._amqp_transport, hostname=self._address.hostname, diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_consumer_client.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_consumer_client.py index b3b376bc6cff..c160cbf9ae98 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_consumer_client.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_consumer_client.py @@ -178,6 +178,7 @@ def __init__( network_tracing=network_tracing, **kwargs ) + self._auth_uri = f"sb://{self._address.hostname}{self._address.path}/consumergroups/{self._consumer_group}" self._lock = threading.Lock() self._event_processors: Dict[Tuple[str, str], EventProcessor] = {} diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py index 288cedc73f08..320bea5c601f 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py @@ -201,7 +201,7 @@ def __init__( network_tracing=kwargs.get("logging_enable"), **kwargs ) - + self._auth_uri = f"sb://{self._address.hostname}{self._address.path}" self._keep_alive = kwargs.get("keep_alive", None) self._producers: Dict[str, Optional[EventHubProducer]] = { diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_consumer_client_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_consumer_client_async.py index 37cca02c01a1..7a845f407953 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_consumer_client_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_consumer_client_async.py @@ -191,6 +191,7 @@ def __init__( network_tracing=network_tracing, **kwargs, ) + self._auth_uri = f"sb://{self._address.hostname}{self._address.path}/consumergroups/{self._consumer_group}" self._lock = asyncio.Lock(**self._internal_kwargs) self._event_processors: Dict[Tuple[str, str], EventProcessor] = {} diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_producer_client_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_producer_client_async.py index 1c95dfd97b95..bdac602669dd 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_producer_client_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_producer_client_async.py @@ -188,6 +188,7 @@ def __init__( network_tracing=kwargs.pop("logging_enable", False), **kwargs ) + self._auth_uri = f"sb://{self._address.hostname}{self._address.path}" self._keep_alive = kwargs.get("keep_alive", None) self._producers: Dict[str, Optional[EventHubProducer]] = { ALL_PARTITIONS: self._create_producer() From 4dca6405ce67448f4ee2f4f88869dd6a25cbf1c8 Mon Sep 17 00:00:00 2001 From: swathipil Date: Tue, 14 May 2024 15:21:18 -0700 Subject: [PATCH 2/5] changelog --- sdk/eventhub/azure-eventhub/CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdk/eventhub/azure-eventhub/CHANGELOG.md b/sdk/eventhub/azure-eventhub/CHANGELOG.md index f942dbcaace1..d5736d615bad 100644 --- a/sdk/eventhub/azure-eventhub/CHANGELOG.md +++ b/sdk/eventhub/azure-eventhub/CHANGELOG.md @@ -8,6 +8,8 @@ ### Bugs Fixed +- Fixed a bug where the correct URI was not being used for consumer authentication, causing issues when assigning roles at the consumer group level. ([#35337](https://github.com/Azure/azure-sdk-for-python/issues/35337)) + ### Other Changes ## 5.11.7 (2024-04-10) From 1ce56a2bc2333db44517cdcd53263ad38056a1ab Mon Sep 17 00:00:00 2001 From: swathipil Date: Wed, 5 Jun 2024 17:53:48 -0700 Subject: [PATCH 3/5] add consumergroup to auth uri for consumer ops only, not mgmt ops --- .../azure/eventhub/_client_base.py | 14 ++++++---- .../azure/eventhub/_consumer.py | 2 +- .../azure/eventhub/_consumer_client.py | 1 + .../azure/eventhub/aio/_client_base_async.py | 22 ++++++++-------- .../eventhub/aio/_consumer_client_async.py | 1 + ...send_and_receive_amqp_annotated_message.py | 26 +++++++++++++------ 6 files changed, 41 insertions(+), 25 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_client_base.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_client_base.py index 325b0ccc3c0f..e2fbb2753b05 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_client_base.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_client_base.py @@ -330,6 +330,7 @@ def __init__( self._credential = credential # type: ignore self._auto_reconnect = kwargs.get("auto_reconnect", True) self._auth_uri: str + self._eventhub_auth_uri = f"sb://{self._address.hostname}{self._address.path}" self._config = Configuration( amqp_transport=self._amqp_transport, hostname=self._address.hostname, @@ -359,7 +360,7 @@ def _from_connection_string(conn_str: str, **kwargs: Any) -> Dict[str, Any]: kwargs["credential"] = EventHubSharedKeyCredential(policy, key) return kwargs - def _create_auth(self) -> Union["uamqp_JWTTokenAuth", JWTTokenAuth]: + def _create_auth(self, *, auth_uri: Optional[str] = None) -> Union["uamqp_JWTTokenAuth", JWTTokenAuth]: """ Create an ~uamqp.authentication.SASTokenAuth instance to authenticate the session. @@ -367,6 +368,9 @@ def _create_auth(self) -> Union["uamqp_JWTTokenAuth", JWTTokenAuth]: :return: The auth for the session. :rtype: JWTTokenAuth or uamqp_JWTTokenAuth """ + # if auth_uri is not provided, use the default hub one + entity_auth_uri = auth_uri if auth_uri else self._eventhub_auth_uri + try: # ignore mypy's warning because token_type is Optional token_type = self._credential.token_type # type: ignore @@ -374,14 +378,14 @@ def _create_auth(self) -> Union["uamqp_JWTTokenAuth", JWTTokenAuth]: token_type = b"jwt" if token_type == b"servicebus.windows.net:sastoken": return self._amqp_transport.create_token_auth( - self._auth_uri, - functools.partial(self._credential.get_token, self._auth_uri), + entity_auth_uri, + functools.partial(self._credential.get_token, entity_auth_uri), token_type=token_type, config=self._config, update_token=True, ) return self._amqp_transport.create_token_auth( - self._auth_uri, + entity_auth_uri, functools.partial(self._credential.get_token, JWT_TOKEN_SCOPE), token_type=token_type, config=self._config, @@ -574,7 +578,7 @@ def _open(self) -> bool: if not self.running: if self._handler: self._handler.close() - auth = self._client._create_auth() + auth = self._client._create_auth(auth_uri=self._client._auth_uri) self._create_handler(auth) conn = self._client._conn_manager.get_connection( # pylint: disable=protected-access endpoint=self._client._address.hostname, auth=auth diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_consumer.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_consumer.py index 56cedbdd83f2..49a992caab56 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_consumer.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_consumer.py @@ -200,7 +200,7 @@ def _open(self) -> bool: if not self.running: if self._handler: self._handler.close() - auth = self._client._create_auth() + auth = self._client._create_auth(auth_uri=self._client._auth_uri) self._create_handler(auth) conn = self._client._conn_manager.get_connection( # pylint: disable=protected-access endpoint=self._client._address.hostname, auth=auth diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/_consumer_client.py b/sdk/eventhub/azure-eventhub/azure/eventhub/_consumer_client.py index c160cbf9ae98..266864daadd3 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/_consumer_client.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/_consumer_client.py @@ -178,6 +178,7 @@ def __init__( network_tracing=network_tracing, **kwargs ) + # consumer auth URI additionally includes consumer group self._auth_uri = f"sb://{self._address.hostname}{self._address.path}/consumergroups/{self._consumer_group}" self._lock = threading.Lock() self._event_processors: Dict[Tuple[str, str], EventProcessor] = {} diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_client_base_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_client_base_async.py index 4ee5d75db77b..568d68d687a7 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_client_base_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_client_base_async.py @@ -46,16 +46,14 @@ from .._pyamqp.aio._authentication_async import JWTTokenAuthAsync try: from uamqp import ( - authentication as uamqp_authentication, Message as uamqp_Message, AMQPClientAsync as uamqp_AMQPClientAsync, ) - from uamqp.authentication import JWTTokenAsync + from uamqp.authentication import JWTTokenAsync as uamqp_JWTTokenAsync except ImportError: - uamqp_authentication = None uamqp_Message = None uamqp_AMQPClientAsync = None - JWTTokenAsync = None + uamqp_JWTTokenAsync = None from azure.core.credentials_async import AsyncTokenCredential try: @@ -109,7 +107,7 @@ def running(self) -> bool: def running(self, value: bool) -> None: pass - def _create_handler(self, auth: Union["JWTTokenAsync", JWTTokenAuthAsync]) -> None: + def _create_handler(self, auth: Union["uamqp_JWTTokenAsync", JWTTokenAuthAsync]) -> None: pass _MIXIN_BASE = AbstractConsumerProducer @@ -268,15 +266,17 @@ def _from_connection_string(conn_str: str, **kwargs) -> Dict[str, Any]: kwargs["credential"] = EventHubSharedKeyCredential(policy, key) return kwargs - async def _create_auth_async(self) -> Union[uamqp_authentication.JWTTokenAsync, JWTTokenAuthAsync]: + async def _create_auth_async(self, *, auth_uri: Optional[str] = None) -> Union["uamqp_JWTTokenAsync", JWTTokenAuthAsync]: """ Create an ~uamqp.authentication.SASTokenAuthAsync instance to authenticate the session. :return: A JWTTokenAuthAsync instance to authenticate the session. :rtype: ~uamqp.authentication.JWTTokenAsync or JWTTokenAuthAsync - """ + # if auth_uri is not provided, use the default hub one + entity_auth_uri = auth_uri if auth_uri else self._eventhub_auth_uri + try: # ignore mypy's warning because token_type is Optional token_type = self._credential.token_type # type: ignore @@ -284,14 +284,14 @@ async def _create_auth_async(self) -> Union[uamqp_authentication.JWTTokenAsync, token_type = b"jwt" if token_type == b"servicebus.windows.net:sastoken": return await self._amqp_transport.create_token_auth_async( - self._auth_uri, - functools.partial(self._credential.get_token, self._auth_uri), + entity_auth_uri, + functools.partial(self._credential.get_token, entity_auth_uri), token_type=token_type, config=self._config, update_token=True, ) return await self._amqp_transport.create_token_auth_async( - self._auth_uri, + entity_auth_uri, functools.partial(self._credential.get_token, JWT_TOKEN_SCOPE), token_type=token_type, config=self._config, @@ -475,7 +475,7 @@ async def _open(self) -> None: if not self.running: if self._handler: await self._handler.close_async() - auth = await self._client._create_auth_async() + auth = await self._client._create_auth_async(auth_uri=self._client._auth_uri) self._create_handler(auth) conn = await self._client._conn_manager_async.get_connection( endpoint=self._client._address.hostname, auth=auth diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_consumer_client_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_consumer_client_async.py index 7a845f407953..7834539bbf51 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_consumer_client_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_consumer_client_async.py @@ -191,6 +191,7 @@ def __init__( network_tracing=network_tracing, **kwargs, ) + # consumer auth URI additionally includes consumer group self._auth_uri = f"sb://{self._address.hostname}{self._address.path}/consumergroups/{self._consumer_group}" self._lock = asyncio.Lock(**self._internal_kwargs) self._event_processors: Dict[Tuple[str, str], EventProcessor] = {} diff --git a/sdk/eventhub/azure-eventhub/samples/sync_samples/send_and_receive_amqp_annotated_message.py b/sdk/eventhub/azure-eventhub/samples/sync_samples/send_and_receive_amqp_annotated_message.py index 3ce87b62ece2..be5e495699ee 100644 --- a/sdk/eventhub/azure-eventhub/samples/sync_samples/send_and_receive_amqp_annotated_message.py +++ b/sdk/eventhub/azure-eventhub/samples/sync_samples/send_and_receive_amqp_annotated_message.py @@ -17,6 +17,16 @@ CONNECTION_STR = os.environ['EVENT_HUB_CONN_STR'] EVENTHUB_NAME = os.environ['EVENT_HUB_NAME'] +import logging +import sys +handler = logging.StreamHandler(stream=sys.stdout) +logger = logging.getLogger('azure.eventhub') +logger.setLevel(logging.DEBUG) +logger.addHandler(handler) +#uamqp_logger = logging.getLogger('uamqp') +#uamqp_logger.setLevel(logging.DEBUG) +#uamqp_logger.addHandler(handler) + def send_data_message(producer): data_body = [b'aa', b'bb', b'cc'] application_properties = {"body_type": "data"} @@ -91,14 +101,14 @@ def receive_and_parse_message(consumer): print('Stopped receiving.') -producer = EventHubProducerClient.from_connection_string( - conn_str=CONNECTION_STR, - eventhub_name=EVENTHUB_NAME -) -with producer: - send_data_message(producer) - send_sequence_message(producer) - send_value_message(producer) +#producer = EventHubProducerClient.from_connection_string( +# conn_str=CONNECTION_STR, +# eventhub_name=EVENTHUB_NAME +#) +#with producer: +# send_data_message(producer) +# send_sequence_message(producer) +# send_value_message(producer) consumer = EventHubConsumerClient.from_connection_string( From 9f47b7ad7802fa635ec28756d655dc9fba7f5e5e Mon Sep 17 00:00:00 2001 From: swathipil Date: Thu, 6 Jun 2024 09:48:28 -0700 Subject: [PATCH 4/5] lint --- .../azure-eventhub/azure/eventhub/aio/_client_base_async.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_client_base_async.py b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_client_base_async.py index 568d68d687a7..2410ac688b44 100644 --- a/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_client_base_async.py +++ b/sdk/eventhub/azure-eventhub/azure/eventhub/aio/_client_base_async.py @@ -266,7 +266,9 @@ def _from_connection_string(conn_str: str, **kwargs) -> Dict[str, Any]: kwargs["credential"] = EventHubSharedKeyCredential(policy, key) return kwargs - async def _create_auth_async(self, *, auth_uri: Optional[str] = None) -> Union["uamqp_JWTTokenAsync", JWTTokenAuthAsync]: + async def _create_auth_async( + self, *, auth_uri: Optional[str] = None + ) -> Union["uamqp_JWTTokenAsync", JWTTokenAuthAsync]: """ Create an ~uamqp.authentication.SASTokenAuthAsync instance to authenticate the session. From ea7dae568400c522f393658530d62fe34e76cc55 Mon Sep 17 00:00:00 2001 From: swathipil Date: Thu, 6 Jun 2024 10:58:49 -0700 Subject: [PATCH 5/5] fix sample --- ...send_and_receive_amqp_annotated_message.py | 26 ++++++------------- 1 file changed, 8 insertions(+), 18 deletions(-) diff --git a/sdk/eventhub/azure-eventhub/samples/sync_samples/send_and_receive_amqp_annotated_message.py b/sdk/eventhub/azure-eventhub/samples/sync_samples/send_and_receive_amqp_annotated_message.py index be5e495699ee..3ce87b62ece2 100644 --- a/sdk/eventhub/azure-eventhub/samples/sync_samples/send_and_receive_amqp_annotated_message.py +++ b/sdk/eventhub/azure-eventhub/samples/sync_samples/send_and_receive_amqp_annotated_message.py @@ -17,16 +17,6 @@ CONNECTION_STR = os.environ['EVENT_HUB_CONN_STR'] EVENTHUB_NAME = os.environ['EVENT_HUB_NAME'] -import logging -import sys -handler = logging.StreamHandler(stream=sys.stdout) -logger = logging.getLogger('azure.eventhub') -logger.setLevel(logging.DEBUG) -logger.addHandler(handler) -#uamqp_logger = logging.getLogger('uamqp') -#uamqp_logger.setLevel(logging.DEBUG) -#uamqp_logger.addHandler(handler) - def send_data_message(producer): data_body = [b'aa', b'bb', b'cc'] application_properties = {"body_type": "data"} @@ -101,14 +91,14 @@ def receive_and_parse_message(consumer): print('Stopped receiving.') -#producer = EventHubProducerClient.from_connection_string( -# conn_str=CONNECTION_STR, -# eventhub_name=EVENTHUB_NAME -#) -#with producer: -# send_data_message(producer) -# send_sequence_message(producer) -# send_value_message(producer) +producer = EventHubProducerClient.from_connection_string( + conn_str=CONNECTION_STR, + eventhub_name=EVENTHUB_NAME +) +with producer: + send_data_message(producer) + send_sequence_message(producer) + send_value_message(producer) consumer = EventHubConsumerClient.from_connection_string(