Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[EventHubs] add fixed backoff retry mode #21884

Merged
merged 10 commits into from
Jan 4, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions sdk/eventhub/azure-eventhub/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

### Features Added

- Added support for fixed (linear) retry backoff:
- Sync/async `EventHubProducerClient` and `EventHubConsumerClient` constructors and `from_connection_string` take `retry_mode` as a keyword argument.
- `RetryMode` enum has been added to `azure.eventhub`, with values `Fixed` and `Exponential`.

### Breaking Changes

### Bugs Fixed
Expand Down
4 changes: 3 additions & 1 deletion sdk/eventhub/azure-eventhub/azure/eventhub/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
parse_connection_string,
EventHubConnectionStringProperties
)
from ._retry import RetryMode

TransportType = constants.TransportType

Expand All @@ -33,5 +34,6 @@
"LoadBalancingStrategy",
"PartitionContext",
"parse_connection_string",
"EventHubConnectionStringProperties"
"EventHubConnectionStringProperties",
"RetryMode"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There're API changes, remember to get @annatisch to take a look at the API view before CC day :)

]
85 changes: 54 additions & 31 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_client_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,17 @@

from uamqp import AMQPClient, Message, authentication, constants, errors, compat, utils
import six
from azure.core.credentials import AccessToken, AzureSasCredential, AzureNamedKeyCredential
from azure.core.credentials import (
AccessToken,
AzureSasCredential,
AzureNamedKeyCredential,
)
from azure.core.utils import parse_connection_string as core_parse_connection_string


from .exceptions import _handle_exception, ClientClosedError, ConnectError
from ._configuration import Configuration
from ._retry import RetryMode
from ._utils import utc_from_timestamp, parse_sas_credential
from ._connection_manager import get_connection_manager
from ._constants import (
Expand All @@ -34,7 +39,7 @@
MGMT_OPERATION,
MGMT_PARTITION_OPERATION,
MGMT_STATUS_CODE,
MGMT_STATUS_DESC
MGMT_STATUS_DESC,
)

if TYPE_CHECKING:
Expand All @@ -52,9 +57,11 @@ def _parse_conn_str(conn_str, **kwargs):
entity_path = None # type: Optional[str]
shared_access_signature = None # type: Optional[str]
shared_access_signature_expiry = None
eventhub_name = kwargs.pop("eventhub_name", None) # type: Optional[str]
check_case = kwargs.pop("check_case", False) # type: bool
conn_settings = core_parse_connection_string(conn_str, case_sensitive_keys=check_case)
eventhub_name = kwargs.pop("eventhub_name", None) # type: Optional[str]
check_case = kwargs.pop("check_case", False) # type: bool
conn_settings = core_parse_connection_string(
conn_str, case_sensitive_keys=check_case
)
if check_case:
shared_access_key = conn_settings.get("SharedAccessKey")
shared_access_key_name = conn_settings.get("SharedAccessKeyName")
Expand All @@ -79,7 +86,7 @@ def _parse_conn_str(conn_str, **kwargs):
try:
# Expiry can be stored in the "se=<timestamp>" clause of the token. ('&'-separated key-value pairs)
shared_access_signature_expiry = int(
shared_access_signature.split("se=")[1].split("&")[0] # type: ignore
shared_access_signature.split("se=")[1].split("&")[0] # type: ignore
)
except (
IndexError,
Expand Down Expand Up @@ -117,12 +124,14 @@ def _parse_conn_str(conn_str, **kwargs):
"At least one of the SharedAccessKey or SharedAccessSignature must be present."
)

return (host,
str(shared_access_key_name) if shared_access_key_name else None,
str(shared_access_key) if shared_access_key else None,
entity,
str(shared_access_signature) if shared_access_signature else None,
shared_access_signature_expiry)
return (
host,
str(shared_access_key_name) if shared_access_key_name else None,
str(shared_access_key) if shared_access_key else None,
entity,
str(shared_access_signature) if shared_access_signature else None,
shared_access_signature_expiry,
)


def _generate_sas_token(uri, policy, key, expiry=None):
Expand Down Expand Up @@ -154,6 +163,14 @@ def _build_uri(address, entity):
return address


