-
Notifications
You must be signed in to change notification settings - Fork 642
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
This adds support for instrumenting grpc.aio channels with spans and telemetry. The instrumentation needed to work differently that the standard grpc channel support but is functionally the same.
- Loading branch information
1 parent
03e021b
commit ff4403c
Showing
7 changed files
with
798 additions
and
31 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
219 changes: 219 additions & 0 deletions
219
.../opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_client.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,219 @@ | ||
# Copyright The OpenTelemetry Authors | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
from collections import OrderedDict | ||
import functools | ||
|
||
import grpc | ||
from grpc.aio import ClientCallDetails | ||
|
||
from opentelemetry import context, trace | ||
from opentelemetry.semconv.trace import SpanAttributes | ||
from opentelemetry.propagate import inject | ||
from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY | ||
from opentelemetry.instrumentation.grpc.version import __version__ | ||
|
||
from opentelemetry.trace.status import Status, StatusCode | ||
|
||
from opentelemetry.instrumentation.grpc._client import ( | ||
OpenTelemetryClientInterceptor, | ||
_carrier_setter, | ||
) | ||
|
||
|
||
def _unary_done_callback(span, code, details): | ||
def callback(call): | ||
try: | ||
span.set_attribute( | ||
SpanAttributes.RPC_GRPC_STATUS_CODE, | ||
code.value[0], | ||
) | ||
if code != grpc.StatusCode.OK: | ||
span.set_status( | ||
Status( | ||
status_code=StatusCode.ERROR, | ||
description=details, | ||
) | ||
) | ||
finally: | ||
span.end() | ||
|
||
return callback | ||
|
||
|
||
class _BaseAioClientInterceptor(OpenTelemetryClientInterceptor): | ||
@staticmethod | ||
def propagate_trace_in_details(client_call_details): | ||
method = client_call_details.method.decode("utf-8") | ||
metadata = client_call_details.metadata | ||
if not metadata: | ||
mutable_metadata = OrderedDict() | ||
else: | ||
mutable_metadata = OrderedDict(metadata) | ||
|
||
inject(mutable_metadata, setter=_carrier_setter) | ||
metadata = tuple(mutable_metadata.items()) | ||
|
||
return ClientCallDetails( | ||
client_call_details.method, | ||
client_call_details.timeout, | ||
metadata, | ||
client_call_details.credentials, | ||
client_call_details.wait_for_ready, | ||
) | ||
|
||
@staticmethod | ||
def add_error_details_to_span(span, exc): | ||
if isinstance(exc, grpc.RpcError): | ||
span.set_attribute( | ||
SpanAttributes.RPC_GRPC_STATUS_CODE, | ||
exc.code().value[0], | ||
) | ||
span.set_status( | ||
Status( | ||
status_code=StatusCode.ERROR, | ||
description=f"{type(exc).__name__}: {exc}", | ||
) | ||
) | ||
span.record_exception(exc) | ||
|
||
async def _wrap_unary_response(self, continuation, span): | ||
try: | ||
call = await continuation() | ||
|
||
# code and details are both coroutines that need to be await-ed, | ||
# the callbacks added with add_done_callback do not allow async | ||
# code so we need to get the code and details here then pass them | ||
# to the callback. | ||
code = await call.code() | ||
details = await call.details() | ||
|
||
call.add_done_callback(_unary_done_callback(span, code, details)) | ||
|
||
return call | ||
except grpc.aio.AioRpcError as exc: | ||
self.add_error_details_to_span(span, exc) | ||
raise exc | ||
|
||
async def _wrap_stream_response(self, span, call): | ||
try: | ||
async for response in call: | ||
yield response | ||
except Exception as exc: | ||
self.add_error_details_to_span(span, exc) | ||
raise exc | ||
finally: | ||
span.end() | ||
|
||
|
||
class UnaryUnaryAioClientInterceptor( | ||
grpc.aio.UnaryUnaryClientInterceptor, | ||
_BaseAioClientInterceptor, | ||
): | ||
async def intercept_unary_unary( | ||
self, continuation, client_call_details, request | ||
): | ||
if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY): | ||
return await continuation(client_call_details, request) | ||
|
||
method = client_call_details.method.decode("utf-8") | ||
with self._start_span( | ||
method, | ||
end_on_exit=False, | ||
record_exception=False, | ||
set_status_on_exception=False, | ||
) as span: | ||
new_details = self.propagate_trace_in_details(client_call_details) | ||
|
||
continuation_with_args = functools.partial( | ||
continuation, new_details, request | ||
) | ||
return await self._wrap_unary_response( | ||
continuation_with_args, span | ||
) | ||
|
||
|
||
class UnaryStreamAioClientInterceptor( | ||
grpc.aio.UnaryStreamClientInterceptor, | ||
_BaseAioClientInterceptor, | ||
): | ||
async def intercept_unary_stream( | ||
self, continuation, client_call_details, request | ||
): | ||
if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY): | ||
return await continuation(client_call_details, request) | ||
|
||
method = client_call_details.method.decode("utf-8") | ||
with self._start_span( | ||
method, | ||
end_on_exit=False, | ||
record_exception=False, | ||
set_status_on_exception=False, | ||
) as span: | ||
new_details = self.propagate_trace_in_details(client_call_details) | ||
|
||
resp = await continuation(new_details, request) | ||
|
||
return self._wrap_stream_response(span, resp) | ||
|
||
|
||
class StreamUnaryAioClientInterceptor( | ||
grpc.aio.StreamUnaryClientInterceptor, | ||
_BaseAioClientInterceptor, | ||
): | ||
async def intercept_stream_unary( | ||
self, continuation, client_call_details, request_iterator | ||
): | ||
if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY): | ||
return await continuation(client_call_details, request_iterator) | ||
|
||
method = client_call_details.method.decode("utf-8") | ||
with self._start_span( | ||
method, | ||
end_on_exit=False, | ||
record_exception=False, | ||
set_status_on_exception=False, | ||
) as span: | ||
new_details = self.propagate_trace_in_details(client_call_details) | ||
|
||
continuation_with_args = functools.partial( | ||
continuation, new_details, request_iterator | ||
) | ||
return await self._wrap_unary_response( | ||
continuation_with_args, span | ||
) | ||
|
||
|
||
class StreamStreamAioClientInterceptor( | ||
grpc.aio.StreamStreamClientInterceptor, | ||
_BaseAioClientInterceptor, | ||
): | ||
async def intercept_stream_stream( | ||
self, continuation, client_call_details, request_iterator | ||
): | ||
if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY): | ||
return await continuation(client_call_details, request_iterator) | ||
|
||
method = client_call_details.method.decode("utf-8") | ||
with self._start_span( | ||
method, | ||
end_on_exit=False, | ||
record_exception=False, | ||
set_status_on_exception=False, | ||
) as span: | ||
new_details = self.propagate_trace_in_details(client_call_details) | ||
|
||
resp = await continuation(new_details, request_iterator) | ||
|
||
return self._wrap_stream_response(span, resp) |
Oops, something went wrong.