I would like to add support for the gRPC AsyncIO API server.
I think it should be relatively straightforward to implement. We just need to subclass the grpc.aio.ServerInterceptor instead of grpc.ServerInterceptor and add some async/await logic.
Something like this seems to work well.
class AsyncOpenTelemetryServerInterceptor(grpc.aio.ServerInterceptor, OpenTelemetryServerInterceptor):
    """
    An AsyncIO gRPC server interceptor, to add OpenTelemetry.

    Usage::

        tracer = some OpenTelemetry tracer

        interceptors = [
            AsyncOpenTelemetryServerInterceptor(tracer),
        ]

        server = aio.server(
            futures.ThreadPoolExecutor(max_workers=concurrency),
            interceptors=interceptors)
    """

    async def intercept_service(self, continuation, handler_call_details):
        def telemetry_wrapper(behavior, request_streaming, response_streaming):
            async def telemetry_interceptor(request_or_iterator, context):
                # handle streaming responses specially
                if response_streaming:
                    return self._intercept_server_stream(
                        behavior,
                        handler_call_details,
                        request_or_iterator,
                        context,
                    )

                with self._set_remote_context(context):
                    with self._start_span(
                        handler_call_details,
                        context,
                        set_status_on_exception=False,
                    ) as span:
                        # wrap the context
                        context = _OpenTelemetryServicerContext(context, span)

                        # And now we run the actual RPC.
                        try:
                            return await behavior(request_or_iterator, context)
                        except Exception as error:
                            # Bare exceptions are likely to be gRPC aborts, which
                            # we handle in our context wrapper.
                            # Here, we're interested in uncaught exceptions.
                            # pylint:disable=unidiomatic-typecheck
                            if type(error) != Exception:
                                span.record_exception(error)
                            # Re-raise with the original traceback intact.
                            raise

            return telemetry_interceptor

        # In grpc.aio the continuation is a coroutine and must be awaited.
        next_handler = await continuation(handler_call_details)

        return _wrap_rpc_behavior(
            next_handler, telemetry_wrapper
        )
I am unsure about intercepting the streaming response. Thoughts?
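One possible direction for the streaming case, as a minimal sketch rather than a tested implementation: in grpc.aio a response-streaming handler is normally an async generator, so the wrapper could itself be an async generator that keeps the span open while it re-yields each response. The sketch below runs without gRPC or OpenTelemetry installed. `fake_span`, `wrap_response_stream`, `sample_behavior`, and `collect` are hypothetical names used only for illustration, and `fake_span` stands in for `self._start_span(...)`.

```python
import asyncio
import contextlib


@contextlib.contextmanager
def fake_span(name, events):
    """Hypothetical stand-in for self._start_span(); logs span lifecycle events."""
    events.append(f"start {name}")
    try:
        yield
    except Exception as error:
        # A real interceptor would call span.record_exception(error) here.
        events.append(f"error {type(error).__name__}")
        raise
    finally:
        events.append(f"end {name}")


def wrap_response_stream(behavior, name, events):
    """Wrap an async-generator behavior so the span covers the whole stream."""

    async def telemetry_stream_interceptor(request_or_iterator, context):
        # The span stays open until the last response has been yielded.
        with fake_span(name, events):
            async for response in behavior(request_or_iterator, context):
                yield response

    return telemetry_stream_interceptor


async def sample_behavior(request, context):
    """Hypothetical servicer method that streams three responses."""
    for i in range(3):
        yield f"{request}-{i}"


async def collect(handler, request):
    """Drain the wrapped handler the way a server would consume it."""
    return [item async for item in handler(request, None)]


events = []
handler = wrap_response_stream(sample_behavior, "Sample/Stream", events)
responses = asyncio.run(collect(handler, "req"))
```

Under this approach, the `response_streaming` branch would hand back an async-generator handler like this one instead of returning the result of the synchronous `_intercept_server_stream` from inside a coroutine. Whether servicers that stream through `context.write()` rather than `yield` need separate handling is an open question that this sketch does not cover.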