[EventHubs] Idempotent producer #16756

Merged
30 commits
98d7c03
sync idempotent producer constructor
yunhaoling Feb 15, 2021
45b5622
sync idempotent producer prototype
yunhaoling Feb 17, 2021
173252e
imporve constants
yunhaoling Feb 17, 2021
68da29e
async impl
yunhaoling Feb 19, 2021
556685e
remove duplicate code of validation on outgoing eventdata
yunhaoling Feb 22, 2021
282dbe2
fix bug, add basic test
yunhaoling Feb 22, 2021
4438a7f
fix mypy and pylint
yunhaoling Feb 23, 2021
4ddf3fe
fix implementation bug
yunhaoling Feb 24, 2021
4a0b979
review feedback
yunhaoling Feb 24, 2021
653b4ca
validate partition configs
yunhaoling Feb 24, 2021
37bbdfe
add tests
yunhaoling Feb 24, 2021
25e6951
add changelog and samples
yunhaoling Feb 24, 2021
dc8c561
update readme
yunhaoling Feb 24, 2021
3b07456
update shared-requirements
yunhaoling Feb 24, 2021
8f9e47a
merge master
yunhaoling Feb 26, 2021
8f5cf9f
update tests yml to test unreleased uamqp v1.2.15
yunhaoling Feb 26, 2021
74d0f6a
fix tests
yunhaoling Mar 1, 2021
c0fd953
more test fix
yunhaoling Mar 1, 2021
ef77168
more test fix
yunhaoling Mar 1, 2021
41026c9
addressing comments
yunhaoling Mar 1, 2021
e1fb000
add more docs
yunhaoling Mar 1, 2021
a89b04a
fix tests
yunhaoling Mar 2, 2021
199631d
fix tox warning
yunhaoling Mar 2, 2021
2bbe734
fix pylint
yunhaoling Mar 2, 2021
d47e6a2
fix pylint
yunhaoling Mar 2, 2021
005e6cb
remove the change in tests.yml
yunhaoling Mar 2, 2021
aa6b0e4
revert non-existing links first
yunhaoling Mar 2, 2021
f3535ff
update setup.py
yunhaoling Mar 3, 2021
5b8e65c
Merge remote-tracking branch 'central/master' into yuling/eh/idempote…
yunhaoling Mar 4, 2021
2b12883
fix pylint
yunhaoling Mar 4, 2021
23 changes: 22 additions & 1 deletion sdk/eventhub/azure-eventhub/CHANGELOG.md
@@ -1,11 +1,32 @@
# Release History

## 5.3.1 (Unreleased)
## 5.4.0b1 (Unreleased)

**New Features**

- Added support for idempotent publishing, a service-supported feature that aims to reduce the number of duplicate
events that are published.
- `EventHubProducerClient` constructor accepts two new parameters for idempotent publishing:
- `enable_idempotent_partitions`: A boolean value to tell the `EventHubProducerClient` whether to enable idempotency.
- `partition_configs`: The set of configurations that can be specified to influence publishing behavior
specific to the configured Event Hub partition.
- Introduced a new method `get_partition_publishing_properties` on `EventHubProducerClient` to inspect the information
about the state of publishing for a partition.
- Introduced a new property `published_sequence_number` on `EventData` to get the publishing sequence number assigned
to the event at the time it was successfully published.
- Introduced a new property `starting_published_sequence_number` on `EventDataBatch` to get the publishing sequence
number assigned to the first event in the batch at the time the batch was successfully published.
- Introduced a new class `azure.eventhub.PartitionPublishingConfiguration` which is a set of configurations that can be
specified to influence the behavior when publishing directly to an Event Hub partition.

**Bug fixes**

- Sending an empty `event_data_batch` is now a no-op instead of raising an error.

**Notes**

- Updated uAMQP dependency to 1.2.15.

## 5.3.0 (2021-02-08)

**New Features**
4 changes: 2 additions & 2 deletions sdk/eventhub/azure-eventhub/README.md
@@ -13,7 +13,7 @@ The Azure Event Hubs client library allows for publishing and consuming of Azure
- Observe interesting operations and interactions happening within your business or other ecosystem, allowing loosely coupled systems to interact without the need to bind them together.
- Receive events from one or more publishers, transform them to better meet the needs of your ecosystem, then publish the transformed events to a new stream for consumers to observe.

[Source code](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhub/) | [Package (PyPi)](https://pypi.org/project/azure-eventhub/) | [API reference documentation](https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/latest/azure.eventhub.html) | [Product documentation](https://docs.microsoft.com/azure/event-hubs/) | [Samples](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhub/samples)
[Source code](https://github.com/Azure/azure-sdk-for-python/blob/master/sdk/eventhub/azure-eventhub/) | [Package (PyPi)](https://pypi.org/project/azure-eventhub/5.4.0b1) | [API reference documentation](https://azuresdkdocs.blob.core.windows.net/$web/python/azure-eventhub/latest/azure.eventhub.html) | [Product documentation](https://docs.microsoft.com/azure/event-hubs/) | [Samples](https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/eventhub/azure-eventhub/samples)
## Getting started

### Prerequisites
@@ -32,7 +32,7 @@ There, you can also find detailed instructions for using the Azure CLI, Azure Po
Install the Azure Event Hubs client library for Python with pip:

```
$ pip install azure-eventhub
$ pip install azure-eventhub --pre
```

### Authenticate the client
4 changes: 3 additions & 1 deletion sdk/eventhub/azure-eventhub/azure/eventhub/__init__.py
@@ -18,6 +18,7 @@
parse_connection_string,
EventHubConnectionStringProperties
)
from ._configuration import PartitionPublishingConfiguration

TransportType = constants.TransportType

@@ -33,5 +34,6 @@
"LoadBalancingStrategy",
"PartitionContext",
"parse_connection_string",
"EventHubConnectionStringProperties"
"EventHubConnectionStringProperties",
"PartitionPublishingConfiguration"
]
52 changes: 49 additions & 3 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_common.py
@@ -20,7 +20,7 @@

import six

from uamqp import BatchMessage, Message, constants
from uamqp import BatchMessage, Message, constants, types

from ._utils import set_message_partition_key, trace_message, utc_from_timestamp
from ._constants import (
@@ -42,6 +42,12 @@
PROP_TO,
PROP_USER_ID,
PROP_CREATION_TIME,
PRODUCER_SEQUENCE_NUMBER_SYMBOL,
PRODUCER_ID_SYMBOL,
PRODUCER_EPOCH_SYMBOL,
MAX_SHORT,
MAX_INT,
MAX_LONG
)

if TYPE_CHECKING:
@@ -100,6 +106,8 @@ def __init__(self, body=None):
self.message = Message(body)
self.message.annotations = {}
self.message.application_properties = {}
self._published_sequence_number = None
self._pending_published_sequence_number = None

def __repr__(self):
# type: () -> str
@@ -180,6 +188,18 @@ def sequence_number(self):
"""
return self.message.annotations.get(PROP_SEQ_NUMBER, None)

@property
def published_sequence_number(self):
# type: () -> Optional[int]
"""
The publishing sequence number assigned to the event at the time it was successfully published.
If the producer was not configured to apply sequence numbering or if the event has not yet been successfully
published, the value will be None.

:rtype: int
"""
return self._published_sequence_number

@property
def offset(self):
# type: () -> Optional[str]
@@ -348,16 +368,18 @@ class EventDataBatch(object):
Event Hub decided by the service.
"""

def __init__(self, max_size_in_bytes=None, partition_id=None, partition_key=None):
# type: (Optional[int], Optional[str], Optional[Union[str, bytes]]) -> None
def __init__(self, max_size_in_bytes=None, partition_id=None, partition_key=None, **kwargs):
# type: (Optional[int], Optional[str], Optional[Union[str, bytes]], Any) -> None
self.max_size_in_bytes = max_size_in_bytes or constants.MAX_MESSAGE_LENGTH_BYTES
self.message = BatchMessage(data=[], multi_messages=False, properties=None)
self._partition_id = partition_id
self._partition_key = partition_key
self._is_idempotent_batch = kwargs.pop("is_idempotent_batch", False)

set_message_partition_key(self.message, self._partition_key)
self._size = self.message.gather()[0].get_message_encoded_size()
self._count = 0
self._starting_published_sequence_number = None

def __repr__(self):
# type: () -> str
@@ -396,6 +418,19 @@ def size_in_bytes(self):
"""
return self._size

@property
def starting_published_sequence_number(self):
Member:
I still feel this needs a rename. My preference is to go with published_sequence_number and have the documentation indicate that it refers to the first event in the batch.

Member:
IIRC batches are transactional, right? There's no partial failure here?

Contributor Author:
Yes, batches are transactional: either all events in the batch are sent or they all fail.

Contributor Author:
I don't have a strong preference on the name. Personally I think the "starting_" prefix adds some clarity for a batch object, while plain published_sequence_number would give us the benefit of consistency with EventData.

@jsquire, would you be able to provide some context on why the "starting_" prefix was chosen instead of just published_sequence_number?

cc: @chradek @conniey

Member:
"Starting" was chosen to denote that it was the sequence number applied to the first event in the batch. The batch itself is not assigned a sequence number, nor is it considered a single item by the service.

To determine the next sequence number that the service expects, one has to take the batch's starting sequence number and increment it by the number of events that were in the batch. This caused confusion for our beta users, who expected the "LastPublishedSequenceNumber" returned as part of the partition publishing properties to align with the "PublishedSequenceNumber" that we assigned to the batch; it did not. It was agreed that the prefix "Starting" helped as a mnemonic.

Contributor Author:
I have created a separate issue for further discussion on the naming: #16994

Member:
Yeah, we can keep it as-is for preview and settle before GA.

Contributor Author:
The preview may stay for a while (or even be pulled out), so we should have time to discuss among languages :P

# type: () -> Optional[int]
"""
The publishing sequence number assigned to the first event in the batch at the time
the batch was successfully published. If the producer was not configured to apply
sequence numbering or if the batch has not yet been successfully published, the value
will be None.

:rtype: int
"""
return self._starting_published_sequence_number
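
For readers following the naming discussion on this property, the arithmetic described in the review thread can be sketched as below. The helper name `next_expected_sequence_number` is hypothetical and not part of this PR, and the sketch assumes no wrap-around at the upper end of the sequence range.

```python
from typing import Optional


def next_expected_sequence_number(
    starting_published_sequence_number: Optional[int], event_count: int
) -> Optional[int]:
    """Return the sequence number the service would expect after a batch.

    Hypothetical helper, not part of azure-eventhub. Wrap-around at the
    upper bound of the sequence range is ignored (assumption of this sketch).
    """
    if starting_published_sequence_number is None:
        # The batch has not been published, or sequencing is not enabled.
        return None
    # The batch itself has no sequence number; each event in it consumes one.
    return starting_published_sequence_number + event_count


# A batch of 3 events whose first event was assigned 10 occupies 10, 11 and 12.
print(next_expected_sequence_number(10, 3))  # prints 13
```

This is why a single "published sequence number" on the batch could be misread: the value describes only the first event, not the batch as a whole.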

def add(self, event_data):
# type: (EventData) -> None
"""Try to add an EventData to the batch.
@@ -420,7 +455,17 @@ def add(self, event_data):
if not event_data.partition_key:
set_message_partition_key(event_data.message, self._partition_key)

if self._is_idempotent_batch and event_data.published_sequence_number is not None:
raise ValueError("EventData object that has already been published by "
"idempotent producer cannot be published again.")

trace_message(event_data)
if self._is_idempotent_batch:
# Reserve space for producer-owned fields that correspond to the idempotent publishing, if enabled.
event_data.message.annotations[PRODUCER_EPOCH_SYMBOL] = types.AMQPShort(int(MAX_SHORT))
event_data.message.annotations[PRODUCER_ID_SYMBOL] = types.AMQPLong(int(MAX_LONG))
event_data.message.annotations[PRODUCER_SEQUENCE_NUMBER_SYMBOL] = types.AMQPInt(int(MAX_INT))

event_data_size = event_data.message.get_message_encoded_size()

# For a BatchMessage, if the encoded_message_size of event_data is < 256, then the overhead cost to encode that
@@ -442,6 +487,7 @@ def add(self, event_data):
self._size = size_after_add
self._count += 1
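
The two idempotent-specific steps added to `add` (rejecting events that were already published, and writing placeholder annotations so that size accounting covers the producer-owned fields) can be modeled in isolation. This is a simplified sketch: `ToyEvent` and `ToyIdempotentBatch` are hypothetical stand-ins, plain strings and integers replace the uamqp symbol and numeric types, and no encoded size is computed.

```python
MAX_SHORT = 2**15 - 1
MAX_INT = 2**31 - 1
MAX_LONG = 2**63 - 1


class ToyEvent:
    """Hypothetical stand-in for EventData; not the SDK class."""

    def __init__(self, body):
        self.body = body
        self.annotations = {}
        self.published_sequence_number = None


class ToyIdempotentBatch:
    """Hypothetical stand-in for an EventDataBatch created for idempotent publishing."""

    def __init__(self):
        self.events = []

    def add(self, event):
        # An event that already carries a published sequence number was sent
        # before, so it is rejected rather than published a second time.
        if event.published_sequence_number is not None:
            raise ValueError("event was already published by an idempotent producer")
        # Placeholders use the largest allowed values so that any size measured
        # now is not smaller than the size once the real values are filled in.
        event.annotations["producer-epoch"] = MAX_SHORT
        event.annotations["producer-id"] = MAX_LONG
        event.annotations["producer-sequence-number"] = MAX_INT
        self.events.append(event)


batch = ToyIdempotentBatch()
fresh = ToyEvent(b"hello")
batch.add(fresh)  # accepted; placeholders are now present on the event
```

Reserving with maximum values trades a few bytes of batch capacity for the guarantee that the batch cannot grow past its size limit when the producer later stamps the real values.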


class DictMixin(object):
def __setitem__(self, key, item):
# type: (Any, Any) -> None
45 changes: 44 additions & 1 deletion sdk/eventhub/azure-eventhub/azure/eventhub/_configuration.py
@@ -2,7 +2,7 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from typing import Optional, Dict, Any
from typing import Optional, Any, Dict

try:
from urlparse import urlparse
@@ -11,6 +11,10 @@

from uamqp.constants import TransportType, DEFAULT_AMQPS_PORT, DEFAULT_AMQP_WSS_PORT

from ._common import DictMixin
from ._utils import validate_producer_client_partition_config



class Configuration(object): # pylint:disable=too-many-instance-attributes
def __init__(self, **kwargs):
@@ -35,6 +39,7 @@ def __init__(self, **kwargs):
self.connection_verify = kwargs.get("connection_verify") # type: Optional[str]
self.connection_port = DEFAULT_AMQPS_PORT
self.custom_endpoint_hostname = None
self.enable_idempotent_partitions = kwargs.get("enable_idempotent_partitions", False) # type: bool

if self.http_proxy or self.transport_type == TransportType.AmqpOverWebsocket:
self.transport_type = TransportType.AmqpOverWebsocket
@@ -51,3 +56,41 @@ def __init__(self, **kwargs):
self.custom_endpoint_hostname = endpoint.hostname
# in case proxy and custom endpoint are both provided, we default port to 443 if it's not provided
self.connection_port = endpoint.port or DEFAULT_AMQP_WSS_PORT


class PartitionPublishingConfiguration(DictMixin):
"""
The set of configurations that can be specified for an `EventHubProducerClient`
to influence its behavior when publishing directly to an Event Hub partition.

:ivar int producer_group_id: The identifier of the producer group that this producer is associated with when
publishing to the associated partition.
:ivar int owner_level: The owner level indicates that publishing is intended to be performed exclusively for
events in the requested partition in the context of the associated producer group.
:ivar int starting_sequence_number: The starting number that should be used for the automatic sequencing of
events for the associated partition, when published by this producer.

:keyword int producer_group_id: The identifier of the producer group that this producer is associated with when
publishing to the associated partition. The producer group is only recognized and relevant when certain features
of the producer are enabled. For example, it is used by idempotent publishing.
The producer group id should be in the range from 0 to max signed long (2^63 - 1) as required by the service.
:keyword int owner_level: The owner level indicates that publishing is intended to be performed exclusively for
events in the requested partition in the context of the associated producer group. To do so, publishing will
attempt to assert ownership over the partition; in the case where more than one publisher in the producer
group attempts to assert ownership for the same partition, the one having a larger owner_level value will "win".
The owner level is only recognized and relevant when certain features of the producer are enabled. For example,
it is used by idempotent publishing.
The owner level should be in the range from 0 to max signed short (2^15 - 1) as required by the service.
:keyword int starting_sequence_number: The starting number that should be used for the automatic sequencing of
events for the associated partition, when published by this producer. The starting sequence number is only
recognized and relevant when certain features of the producer are enabled. For example, it is used by idempotent
publishing.
The starting sequence number should be in the range from 0 to max signed integer (2^31 - 1) as
required by the service.

"""
def __init__(self, **kwargs):
validate_producer_client_partition_config(kwargs)
self.owner_level = kwargs.get("owner_level") # type: Optional[int]
self.producer_group_id = kwargs.get("producer_group_id") # type: Optional[int]
self.starting_sequence_number = kwargs.get("starting_sequence_number") # type: Optional[int]
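
The body of `validate_producer_client_partition_config` is not part of this diff. As a rough illustration only, a range check over the three documented keys might look like the following; the function name `check_partition_config` and the error messages are assumptions, and the upper bounds are the `MAX_SHORT`, `MAX_INT` and `MAX_LONG` values that `_constants.py` defines in this PR.

```python
MAX_SHORT = 2**15 - 1
MAX_INT = 2**31 - 1
MAX_LONG = 2**63 - 1

# Upper bound for each documented key; the lower bound is 0 for all three.
_UPPER_BOUNDS = {
    "producer_group_id": MAX_LONG,
    "owner_level": MAX_SHORT,
    "starting_sequence_number": MAX_INT,
}


def check_partition_config(config):
    """Hypothetical validator; raises ValueError for out-of-range values."""
    for key, upper in _UPPER_BOUNDS.items():
        value = config.get(key)
        if value is None:
            # All three settings are optional.
            continue
        # bool is a subclass of int in Python, so it is excluded explicitly.
        if isinstance(value, bool) or not isinstance(value, int):
            raise ValueError("{} must be an integer".format(key))
        if not 0 <= value <= upper:
            raise ValueError("{} must be in the range [0, {}]".format(key, upper))


check_partition_config({"owner_level": 1, "starting_sequence_number": 0})  # passes
```

Validating in the constructor, as the PR does, surfaces a bad value when the configuration is built rather than later when the link to the service is opened.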
17 changes: 16 additions & 1 deletion sdk/eventhub/azure-eventhub/azure/eventhub/_constants.py
@@ -31,9 +31,18 @@
PROP_GROUP_SEQUENCE = b"group-sequence"
PROP_REPLY_TO_GROUP_ID = b"reply-to-group-id"

EPOCH_SYMBOL = b"com.microsoft:epoch"
CONSUMER_EPOCH = b"com.microsoft:epoch"
CONSUMER_EPOCH_SYMBOL = types.AMQPSymbol(CONSUMER_EPOCH)
TIMEOUT_SYMBOL = b"com.microsoft:timeout"
RECEIVER_RUNTIME_METRIC_SYMBOL = b"com.microsoft:enable-receiver-runtime-metric"
IDEMPOTENT_PRODUCER = b"com.microsoft:idempotent-producer"
PRODUCER_EPOCH = b"com.microsoft:producer-epoch"
PRODUCER_SEQUENCE_NUMBER = b"com.microsoft:producer-sequence-number"
PRODUCER_ID = b"com.microsoft:producer-id"
IDEMPOTENT_PRODUCER_SYMBOL = types.AMQPSymbol(IDEMPOTENT_PRODUCER)
PRODUCER_EPOCH_SYMBOL = types.AMQPSymbol(PRODUCER_EPOCH)
PRODUCER_SEQUENCE_NUMBER_SYMBOL = types.AMQPSymbol(PRODUCER_SEQUENCE_NUMBER)
PRODUCER_ID_SYMBOL = types.AMQPSymbol(PRODUCER_ID)

MAX_USER_AGENT_LENGTH = 512
ALL_PARTITIONS = "all-partitions"
@@ -51,4 +60,10 @@
b"com.microsoft:auth-failed",
b"com.microsoft:precondition-failed",
b"com.microsoft:argument-error",
b"com.microsoft:out-of-order-sequence",
b"com.microsoft:producer-epoch-stolen"
)

MAX_SHORT = 2**15 - 1
MAX_INT = 2**31 - 1
MAX_LONG = 2**63 - 1
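
These three limits are the largest values of signed 16-, 32- and 64-bit integers, which presumably lines up with the `AMQPShort`, `AMQPInt` and `AMQPLong` wrappers they are passed to in `_common.py`. A quick way to confirm the widths with the standard library (the helper `fits` is hypothetical):

```python
import struct

MAX_SHORT = 2**15 - 1
MAX_INT = 2**31 - 1
MAX_LONG = 2**63 - 1


def fits(fmt, value):
    """Return True if value can be packed with the given struct format."""
    try:
        struct.pack(fmt, value)
        return True
    except (struct.error, OverflowError):
        return False


# ">h", ">i" and ">q" are big-endian signed 16-, 32- and 64-bit integers.
for fmt, limit in ((">h", MAX_SHORT), (">i", MAX_INT), (">q", MAX_LONG)):
    print(fmt, fits(fmt, limit), fits(fmt, limit + 1))
```

Each limit packs successfully while the next integer up does not, so the constants sit exactly at the signed upper bounds.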
4 changes: 2 additions & 2 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_consumer.py
@@ -19,7 +19,7 @@
from ._client_base import ConsumerProducerMixin
from ._utils import create_properties, trace_link_message, event_position_selector
from ._constants import (
EPOCH_SYMBOL,
CONSUMER_EPOCH_SYMBOL,
TIMEOUT_SYMBOL,
RECEIVER_RUNTIME_METRIC_SYMBOL,
)
@@ -109,7 +109,7 @@ def __init__(self, client, source, **kwargs):
self._partition = partition
self._name = "EHConsumer-{}-partition{}".format(uuid.uuid4(), partition)
if owner_level is not None:
self._link_properties[types.AMQPSymbol(EPOCH_SYMBOL)] = types.AMQPLong(
self._link_properties[CONSUMER_EPOCH_SYMBOL] = types.AMQPLong(
int(owner_level)
)
link_property_timeout_ms = (