diff --git a/CHANGELOG.md b/CHANGELOG.md index d5aaca89ee..9904b50a73 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `opentelemetry-sdk-extension-aws` & `opentelemetry-propagator-aws` Release AWS Python SDK Extension as 2.0.1 and AWS Propagator as 1.0.1 ([#753](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/753)) +- `opentelemetry-instrumentation-pika` Add `_decorate_basic_consume` to ensure post instrumentation `basic_consume` calls are also instrumented. + ([#759](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/759)) ### Fixed diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py index a48e46034e..05496f53dd 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py @@ -70,6 +70,7 @@ def _uninstrument_channel_functions(channel: Channel) -> None: function = getattr(channel, function_name) if hasattr(function, "_original_function"): channel.__setattr__(function_name, function._original_function) + unwrap(channel, "basic_consume") @staticmethod def instrument_channel( @@ -90,6 +91,7 @@ def instrument_channel( PikaInstrumentor._instrument_consumers( channel._impl._consumers, tracer ) + PikaInstrumentor._decorate_basic_consume(channel, tracer) PikaInstrumentor._instrument_channel_functions(channel, tracer) @staticmethod @@ -120,6 +122,33 @@ def wrapper(wrapped, instance, args, kwargs): wrapt.wrap_function_wrapper(BlockingConnection, "channel", wrapper) + @staticmethod + def _decorate_basic_consume(channel, tracer: Optional[Tracer]) -> None: + def wrapper(wrapped, instance, args, kwargs): + if not hasattr(channel, "_impl"): + _LOG.error( + "Could not find implementation for provided channel!" + ) + return wrapped(*args, **kwargs) + current_keys = set(channel._impl._consumers.keys()) + return_value = wrapped(*args, **kwargs) + new_key_list = list( + set(channel._impl._consumers.keys()) - current_keys + ) + if not new_key_list: + _LOG.error("Could not find added callback") + return return_value + new_key = new_key_list[0] + callback = channel._impl._consumers[new_key] + decorated_callback = utils._decorate_callback( + callback, tracer, new_key + ) + setattr(decorated_callback, "_original_callback", callback) + channel._impl._consumers[new_key] = decorated_callback + return return_value + + wrapt.wrap_function_wrapper(channel, "basic_consume", wrapper) + def _instrument(self, **kwargs: Dict[str, Any]) -> None: tracer_provider: TracerProvider = kwargs.get("tracer_provider", None) self.__setattr__("__opentelemetry_tracer_provider", tracer_provider) diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py index d1d85b299b..12161d2334 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py @@ -41,6 +41,8 @@ def decorated_callback( ) -> Any: if not properties: properties = BasicProperties(headers={}) + if properties.headers is None: + properties.headers = {} ctx = propagate.extract(properties.headers, getter=_pika_getter) if not ctx: ctx = context.get_current() @@ -74,6 +76,8 @@ def decorated_function( ) -> Any: if not properties: properties = BasicProperties(headers={}) + if properties.headers is None: + properties.headers = {} ctx = context.get_current() span = _get_span( tracer, diff --git a/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py b/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py index 508d49c3bd..da2a940b5b 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py @@ -45,12 +45,16 @@ def test_instrument_api(self) -> None: @mock.patch( "opentelemetry.instrumentation.pika.PikaInstrumentor._instrument_channel_functions" ) + @mock.patch( + "opentelemetry.instrumentation.pika.PikaInstrumentor._decorate_basic_consume" + ) @mock.patch( "opentelemetry.instrumentation.pika.PikaInstrumentor._instrument_consumers" ) def test_instrument( self, instrument_consumers: mock.MagicMock, + instrument_basic_consume: mock.MagicMock, instrument_channel_functions: mock.MagicMock, ): PikaInstrumentor.instrument_channel(channel=self.channel) @@ -58,6 +62,7 @@ def test_instrument( self.channel, "_is_instrumented_by_opentelemetry" ), "channel is not marked as instrumented!" instrument_consumers.assert_called_once() + instrument_basic_consume.assert_called_once() instrument_channel_functions.assert_called_once() @mock.patch("opentelemetry.instrumentation.pika.utils._decorate_callback") diff --git a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/utils.py b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/utils.py index 5f63b4af5e..45741f0821 100644 --- a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/utils.py +++ b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/utils.py @@ -17,6 +17,7 @@ from wrapt import ObjectProxy # pylint: disable=unused-import +# pylint: disable=E0611 from opentelemetry.context import _SUPPRESS_INSTRUMENTATION_KEY # noqa: F401 from opentelemetry.trace import StatusCode