Support for gRPC aio #1099

Closed
BalazsHoranyi opened this issue May 20, 2022 · 2 comments · Fixed by #1245
@BalazsHoranyi

I would like to add support for the gRPC AsyncIO API server.
I think it should be relatively straightforward to implement: we just need to subclass grpc.aio.ServerInterceptor instead of grpc.ServerInterceptor and add some async/await logic.
Something like this seems to work well.

class AsyncOpenTelemetryServerInterceptor(grpc.aio.ServerInterceptor, OpenTelemetryServerInterceptor):
    """
    An AsyncIO gRPC server interceptor, to add OpenTelemetry.
    Usage::
        tracer = some OpenTelemetry tracer
        interceptors = [
            AsyncOpenTelemetryServerInterceptor(tracer),
        ]
        server = aio.server(
            futures.ThreadPoolExecutor(max_workers=concurrency),
            interceptors=interceptors)
    """


    async def intercept_service(self, continuation, handler_call_details):
        def telemetry_wrapper(behavior, request_streaming, response_streaming):
            async def telemetry_interceptor(request_or_iterator, context):

                # handle streaming responses specially
                if response_streaming:
                    return self._intercept_server_stream(
                        behavior,
                        handler_call_details,
                        request_or_iterator,
                        context,
                    )

                with self._set_remote_context(context):
                    with self._start_span(
                        handler_call_details,
                        context,
                        set_status_on_exception=False,
                    ) as span:
                        # wrap the context
                        context = _OpenTelemetryServicerContext(context, span)

                        # And now we run the actual RPC.
                        try:
                            return await behavior(request_or_iterator, context)

                        except Exception as error:
                            # Bare exceptions are likely to be gRPC aborts, which
                            # we handle in our context wrapper.
                            # Here, we're interested in uncaught exceptions.
                            # pylint:disable=unidiomatic-typecheck
                            if type(error) != Exception:
                                span.record_exception(error)
                            raise error

            return telemetry_interceptor

        next_handler = await continuation(handler_call_details)

        return _wrap_rpc_behavior(next_handler, telemetry_wrapper)

I am unsure about intercepting the streaming response. Thoughts?
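One option for the streaming case is to make the interceptor itself an async generator that re-yields the handler's responses inside the span's context manager, so the span stays open for the lifetime of the stream. Below is a minimal, self-contained sketch of that pattern using plain asyncio; the `start_span` helper and `handler` are hypothetical stand-ins, not the real OpenTelemetry or gRPC APIs:

```python
import asyncio
import contextlib

# Recorded events, so we can see when the "span" opens and closes.
events = []

@contextlib.contextmanager
def start_span(name):
    # Hypothetical stand-in for the real span machinery.
    events.append(f"span-start:{name}")
    try:
        yield name
    finally:
        events.append(f"span-end:{name}")

async def handler(request):
    # A streaming RPC handler: an async generator of responses.
    for i in range(3):
        yield f"{request}-{i}"

async def intercept_server_stream(behavior, name, request):
    # Re-yield each response inside the span, so the span covers
    # the entire stream rather than just the first response.
    with start_span(name):
        async for response in behavior(request):
            yield response

async def main():
    return [r async for r in intercept_server_stream(handler, "rpc", "req")]

responses = asyncio.run(main())
print(responses)  # ['req-0', 'req-1', 'req-2']
print(events)     # ['span-start:rpc', 'span-end:rpc']
```

The key property is that the context manager is entered on the first response and exited only when the stream is exhausted, which is presumably what we want for server-streaming spans.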

@aabmass
Member

aabmass commented Jun 9, 2022

This would be awesome, would you be open to submitting a PR @BalazsHoranyi ?

@cookiefission
Contributor

We'll need this support before we can adopt OpenTelemetry, so I'm going to try to tackle this one.