def _get_backoff_time(retry_mode, backoff_factor, backoff_max, retried_times):
if retry_mode == RetryMode.Fixed:
backoff_value = backoff_factor
else:
backoff_value = backoff_factor * (2 ** retried_times)
return min(backoff_max, backoff_value)


class EventHubSharedKeyCredential(object):
"""The shared access key credential used for authentication.

Expand Down Expand Up @@ -200,6 +217,7 @@ class EventHubSASTokenCredential(object):
:param str token: The shared access token string
:param int expiry: The epoch timestamp
"""

def __init__(self, token, expiry):
# type: (str, int) -> None
"""
Expand All @@ -225,6 +243,7 @@ class EventhubAzureSasTokenCredential(object):
:param azure_sas_credential: The credential to be used for authentication.
:type azure_sas_credential: ~azure.core.credentials.AzureSasCredential
"""

def __init__(self, azure_sas_credential):
# type: (AzureSasCredential) -> None
"""The shared access token credential used for authentication
Expand Down Expand Up @@ -257,9 +276,9 @@ def __init__(self, fully_qualified_namespace, eventhub_name, credential, **kwarg
if isinstance(credential, AzureSasCredential):
self._credential = EventhubAzureSasTokenCredential(credential)
elif isinstance(credential, AzureNamedKeyCredential):
self._credential = EventhubAzureNamedKeyTokenCredential(credential) # type: ignore
self._credential = EventhubAzureNamedKeyTokenCredential(credential) # type: ignore
else:
self._credential = credential #type: ignore
self._credential = credential # type: ignore
self._keep_alive = kwargs.get("keep_alive", 30)
self._auto_reconnect = kwargs.get("auto_reconnect", True)
self._mgmt_target = "amqps://{}/{}".format(
Expand All @@ -274,7 +293,9 @@ def __init__(self, fully_qualified_namespace, eventhub_name, credential, **kwarg
@staticmethod
def _from_connection_string(conn_str, **kwargs):
# type: (str, Any) -> Dict[str, Any]
host, policy, key, entity, token, token_expiry = _parse_conn_str(conn_str, **kwargs)
host, policy, key, entity, token, token_expiry = _parse_conn_str(
conn_str, **kwargs
)
kwargs["fully_qualified_namespace"] = host
kwargs["eventhub_name"] = entity
if token and token_expiry:
Expand All @@ -291,7 +312,7 @@ def _create_auth(self):
"""
try:
# ignore mypy's warning because token_type is Optional
token_type = self._credential.token_type # type: ignore
token_type = self._credential.token_type # type: ignore
except AttributeError:
token_type = b"jwt"
if token_type == b"servicebus.windows.net:sastoken":
Expand All @@ -305,7 +326,7 @@ def _create_auth(self):
transport_type=self._config.transport_type,
custom_endpoint_hostname=self._config.custom_endpoint_hostname,
port=self._config.connection_port,
verify=self._config.connection_verify
verify=self._config.connection_verify,
)
auth.update_token()
return auth
Expand All @@ -319,7 +340,7 @@ def _create_auth(self):
transport_type=self._config.transport_type,
custom_endpoint_hostname=self._config.custom_endpoint_hostname,
port=self._config.connection_port,
verify=self._config.connection_verify
verify=self._config.connection_verify,
)

def _close_connection(self):
Expand All @@ -331,7 +352,12 @@ def _backoff(
):
# type: (int, Exception, Optional[int], Optional[str]) -> None
entity_name = entity_name or self._container_id
backoff = self._config.backoff_factor * 2 ** retried_times
backoff = _get_backoff_time(
self._config.retry_mode,
self._config.backoff_factor,
self._config.backoff_max,
retried_times,
)
if backoff <= self._config.backoff_max and (
timeout_time is None or time.time() + backoff <= timeout_time
): # pylint:disable=no-else-return
Expand Down Expand Up @@ -360,7 +386,7 @@ def _management_request(self, mgmt_msg, op_type):
self._mgmt_target, auth=mgmt_auth, debug=self._config.network_tracing
)
try:
conn = self._conn_manager.get_connection(
conn = self._conn_manager.get_connection( # pylint:disable=assignment-from-none
self._address.hostname, mgmt_auth
)
mgmt_client.open(connection=conn)
Expand All @@ -373,29 +399,28 @@ def _management_request(self, mgmt_msg, op_type):
description_fields=MGMT_STATUS_DESC,
)
status_code = int(response.application_properties[MGMT_STATUS_CODE])
description = response.application_properties.get(MGMT_STATUS_DESC) # type: Optional[Union[str, bytes]]
description = response.application_properties.get(
MGMT_STATUS_DESC
) # type: Optional[Union[str, bytes]]
if description and isinstance(description, six.binary_type):
description = description.decode('utf-8')
description = description.decode("utf-8")
if status_code < 400:
return response
if status_code in [401]:
raise errors.AuthenticationException(
"Management authentication failed. Status code: {}, Description: {!r}".format(
status_code,
description
status_code, description
)
)
if status_code in [404]:
raise ConnectError(
"Management connection failed. Status code: {}, Description: {!r}".format(
status_code,
description
status_code, description
)
)
raise errors.AMQPConnectionError(
"Management request error. Status code: {}, Description: {!r}".format(
status_code,
description
status_code, description
)
)
except Exception as exception: # pylint: disable=broad-except
Expand Down Expand Up @@ -491,9 +516,7 @@ def _check_closed(self):
)

