Skip to content

Commit

Permalink
[ServiceBus/EventHub] lock pending deliveries on send (Azure#38067)
Browse files Browse the repository at this point in the history
* [ServiceBus/EventHub] lock pending deliveries on send

* remove misc logging

* changelog + test

* fix tests, remove session lock

* remove logging from test

* sync with sb

* add todo in sender.py tfor temporary fix
  • Loading branch information
swathipil authored Jan 22, 2025
1 parent d17f44b commit 70470de
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 38 deletions.
33 changes: 19 additions & 14 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import uuid
import logging
import time
from threading import Lock

from ._encode import encode_payload
from .link import Link
Expand Down Expand Up @@ -45,6 +46,7 @@ def __init__(self, session, handle, target_address, **kwargs):
kwargs["source_address"] = "sender-link-{}".format(name)
super(SenderLink, self).__init__(session, handle, name, role, target_address=target_address, **kwargs)
self._pending_deliveries = []
self.lock = Lock()

@classmethod
def from_incoming_frame(cls, session, handle, frame):
Expand Down Expand Up @@ -139,21 +141,24 @@ def _on_session_state_change(self):
super()._on_session_state_change()

def update_pending_deliveries(self):
if self.current_link_credit <= 0:
self.current_link_credit = self.link_credit
self._outgoing_flow()
now = time.time()
pending = []
for delivery in self._pending_deliveries:
if delivery.timeout and (now - delivery.start) >= delivery.timeout:
delivery.on_settled(LinkDeliverySettleReason.TIMEOUT, None)
continue
if not delivery.sent:
sent_and_settled = self._outgoing_transfer(delivery)
if sent_and_settled:
# TODO: Temporary fix until connection.listen removed from keep alive thread.
with self.lock:
if self.current_link_credit <= 0:
self.current_link_credit = self.link_credit
self._outgoing_flow()
now = time.time()
pending = []

for delivery in self._pending_deliveries:
if delivery.timeout and (now - delivery.start) >= delivery.timeout:
delivery.on_settled(LinkDeliverySettleReason.TIMEOUT, None)
continue
pending.append(delivery)
self._pending_deliveries = pending
if not delivery.sent:
sent_and_settled = self._outgoing_transfer(delivery)
if sent_and_settled:
continue
pending.append(delivery)
self._pending_deliveries = pending

def send_transfer(self, message, *, send_async=False, **kwargs):
self._check_if_closed()
Expand Down
2 changes: 2 additions & 0 deletions sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

### Bugs Fixed

- Fixed a bug where sending large messages with synchronous client caused a frame buffer offset error ([#37916](https://github.com/Azure/azure-sdk-for-python/issues/37916))

### Other Changes

## 7.13.0 (2024-11-12)
Expand Down
33 changes: 19 additions & 14 deletions sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import uuid
import logging
import time
from threading import Lock

from ._encode import encode_payload
from .link import Link
Expand Down Expand Up @@ -45,6 +46,7 @@ def __init__(self, session, handle, target_address, **kwargs):
kwargs["source_address"] = "sender-link-{}".format(name)
super(SenderLink, self).__init__(session, handle, name, role, target_address=target_address, **kwargs)
self._pending_deliveries = []
self.lock = Lock()

@classmethod
def from_incoming_frame(cls, session, handle, frame):
Expand Down Expand Up @@ -139,21 +141,24 @@ def _on_session_state_change(self):
super()._on_session_state_change()

def update_pending_deliveries(self):
if self.current_link_credit <= 0:
self.current_link_credit = self.link_credit
self._outgoing_flow()
now = time.time()
pending = []
for delivery in self._pending_deliveries:
if delivery.timeout and (now - delivery.start) >= delivery.timeout:
delivery.on_settled(LinkDeliverySettleReason.TIMEOUT, None)
continue
if not delivery.sent:
sent_and_settled = self._outgoing_transfer(delivery)
if sent_and_settled:
# TODO: Temporary fix until connection.listen removed from keep alive thread.
with self.lock:
if self.current_link_credit <= 0:
self.current_link_credit = self.link_credit
self._outgoing_flow()
now = time.time()
pending = []

for delivery in self._pending_deliveries:
if delivery.timeout and (now - delivery.start) >= delivery.timeout:
delivery.on_settled(LinkDeliverySettleReason.TIMEOUT, None)
continue
pending.append(delivery)
self._pending_deliveries = pending
if not delivery.sent:
sent_and_settled = self._outgoing_transfer(delivery)
if sent_and_settled:
continue
pending.append(delivery)
self._pending_deliveries = pending

def send_transfer(self, message, *, send_async=False, **kwargs):
self._check_if_closed()
Expand Down
69 changes: 59 additions & 10 deletions sdk/servicebus/azure-servicebus/tests/test_topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@

import logging
import pytest
import time
import json
import sys

from devtools_testutils import AzureMgmtRecordedTestCase, RandomNameResourceGroupPreparer, get_credential

Expand Down Expand Up @@ -36,7 +39,7 @@ class TestServiceBusTopics(AzureMgmtRecordedTestCase):
@CachedServiceBusTopicPreparer(name_prefix="servicebustest")
@pytest.mark.parametrize("uamqp_transport", uamqp_transport_params, ids=uamqp_transport_ids)
@ArgPasser()
def test_topic_by_servicebus_client_conn_str_send_basic(
def test_topic_by_servicebus_client_send_basic(
self, uamqp_transport, *, servicebus_namespace=None, servicebus_topic=None, **kwargs
):
fully_qualified_namespace = f"{servicebus_namespace.name}{SERVICEBUS_ENDPOINT_SUFFIX}"
Expand All @@ -58,15 +61,15 @@ def test_topic_by_servicebus_client_conn_str_send_basic(
@CachedServiceBusTopicPreparer(name_prefix="servicebustest")
@pytest.mark.parametrize("uamqp_transport", uamqp_transport_params, ids=uamqp_transport_ids)
@ArgPasser()
def test_topic_by_sas_token_credential_conn_str_send_basic(
self,
uamqp_transport,
*,
servicebus_namespace=None,
servicebus_namespace_key_name=None,
servicebus_namespace_primary_key=None,
servicebus_topic=None,
**kwargs,
def test_topic_by_sas_token_credential_send_basic(
self,
uamqp_transport,
*,
servicebus_namespace=None,
servicebus_namespace_key_name=None,
servicebus_namespace_primary_key=None,
servicebus_topic=None,
**kwargs
):
fully_qualified_namespace = f"{servicebus_namespace.name}{SERVICEBUS_ENDPOINT_SUFFIX}"
with ServiceBusClient(
Expand Down Expand Up @@ -111,3 +114,49 @@ def test_topic_by_servicebus_client_list_topics(
topics = client.list_topics()
assert len(topics) >= 1
# assert all(isinstance(t, TopicClient) for t in topics)

@pytest.mark.liveTest
@pytest.mark.live_test_only
@CachedServiceBusResourceGroupPreparer(name_prefix='servicebustest')
@CachedServiceBusNamespacePreparer(name_prefix='servicebustest')
@CachedServiceBusTopicPreparer(name_prefix='servicebustest')
@pytest.mark.parametrize("uamqp_transport", uamqp_transport_params, ids=uamqp_transport_ids)
@ArgPasser()
def test_topic_by_servicebus_client_send_large_messages_w_sleep(self, uamqp_transport, *, servicebus_namespace=None, servicebus_topic=None, **kwargs):
fully_qualified_namespace = f"{servicebus_namespace.name}{SERVICEBUS_ENDPOINT_SUFFIX}"
credential = get_credential()

# message of 100 kb - requires multiple transfer frames
size = 100
large_dict = {
"key": "A" * 1024
}
for i in range(size):
large_dict[f"key_{i}"] = "A" * 1024
body = json.dumps(large_dict)

sb_client = ServiceBusClient(
fully_qualified_namespace=fully_qualified_namespace,
credential=credential,
logging_enable=True,
uamqp_transport=uamqp_transport
)

# This issue doesn't repro unless logging is added here w/ this socket timeout,
# seemingly due to slowing down and some threading behavior.
# Adding in the logging here to make sure this bug is being hit and tested.
sender = sb_client.get_topic_sender(servicebus_topic.name, socket_timeout=60)
for i in range(10):
try:
time.sleep(10)
logging.info("sender created for %d", i)
size_in_bytes = sys.getsizeof(body)

# Convert bytes to kilobytes (KB)
size_in_kb = size_in_bytes / 1024
logging.info(f"size of body: {size_in_kb:.2f} KB")
sender.send_messages(ServiceBusMessage(body))
logging.info(f"Message sent %d successfully", i)
except Exception as e:
logging.error(f"Error sending message %d: %s", i, str(e))
raise

0 comments on commit 70470de

Please sign in to comment.