Skip to content

Commit

Permalink
Eventhub tracing (Azure#7153)
Browse files Browse the repository at this point in the history
* Experimentation on tracing and EventHubs

* Continue to let the initial exception raise

* Naive direct tracing implementation

* Update Kind in EventHub to generic one

* Use contextmanager in EventHub

* Remove opencensus specific import

* Fix possible AttributeError

* Remove parent concept

* Remove receive tracing code

* Don't execute tracing on message if no tracing loaded

* Try to re-order dev dep for CI

* Fix EH plugin dev deps

* Add azure-core to azure-eventhub

* Share req

* ChangeLog

* EH extension is ok with b4

* Install blob SDK for EH extension

* pylint

* fix dev req

* dep fix

* The override had `<`, but the setup actually defines `<=`. The error message still needs to be updated, but this change fixes the analyze error

* Tracing message from receive iterators as well

* Producer simplification

* Simplify eventprocessor

* Consider batch size
  • Loading branch information
lmazuel authored and yijxie committed Oct 9, 2019
1 parent 20d4059 commit 039d017
Show file tree
Hide file tree
Showing 14 changed files with 131 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
-e ../../../tools/azure-sdk-tools
-e ../../core/azure-core
-e ../../storage/azure-storage-blob
../azure-eventhubs
pytest-asyncio>=0.8.0; python_version >= '3.5'
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
packages=find_packages(exclude=exclude_packages),
python_requires=">=3.5.3",
install_requires=[
'azure-storage-blob<12.0.0b4,>=12.0.0b2',
'azure-storage-blob<=12.0.0b4,>=12.0.0b2',
'azure-eventhub<6.0.0,>=5.0.0b3',
'aiohttp<4.0,>=3.0',
],
Expand Down
6 changes: 6 additions & 0 deletions sdk/eventhub/azure-eventhubs/HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Release History

## 5.0.0b4 (2019-XX-XX)

**New features**

- Support for tracing #7153

## 5.0.0b3 (2019-09-10)

**New features**
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/azure-eventhubs/azure/eventhub/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# --------------------------------------------------------------------------------------------

__path__ = __import__('pkgutil').extend_path(__path__, __name__) # type: ignore
__version__ = "5.0.0b3"
__version__ = "5.0.0b4"
from uamqp import constants # type: ignore
from azure.eventhub.common import EventData, EventDataBatch, EventPosition
from azure.eventhub.error import EventHubError, EventDataError, ConnectError, \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ async def __anext__(self):
self._messages_iter = self._handler.receive_messages_iter_async()
message = await self._messages_iter.__anext__()
event_data = EventData._from_message(message) # pylint:disable=protected-access
event_data._trace_link_message() # pylint:disable=protected-access
self._offset = EventPosition(event_data.offset, inclusive=False)
retried_times = 0
return event_data
Expand Down Expand Up @@ -179,6 +180,8 @@ async def _receive(self, timeout_time=None, max_batch_size=None, **kwargs):
event_data = EventData._from_message(message) # pylint:disable=protected-access
self._offset = EventPosition(event_data.offset)
data_batch.append(event_data)
event_data._trace_link_message() # pylint:disable=protected-access

return data_batch

async def _receive_with_retry(self, timeout=None, max_batch_size=None, **kwargs):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# -----------------------------------------------------------------------------------

from contextlib import contextmanager
from typing import Dict, Type
import uuid
import asyncio
import logging

from azure.core.tracing import SpanKind
from azure.core.settings import settings

from azure.eventhub import EventPosition, EventHubError
from azure.eventhub.aio import EventHubClient
from .partition_context import PartitionContext
Expand Down Expand Up @@ -186,7 +190,23 @@ def _create_tasks_for_claimed_ownership(self, to_claim_ownership_list):
if partition_id not in self._tasks or self._tasks[partition_id].done():
self._tasks[partition_id] = get_running_loop().create_task(self._receive(ownership))

async def _receive(self, ownership):
@contextmanager
def _context(self, events):
    """Wrap the processing of one batch of events in a tracing span.

    When no tracing implementation is configured, this is a plain
    pass-through context. Otherwise an "Azure.EventHubs.process" span of
    kind SERVER is created, given the client's request attributes, linked
    to every event in ``events``, and entered for the duration of the
    ``with`` body.

    :param events: The received events that are about to be processed.
    """
    span_impl_type = settings.tracing_implementation()  # type: Type[AbstractSpan]
    if span_impl_type is None:
        # Tracing is disabled: there is no span to open or link.
        yield
        return

    process_span = span_impl_type(name="Azure.EventHubs.process")
    self._eventhub_client._add_span_request_attributes(process_span)  # pylint: disable=protected-access
    process_span.kind = SpanKind.SERVER

    # Links are attached before the span is entered.
    for received in events:
        received._trace_link_message(process_span)  # pylint: disable=protected-access

    with process_span:
        yield

async def _receive(self, ownership): # pylint: disable=too-many-statements
log.info("start ownership, %r", ownership)
partition_processor = self._partition_processor_factory()
partition_id = ownership["partition_id"]
Expand Down Expand Up @@ -248,7 +268,9 @@ async def close(reason):
while True:
try:
events = await partition_consumer.receive()
await partition_processor.process_events(events, partition_context)
with self._context(events):
await partition_processor.process_events(events, partition_context)

except asyncio.CancelledError:
log.info(
"PartitionProcessor of EventProcessor instance %r of eventhub %r partition %r consumer group %r"
Expand Down
24 changes: 21 additions & 3 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/aio/producer_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,18 @@
import uuid
import asyncio
import logging
from typing import Iterable, Union, Any
from typing import Iterable, Union
import time

from uamqp import types, constants, errors # type: ignore
from uamqp import SendClientAsync # type: ignore

from azure.core.tracing import SpanKind
from azure.core.settings import settings

from azure.eventhub.common import EventData, EventDataBatch
from azure.eventhub.error import _error_handler, OperationTimeoutError, EventDataError
from ..producer import _error, _set_partition_key
from ..producer import _error, _set_partition_key, _set_trace_message
from ._consumer_producer_mixin_async import ConsumerProducerMixin

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -213,12 +216,19 @@ async def send(
:caption: Sends an event data and blocks until acknowledgement is received or operation times out.
"""
# Tracing code
span_impl_type = settings.tracing_implementation() # type: Type[AbstractSpan]
child = None
if span_impl_type is not None:
child = span_impl_type(name="Azure.EventHubs.send")
child.kind = SpanKind.CLIENT # Should be PRODUCER

self._check_closed()
if isinstance(event_data, EventData):
if partition_key:
event_data._set_partition_key(partition_key) # pylint: disable=protected-access
wrapper_event_data = event_data
wrapper_event_data._trace_message(child) # pylint: disable=protected-access
else:
if isinstance(event_data, EventDataBatch):
if partition_key and partition_key != event_data._partition_key: # pylint: disable=protected-access
Expand All @@ -227,10 +237,18 @@ async def send(
else:
if partition_key:
event_data = _set_partition_key(event_data, partition_key)
event_data = _set_trace_message(event_data, child)
wrapper_event_data = EventDataBatch._from_batch(event_data, partition_key) # pylint: disable=protected-access

wrapper_event_data.message.on_send_complete = self._on_outcome
self._unsent_events = [wrapper_event_data.message]
await self._send_event_data_with_retry(timeout=timeout) # pylint:disable=unexpected-keyword-arg # TODO: to refactor

if span_impl_type is not None:
with child:
self._client._add_span_request_attributes(child) # pylint: disable=protected-access
await self._send_event_data_with_retry(timeout=timeout) # pylint:disable=unexpected-keyword-arg # TODO: to refactor
else:
await self._send_event_data_with_retry(timeout=timeout) # pylint:disable=unexpected-keyword-arg # TODO: to refactor

async def close(self, exception=None):
# type: (Exception) -> None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,11 @@ def _create_properties(self, user_agent=None): # pylint: disable=no-self-use
properties["user-agent"] = final_user_agent
return properties

def _add_span_request_attributes(self, span):
span.add_attribute("component", "eventhubs")
span.add_attribute("message_bus.destination", self._address.path)
span.add_attribute("peer.address", self._address.hostname)

def _process_redirect_uri(self, redirect):
redirect_uri = redirect.address.decode('utf-8')
auth_uri, _, _ = redirect_uri.partition("/ConsumerGroups")
Expand Down
34 changes: 34 additions & 0 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@

from uamqp import BatchMessage, Message, types, constants # type: ignore
from uamqp.message import MessageHeader, MessageProperties # type: ignore

from azure.core.settings import settings

from azure.eventhub.error import EventDataError

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -114,6 +117,35 @@ def _set_partition_key(self, value):
self.message.header = header
self._annotations = annotations

def _trace_message(self, parent_span=None):
    """Add tracing information to this message.

    Opens and closes an "Azure.EventHubs.message" span and stores its trace
    parent, ASCII-encoded, under the ``Diagnostic-Id`` application property
    of the message. An existing ``Diagnostic-Id`` value is left untouched.
    Does nothing when no tracing implementation is configured.

    :param parent_span: Span under which the message span is created. When
     omitted (or falsy), the tracing implementation's current span is used.
    """
    span_impl_type = settings.tracing_implementation()  # type: Type[AbstractSpan]
    if span_impl_type is None:
        return

    current_span = parent_span or span_impl_type(span_impl_type.get_current_span())
    message_span = current_span.span(name="Azure.EventHubs.message")
    message_span.start()
    try:
        # Tolerate application_properties being None or empty: dict(None)
        # would raise TypeError.
        app_prop = dict(self.application_properties or {})
        app_prop.setdefault(b"Diagnostic-Id", message_span.get_trace_parent().encode('ascii'))
        self.application_properties = app_prop
    finally:
        # Always close the span, even if tagging the message failed.
        message_span.finish()

def _trace_link_message(self, parent_span=None):
    """Link this message to a span using its ``Diagnostic-Id`` property.

    Reads the ``Diagnostic-Id`` application property, if present, decodes
    it as ASCII and passes it to the span's ``link`` method. Does nothing
    when no tracing implementation is configured, when there is no span to
    link to, or when the message has no (or an empty) ``Diagnostic-Id``.

    :param parent_span: Span to link the message to. When omitted (or
     falsy), the tracing implementation's current span is used.
    """
    span_impl_type = settings.tracing_implementation()  # type: Type[AbstractSpan]
    if span_impl_type is None:
        return

    current_span = parent_span or span_impl_type(span_impl_type.get_current_span())
    if not current_span or not self.application_properties:
        return

    # The default must be bytes: a str default has no .decode() on Python 3,
    # so events without a Diagnostic-Id would raise AttributeError.
    traceparent = self.application_properties.get(b"Diagnostic-Id", b"").decode('ascii')
    if traceparent:
        current_span.link(traceparent)

@staticmethod
def _from_message(message):
event_data = EventData(body='')
Expand Down Expand Up @@ -328,6 +360,8 @@ def try_add(self, event_data):
if not event_data.partition_key:
event_data._set_partition_key(self._partition_key) # pylint:disable=protected-access

event_data._trace_message() # pylint:disable=protected-access

event_data_size = event_data.message.get_message_encoded_size()

# For a BatchMessage, if the encoded_message_size of event_data is < 256, then the overhead cost to encode that
Expand Down
3 changes: 3 additions & 0 deletions sdk/eventhub/azure-eventhubs/azure/eventhub/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ def __next__(self):
self._messages_iter = self._handler.receive_messages_iter()
message = next(self._messages_iter)
event_data = EventData._from_message(message) # pylint:disable=protected-access
event_data._trace_link_message() # pylint:disable=protected-access
self._offset = EventPosition(event_data.offset, inclusive=False)
retried_times = 0
return event_data
Expand Down Expand Up @@ -173,6 +174,8 @@ def _receive(self, timeout_time=None, max_batch_size=None, **kwargs):
event_data = EventData._from_message(message) # pylint:disable=protected-access
self._offset = EventPosition(event_data.offset)
data_batch.append(event_data)
event_data._trace_link_message() # pylint:disable=protected-access

return data_batch

def _receive_with_retry(self, timeout=None, max_batch_size=None, **kwargs):
Expand Down
26 changes: 25 additions & 1 deletion sdk/eventhub/azure-eventhubs/azure/eventhub/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
from uamqp import types, constants, errors # type: ignore
from uamqp import SendClient # type: ignore

from azure.core.tracing import SpanKind
from azure.core.settings import settings

from azure.eventhub.common import EventData, EventDataBatch
from azure.eventhub.error import _error_handler, OperationTimeoutError, EventDataError
from ._consumer_producer_mixin import ConsumerProducerMixin
Expand All @@ -32,6 +35,13 @@ def _set_partition_key(event_datas, partition_key):
yield ed


def _set_trace_message(event_datas, parent_span=None):
ed_iter = iter(event_datas)
for ed in ed_iter:
ed._trace_message(parent_span) # pylint:disable=protected-access
yield ed


class EventHubProducer(ConsumerProducerMixin): # pylint:disable=too-many-instance-attributes
"""
A producer responsible for transmitting EventData to a specific Event Hub,
Expand Down Expand Up @@ -218,12 +228,19 @@ def send(self, event_data, partition_key=None, timeout=None):
:caption: Sends an event data and blocks until acknowledgement is received or operation times out.
"""
# Tracing code
span_impl_type = settings.tracing_implementation() # type: Type[AbstractSpan]
child = None
if span_impl_type is not None:
child = span_impl_type(name="Azure.EventHubs.send")
child.kind = SpanKind.CLIENT # Should be PRODUCER

self._check_closed()
if isinstance(event_data, EventData):
if partition_key:
event_data._set_partition_key(partition_key) # pylint: disable=protected-access
wrapper_event_data = event_data
wrapper_event_data._trace_message(child) # pylint: disable=protected-access
else:
if isinstance(event_data, EventDataBatch): # The partition_key in the param will be omitted.
if partition_key and partition_key != event_data._partition_key: # pylint: disable=protected-access
Expand All @@ -232,10 +249,17 @@ def send(self, event_data, partition_key=None, timeout=None):
else:
if partition_key:
event_data = _set_partition_key(event_data, partition_key)
event_data = _set_trace_message(event_data, child)
wrapper_event_data = EventDataBatch._from_batch(event_data, partition_key) # pylint: disable=protected-access
wrapper_event_data.message.on_send_complete = self._on_outcome
self._unsent_events = [wrapper_event_data.message]
self._send_event_data_with_retry(timeout=timeout)

if span_impl_type is not None:
with child:
self._client._add_span_request_attributes(child) # pylint: disable=protected-access
self._send_event_data_with_retry(timeout=timeout)
else:
self._send_event_data_with_retry(timeout=timeout)

def close(self, exception=None): # pylint:disable=useless-super-delegation
# type:(Exception) -> None
Expand Down
4 changes: 2 additions & 2 deletions sdk/eventhub/azure-eventhubs/dev_requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
-e ../../servicebus/azure-servicebus
-e ../../core/azure-core
-e ../../../tools/azure-sdk-tools
-e ../../core/azure-core
-e ../../identity/azure-identity
-e ../../servicebus/azure-servicebus
pytest-asyncio>=0.8.0; python_version > '3.4'
docutils>=0.14
pygments>=2.2.0
Expand Down
1 change: 1 addition & 0 deletions sdk/eventhub/azure-eventhubs/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
zip_safe=False,
packages=find_packages(exclude=exclude_packages),
install_requires=[
"azure-core<2.0.0,>=1.0.0b4",
'uamqp~=1.2.0',
'azure-common~=1.1',
],
Expand Down
5 changes: 3 additions & 2 deletions shared_requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ six>=1.6
#override azure-storage-blob azure-core<2.0.0,>=1.0.0b4
#override azure-storage-queue azure-core<2.0.0,>=1.0.0b4
#override azure-storage-file azure-core<2.0.0,>=1.0.0b4
#override azure-eventhub azure-core<2.0.0,>=1.0.0b4
#override azure-cosmos azure-core<2.0.0,>=1.0.0b3
#override azure-eventhub-checkpointstoreblob-aio azure-storage-blob<12.0.0b4,>=12.0.0b2
#override azure-eventhub-checkpointstoreblob-aio aiohttp<4.0,>=3.0
#override azure-eventhub-checkpointstoreblob-aio azure-storage-blob<=12.0.0b4,>=12.0.0b2
#override azure-eventhub-checkpointstoreblob-aio aiohttp<4.0,>=3.0

0 comments on commit 039d017

Please sign in to comment.