Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(grpcrouter): add hybrid query handler (backport #17921) #18339

Merged
merged 2 commits into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 91 additions & 42 deletions baseapp/grpcrouter.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package baseapp

import (
"context"
"fmt"

abci "github.com/cometbft/cometbft/abci/types"
gogogrpc "github.com/cosmos/gogoproto/grpc"
"github.com/cosmos/gogoproto/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/encoding"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/runtime/protoiface"

"github.com/cosmos/cosmos-sdk/baseapp/internal/protocompat"
"github.com/cosmos/cosmos-sdk/client/grpc/reflection"
"github.com/cosmos/cosmos-sdk/codec"
codectypes "github.com/cosmos/cosmos-sdk/codec/types"
Expand All @@ -16,8 +21,16 @@ import (

// GRPCQueryRouter routes ABCI Query requests to GRPC handlers
type GRPCQueryRouter struct {
routes map[string]GRPCQueryHandler
cdc encoding.Codec
// routes maps query handlers used in ABCIQuery.
routes map[string]GRPCQueryHandler
// handlerByMessageName maps the request name to the handler. It is a hybrid handler which seamlessly
// handles both gogo and protov2 messages.
handlerByMessageName map[string][]func(ctx context.Context, req, resp protoiface.MessageV1) error
// binaryCodec is used to encode/decode binary protobuf messages.
binaryCodec codec.BinaryCodec
// cdc is the gRPC codec used by the router to correctly unmarshal messages.
cdc encoding.Codec
// serviceData contains the gRPC services and their handlers.
serviceData []serviceData
}

Expand All @@ -32,7 +45,8 @@ var _ gogogrpc.Server = &GRPCQueryRouter{}
// NewGRPCQueryRouter creates a new GRPCQueryRouter
func NewGRPCQueryRouter() *GRPCQueryRouter {
return &GRPCQueryRouter{
routes: map[string]GRPCQueryHandler{},
routes: map[string]GRPCQueryHandler{},
handlerByMessageName: map[string][]func(ctx context.Context, req, resp protoiface.MessageV1) error{},
}
}

Expand All @@ -58,46 +72,13 @@ func (qrt *GRPCQueryRouter) Route(path string) GRPCQueryHandler {
func (qrt *GRPCQueryRouter) RegisterService(sd *grpc.ServiceDesc, handler interface{}) {
// adds a top-level query handler based on the gRPC service name
for _, method := range sd.Methods {
fqName := fmt.Sprintf("/%s/%s", sd.ServiceName, method.MethodName)
methodHandler := method.Handler

// Check that each service is only registered once. If a service is
// registered more than once, then we should error. Since we can't
// return an error (`Server.RegisterService` interface restriction) we
// panic (at startup).
_, found := qrt.routes[fqName]
if found {
panic(
fmt.Errorf(
"gRPC query service %s has already been registered. Please make sure to only register each service once. "+
"This usually means that there are conflicting modules registering the same gRPC query service",
fqName,
),
)
err := qrt.registerABCIQueryHandler(sd, method, handler)
if err != nil {
panic(err)
}

qrt.routes[fqName] = func(ctx sdk.Context, req *abci.RequestQuery) (*abci.ResponseQuery, error) {
// call the method handler from the service description with the handler object,
// a wrapped sdk.Context with proto-unmarshaled data from the ABCI request data
res, err := methodHandler(handler, ctx, func(i interface{}) error {
return qrt.cdc.Unmarshal(req.Data, i)
}, nil)
if err != nil {
return nil, err
}

// proto marshal the result bytes
var resBytes []byte
resBytes, err = qrt.cdc.Marshal(res)
if err != nil {
return nil, err
}

// return the result bytes as the response value
return &abci.ResponseQuery{
Height: req.Height,
Value: resBytes,
}, nil
err = qrt.registerHandlerByMessageName(sd, method, handler)
if err != nil {
panic(err)
}
}

Expand All @@ -107,9 +88,77 @@ func (qrt *GRPCQueryRouter) RegisterService(sd *grpc.ServiceDesc, handler interf
})
}

func (qrt *GRPCQueryRouter) registerABCIQueryHandler(sd *grpc.ServiceDesc, method grpc.MethodDesc, handler interface{}) error {
fqName := fmt.Sprintf("/%s/%s", sd.ServiceName, method.MethodName)
methodHandler := method.Handler

// Check that each service is only registered once. If a service is
// registered more than once, then we should error. Since we can't
// return an error (`Server.RegisterService` interface restriction) we
// panic (at startup).
_, found := qrt.routes[fqName]
if found {
return fmt.Errorf(
"gRPC query service %s has already been registered. Please make sure to only register each service once. "+
"This usually means that there are conflicting modules registering the same gRPC query service",
fqName,
)
}

qrt.routes[fqName] = func(ctx sdk.Context, req *abci.RequestQuery) (*abci.ResponseQuery, error) {
// call the method handler from the service description with the handler object,
// a wrapped sdk.Context with proto-unmarshaled data from the ABCI request data
res, err := methodHandler(handler, ctx, func(i interface{}) error {
return qrt.cdc.Unmarshal(req.Data, i)
}, nil)
if err != nil {
return nil, err
}

// proto marshal the result bytes
var resBytes []byte
resBytes, err = qrt.cdc.Marshal(res)
if err != nil {
return nil, err
}

// return the result bytes as the response value
return &abci.ResponseQuery{
Height: req.Height,
Value: resBytes,
}, nil
}
return nil
}

func (qrt *GRPCQueryRouter) HandlersByRequestName(name string) []func(ctx context.Context, req, resp protoiface.MessageV1) error {
return qrt.handlerByMessageName[name]
}

func (qrt *GRPCQueryRouter) registerHandlerByMessageName(sd *grpc.ServiceDesc, method grpc.MethodDesc, handler interface{}) error {
// extract message name from method descriptor
methodFullName := protoreflect.FullName(fmt.Sprintf("%s.%s", sd.ServiceName, method.MethodName))
desc, err := proto.HybridResolver.FindDescriptorByName(methodFullName)
if err != nil {
return fmt.Errorf("cannot find method descriptor %s", methodFullName)
}
methodDesc, ok := desc.(protoreflect.MethodDescriptor)
if !ok {
return fmt.Errorf("invalid method descriptor %s", methodFullName)
}
inputName := methodDesc.Input().FullName()
methodHandler, err := protocompat.MakeHybridHandler(qrt.binaryCodec, sd, method, handler)
if err != nil {
return err
}
qrt.handlerByMessageName[string(inputName)] = append(qrt.handlerByMessageName[string(inputName)], methodHandler)
return nil
}

// SetInterfaceRegistry sets the interface registry for the router. This will
// also register the interface reflection gRPC service.
func (qrt *GRPCQueryRouter) SetInterfaceRegistry(interfaceRegistry codectypes.InterfaceRegistry) {
qrt.binaryCodec = codec.NewProtoCodec(interfaceRegistry)
// instantiate the codec
qrt.cdc = codec.NewProtoCodec(interfaceRegistry).GRPCCodec()
// Once we have an interface registry, we can register the interface
Expand Down
44 changes: 44 additions & 0 deletions baseapp/grpcrouter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,50 @@ func TestGRPCQueryRouter(t *testing.T) {
require.Equal(t, spot, res3.HasAnimal.Animal.GetCachedValue())
}

func TestGRPCRouterHybridHandlers(t *testing.T) {
assertRouterBehaviour := func(helper *baseapp.QueryServiceTestHelper) {
// test getting the handler by name
handlers := helper.GRPCQueryRouter.HandlersByRequestName("testpb.EchoRequest")
require.NotNil(t, handlers)
require.Len(t, handlers, 1)
handler := handlers[0]
// sending a protov2 message should work, and return a protov2 message
v2Resp := new(testdata_pulsar.EchoResponse)
err := handler(helper.Ctx, &testdata_pulsar.EchoRequest{Message: "hello"}, v2Resp)
require.Nil(t, err)
require.Equal(t, "hello", v2Resp.Message)
// also sending a protov1 message should work, and return a gogoproto message
gogoResp := new(testdata.EchoResponse)
err = handler(helper.Ctx, &testdata.EchoRequest{Message: "hello"}, gogoResp)
require.NoError(t, err)
require.Equal(t, "hello", gogoResp.Message)
}

t.Run("protov2 server", func(t *testing.T) {
qr := baseapp.NewGRPCQueryRouter()
interfaceRegistry := testdata.NewTestInterfaceRegistry()
qr.SetInterfaceRegistry(interfaceRegistry)
testdata_pulsar.RegisterQueryServer(qr, testdata_pulsar.QueryImpl{})
helper := &baseapp.QueryServiceTestHelper{
GRPCQueryRouter: qr,
Ctx: sdk.Context{}.WithContext(context.Background()),
}
assertRouterBehaviour(helper)
})

t.Run("gogoproto server", func(t *testing.T) {
qr := baseapp.NewGRPCQueryRouter()
interfaceRegistry := testdata.NewTestInterfaceRegistry()
qr.SetInterfaceRegistry(interfaceRegistry)
testdata.RegisterQueryServer(qr, testdata.QueryImpl{})
helper := &baseapp.QueryServiceTestHelper{
GRPCQueryRouter: qr,
Ctx: sdk.Context{}.WithContext(context.Background()),
}
assertRouterBehaviour(helper)
})
}

func TestRegisterQueryServiceTwice(t *testing.T) {
// Setup baseapp.
var appBuilder *runtime.AppBuilder
Expand Down
Loading
Loading