def _open(self):
"""Open the EventHubConsumer/EventHubProducer using the supplied connection.

"""
"""Open the EventHubConsumer/EventHubProducer using the supplied connection."""
# pylint: disable=protected-access
if not self.running:
if self._handler:
Expand Down
1 change: 1 addition & 0 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ def _from_message(cls, message, raw_amqp_message=None):
"""
event_data = cls(body="")
event_data.message = message
# pylint: disable=protected-access
event_data._raw_amqp_message = raw_amqp_message if raw_amqp_message else AmqpAnnotatedMessage(message=message)
return event_data

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@
from urllib.parse import urlparse

from uamqp.constants import TransportType, DEFAULT_AMQPS_PORT, DEFAULT_AMQP_WSS_PORT
from ._retry import RetryMode


class Configuration(object): # pylint:disable=too-many-instance-attributes
def __init__(self, **kwargs):
self.user_agent = kwargs.get("user_agent") # type: Optional[str]
self.retry_total = kwargs.get("retry_total", 3) # type: int
self.max_retries = self.retry_total # type: int
self.retry_mode = kwargs.get("retry_mode", RetryMode.Exponential)
self.backoff_factor = kwargs.get("retry_backoff_factor", 0.8) # type: float
self.backoff_max = kwargs.get("retry_backoff_max", 120) # type: int
self.network_tracing = kwargs.get("network_tracing", False) # type: bool
Expand Down
18 changes: 18 additions & 0 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_consumer_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,15 @@ class EventHubConsumerClient(ClientBase):
The failed internal partition consumer will be closed (`on_partition_close` will be called if provided) and
new internal partition consumer will be created (`on_partition_initialize` will be called if provided) to resume
receiving.
:keyword float retry_backoff_factor: A backoff factor to apply between attempts after the second try
(most errors are resolved immediately by a second try without a delay).
In fixed mode, retry policy will always sleep for {backoff factor}.
In 'exponential' mode, retry policy will sleep for: `{backoff factor} * (2 ** ({number of total retries} - 1))`
seconds. If the backoff_factor is 0.1, then the retry will sleep
for [0.0s, 0.2s, 0.4s, ...] between retries. The default value is 0.8.
:keyword int retry_backoff_max: The maximum back off time. Default value is 120 seconds (2 minutes).
:keyword retry_mode: Fixed or exponential delay between attempts, default is exponential.
:paramtype retry_mode: ~azure.eventhub.RetryMode
:keyword float idle_timeout: Timeout, in seconds, after which this client will close the underlying connection
if there is no further activity. By default the value is None, meaning that the client will not shutdown due to
inactivity unless initiated by the service.
Expand Down Expand Up @@ -219,6 +228,15 @@ def from_connection_string(cls, conn_str, consumer_group, **kwargs):
information. The failed internal partition consumer will be closed (`on_partition_close` will be called
if provided) and new internal partition consumer will be created (`on_partition_initialize` will be called if
provided) to resume receiving.
:keyword float retry_backoff_factor: A backoff factor to apply between attempts after the second try
(most errors are resolved immediately by a second try without a delay).
In fixed mode, retry policy will always sleep for {backoff factor}.
In 'exponential' mode, retry policy will sleep for: `{backoff factor} * (2 ** ({number of total retries} - 1))`
seconds. If the backoff_factor is 0.1, then the retry will sleep
for [0.0s, 0.2s, 0.4s, ...] between retries. The default value is 0.8.
:keyword int retry_backoff_max: The maximum back off time. Default value is 120 seconds (2 minutes).
:keyword retry_mode: Fixed or exponential delay between attempts, default is exponential.
:paramtype retry_mode: ~azure.eventhub.RetryMode
:keyword float idle_timeout: Timeout, in seconds, after which this client will close the underlying connection
if there is no furthur activity. By default the value is None, meaning that the client will not shutdown due
to inactivity unless initiated by the service.
Expand Down
18 changes: 18 additions & 0 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_producer_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,15 @@ class EventHubProducerClient(ClientBase):
:keyword str user_agent: If specified, this will be added in front of the user agent string.
:keyword int retry_total: The total number of attempts to redo a failed operation when an error occurs. Default
value is 3.
:keyword float retry_backoff_factor: A backoff factor to apply between attempts after the second try
(most errors are resolved immediately by a second try without a delay).
In fixed mode, retry policy will always sleep for {backoff factor}.
In 'exponential' mode, retry policy will sleep for: `{backoff factor} * (2 ** ({number of total retries} - 1))`
seconds. If the backoff_factor is 0.1, then the retry will sleep
for [0.0s, 0.2s, 0.4s, ...] between retries. The default value is 0.8.
:keyword int retry_backoff_max: The maximum back off time. Default value is 120 seconds (2 minutes).
:keyword retry_mode: Fixed or exponential delay between attempts, default is exponential.
:paramtype retry_mode: ~azure.eventhub.RetryMode
:keyword float idle_timeout: Timeout, in seconds, after which this client will close the underlying connection
if there is no activity. By default the value is None, meaning that the client will not shutdown due to inactivity
unless initiated by the service.
Expand Down Expand Up @@ -178,6 +187,15 @@ def from_connection_string(cls, conn_str, **kwargs):
:keyword str user_agent: If specified, this will be added in front of the user agent string.
:keyword int retry_total: The total number of attempts to redo a failed operation when an error occurs.
Default value is 3.
:keyword float retry_backoff_factor: A backoff factor to apply between attempts after the second try
(most errors are resolved immediately by a second try without a delay).
In fixed mode, retry policy will always sleep for {backoff factor}.
In 'exponential' mode, retry policy will sleep for: `{backoff factor} * (2 ** ({number of total retries} - 1))`
seconds. If the backoff_factor is 0.1, then the retry will sleep
for [0.0s, 0.2s, 0.4s, ...] between retries. The default value is 0.8.
:keyword int retry_backoff_max: The maximum back off time. Default value is 120 seconds (2 minutes).
:keyword retry_mode: Fixed or exponential delay between attempts, default is exponential.
:paramtype retry_mode: ~azure.eventhub.RetryMode
:keyword float idle_timeout: Timeout, in seconds, after which this client will close the underlying connection
if there is no activity. By default the value is None, meaning that the client will not shutdown due to
inactivity unless initiated by the service.
Expand Down
11 changes: 11 additions & 0 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from typing import Optional, Dict, Any

from enum import Enum

class RetryMode(str, Enum):
Exponential = 'exponential'
Fixed = 'fixed'
4 changes: 2 additions & 2 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,14 +311,14 @@ def decode_with_recurse(data, encoding="UTF-8"):
return data
if isinstance(data, six.binary_type):
return data.decode(encoding)
if isinstance(data, Mapping):
if isinstance(data, Mapping): # pylint:disable=isinstance-second-argument-not-valid-type
decoded_mapping = {}
for k, v in data.items():
decoded_key = decode_with_recurse(k, encoding)
decoded_val = decode_with_recurse(v, encoding)
decoded_mapping[decoded_key] = decoded_val
return decoded_mapping
if isinstance(data, Iterable):
if isinstance(data, Iterable): # pylint:disable=isinstance-second-argument-not-valid-type
decoded_list = []
for d in data:
decoded_list.append(decode_with_recurse(d, encoding))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import sys


def get_dict_with_loop_if_needed(loop):
if sys.version_info >= (3, 10):
if loop:
Expand Down
Loading