Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add gRPC server & reflection #6463

Merged
merged 48 commits into from
Jul 27, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
8dd91e3
Add gRPC proxy
aaronc Jun 17, 2020
1e4d01f
Merge branch 'master' into aaronc/5921-grpc-proxy
aaronc Jun 17, 2020
ba6ded6
Merge branch 'master' of https://github.com/cosmos/cosmos-sdk into aa…
aaronc Jun 18, 2020
20fd64c
Make GRPC disabled by default
aaronc Jun 18, 2020
2ab5f5d
WIP on integration tests
aaronc Jun 18, 2020
70e5042
WIP on integration tests
aaronc Jun 18, 2020
4771a12
Merge branch 'master' of https://github.com/cosmos/cosmos-sdk into aa…
aaronc Jul 14, 2020
021591f
Start setting up in process tests
aaronc Jul 16, 2020
3176fb4
Merge branch 'master' of https://github.com/cosmos/cosmos-sdk into aa…
aaronc Jul 16, 2020
f2671c7
Start setting up in process tests
aaronc Jul 16, 2020
7254946
Make it compile
amaury1093 Jul 17, 2020
5834ca6
Merge branch 'master' into aaronc/5921-grpc-proxy
amaury1093 Jul 20, 2020
64554c4
Add start server to network util
amaury1093 Jul 20, 2020
928edef
Merge branch 'master' into aaronc/5921-grpc-proxy
amaury1093 Jul 20, 2020
b7dc4a8
Merge branch 'master' into aaronc/5921-grpc-proxy
amaury1093 Jul 20, 2020
973ff20
Merge branch 'master' into aaronc/5921-grpc-proxy
amaury1093 Jul 20, 2020
d0c5291
Add Println
amaury1093 Jul 20, 2020
978a26f
Use go routine
amaury1093 Jul 21, 2020
c93b5e3
Fix scopelint
amaury1093 Jul 21, 2020
71c8139
Move to proxy_test
amaury1093 Jul 21, 2020
42afb10
Merge branch 'master' into aaronc/5921-grpc-proxy
amaury1093 Jul 21, 2020
aaa1ac7
Add response type cache
aaronc Jul 23, 2020
b4421f1
Remove proxy
amaury1093 Jul 23, 2020
9072a42
Tweaks
amaury1093 Jul 23, 2020
59d2fa7
Merge branch 'master' into aaronc/5921-grpc-proxy
amaury1093 Jul 23, 2020
f15c036
Use channel to handle error
amaury1093 Jul 23, 2020
ff837e8
Use error chan
amaury1093 Jul 23, 2020
4080396
Update server/start.go
amaury1093 Jul 23, 2020
670c44e
Use %w
amaury1093 Jul 23, 2020
c5f850f
Merge branch 'aaronc/5921-grpc-proxy' of ssh://github.com/cosmos/cosm…
amaury1093 Jul 23, 2020
323e1c2
Add sdk.Context
amaury1093 Jul 24, 2020
8825e99
Merge branch 'master' into aaronc/5921-grpc-proxy
amaury1093 Jul 24, 2020
9058b6d
Add comments
amaury1093 Jul 24, 2020
0411977
Fix lint
amaury1093 Jul 24, 2020
0c90239
Add header and tests
amaury1093 Jul 24, 2020
8f15a05
Address comments
amaury1093 Jul 24, 2020
f533116
Factorize some code
amaury1093 Jul 24, 2020
72db921
Fix lint
amaury1093 Jul 24, 2020
d93803b
Add height and prove in req metadata
amaury1093 Jul 27, 2020
dbe951d
Merge branch 'master' into aaronc/5921-grpc-proxy
amaury1093 Jul 27, 2020
a37c83c
Add reflection test
amaury1093 Jul 27, 2020
8513b9d
Fix lint
amaury1093 Jul 27, 2020
4f659ce
Put grpc test in server/grpc
amaury1093 Jul 27, 2020
ae797c0
Update baseapp/grpcserver.go
alexanderbez Jul 27, 2020
180da4a
Update baseapp/grpcserver.go
alexanderbez Jul 27, 2020
b16cbd4
Remove proof header
amaury1093 Jul 27, 2020
692b032
Merge branch 'master' into aaronc/5921-grpc-proxy
alexanderbez Jul 27, 2020
d6afde0
Merge branch 'master' into aaronc/5921-grpc-proxy
sahith-narahari Jul 27, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,15 @@ func (app *BaseApp) QueryRouter() sdk.QueryRouter { return app.queryRouter }
// GRPCQueryRouter returns the GRPCQueryRouter of a BaseApp.
func (app *BaseApp) GRPCQueryRouter() grpc.Server { return app.grpcQueryRouter }

// RegisterGRPC registers gRPC services directly with the gRPC server
func (app *BaseApp) RegisterGRPC(grpc.Server) {}

// RegisterGRPCProxy registers gRPC services against a proxy that is expected
// to proxy requests to the ABCI query endpoint
func (app *BaseApp) RegisterGRPCProxy(server grpc.Server) {
app.grpcQueryRouter.RegisterProxyServer(server)
}

// Seal seals a BaseApp. It prohibits any further modifications to a BaseApp.
func (app *BaseApp) Seal() { app.sealed = true }

Expand Down
19 changes: 18 additions & 1 deletion baseapp/grpcrouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,13 @@ var protoCodec = encoding.GetCodec(proto.Name)

// GRPCQueryRouter routes ABCI Query requests to GRPC handlers
type GRPCQueryRouter struct {
routes map[string]GRPCQueryHandler
routes map[string]GRPCQueryHandler
serviceData []serviceData
}

type serviceData struct {
serviceDesc *grpc.ServiceDesc
handler interface{}
}

var _ gogogrpc.Server
Expand Down Expand Up @@ -73,4 +79,15 @@ func (qrt *GRPCQueryRouter) RegisterService(sd *grpc.ServiceDesc, handler interf
}, nil
}
}

qrt.serviceData = append(qrt.serviceData, serviceData{
serviceDesc: sd,
handler: handler,
})
}

func (qrt GRPCQueryRouter) RegisterProxyServer(server gogogrpc.Server) {
for _, data := range qrt.serviceData {
server.RegisterService(data.serviceDesc, data.handler)
}
}
17 changes: 17 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,22 @@ type APIConfig struct {
// Ref: https://github.com/cosmos/cosmos-sdk/issues/6420
}

type GRPCConfig struct {
// Enable defines if the gRPC server should be enabled.
Enable bool `mapstructure:"enable"`

// Address defines the API server to listen on
Address string `mapstructure:"address"`
}

// Config defines the server's top level configuration
type Config struct {
BaseConfig `mapstructure:",squash"`

// Telemetry defines the application telemetry configuration
Telemetry telemetry.Config `mapstructure:"telemetry"`
API APIConfig `mapstructure:"api"`
GRPC GRPCConfig `mapstructure:"grpc"`
}

// SetMinGasPrices sets the validator's minimum gas prices.
Expand Down Expand Up @@ -129,6 +138,10 @@ func DefaultConfig() *Config {
RPCReadTimeout: 10,
RPCMaxBodyBytes: 1000000,
},
GRPC: GRPCConfig{
aaronc marked this conversation as resolved.
Show resolved Hide resolved
Enable: true,
aaronc marked this conversation as resolved.
Show resolved Hide resolved
Address: "tcp://0.0.0.0:9090",
},
}
}

Expand Down Expand Up @@ -161,5 +174,9 @@ func GetConfig() Config {
RPCMaxBodyBytes: viper.GetUint("api.rpc-max-body-bytes"),
EnableUnsafeCORS: viper.GetBool("api.enabled-unsafe-cors"),
},
GRPC: GRPCConfig{
Enable: viper.GetBool("grpc.enable"),
Address: viper.GetString("grpc.address"),
amaury1093 marked this conversation as resolved.
Show resolved Hide resolved
},
}
}
9 changes: 9 additions & 0 deletions server/constructors.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"os"
"path/filepath"

"github.com/gogo/protobuf/grpc"

abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/log"
tmtypes "github.com/tendermint/tendermint/types"
Expand All @@ -23,6 +25,13 @@ type (
abci.Application

RegisterAPIRoutes(*api.Server)

// RegisterGRPC registers gRPC services directly with the gRPC server
RegisterGRPC(grpc.Server)

// RegisterGRPCProxy registers gRPC services against a proxy that is expected
// to proxy requests to the ABCI query endpoint
RegisterGRPCProxy(grpc.Server)
}

// AppCreator is a function that allows us to lazily initialize an
Expand Down
86 changes: 86 additions & 0 deletions server/grpc/proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package grpc

import (
"context"

"github.com/cosmos/cosmos-sdk/client"
gogogrpc "github.com/gogo/protobuf/grpc"
"github.com/gogo/protobuf/proto"
"github.com/tendermint/tendermint/abci/types"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type proxyServer struct {
server *grpc.Server
interceptor grpc.UnaryServerInterceptor
}

func NewProxyServer(server *grpc.Server, interceptor grpc.UnaryServerInterceptor) gogogrpc.Server {
return &proxyServer{server: server, interceptor: interceptor}
}

func (proxy *proxyServer) RegisterService(desc *grpc.ServiceDesc, handler interface{}) {
newMethods := make([]grpc.MethodDesc, len(desc.Methods))

for i, method := range desc.Methods {
newMethods[i] = grpc.MethodDesc{
MethodName: method.MethodName,
Handler: func(srv interface{}, ctx context.Context, dec func(interface{}) error, _ grpc.UnaryServerInterceptor) (interface{}, error) {
return method.Handler(srv, ctx, dec, proxy.interceptor)
},
}
}

newDesc := &grpc.ServiceDesc{
ServiceName: desc.ServiceName,
HandlerType: desc.HandlerType,
Methods: newMethods,
Streams: desc.Streams,
Metadata: desc.Metadata,
}

proxy.server.RegisterService(newDesc, handler)
}

func ABCIQueryProxyInterceptor(clientCtx client.Context) grpc.UnaryServerInterceptor {
return func(_ context.Context, req interface{}, info *grpc.UnaryServerInfo, _ grpc.UnaryHandler) (resp interface{}, err error) {
msg, ok := req.(proto.Message)
if !ok {
return nil, status.Errorf(codes.Internal, "unable to proto marshal")
}

msgBz, err := proto.Marshal(msg)
if err != nil {
return nil, err
}

abciReq := types.RequestQuery{
Data: msgBz,
Path: info.FullMethod,
}

abciRes, err := clientCtx.QueryABCI(abciReq)

if err != nil {
return nil, err
}

if abciRes.Code != 0 {
return nil, status.Errorf(codes.Internal, abciRes.Log)
}

respMsg, ok := resp.(proto.Message)
if !ok {
return nil, status.Errorf(codes.Internal, "unable to proto marshal")
}

err = proto.Unmarshal(abciRes.Value, respMsg)
if err != nil {
return nil, err
}

return resp, nil
}
}
65 changes: 50 additions & 15 deletions server/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@ package server

import (
"fmt"
"net"
"os"
"runtime/pprof"

"google.golang.org/grpc/reflection"

"google.golang.org/grpc"

"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/tendermint/tendermint/abci/server"
Expand All @@ -22,6 +27,7 @@ import (
"github.com/cosmos/cosmos-sdk/codec"
"github.com/cosmos/cosmos-sdk/server/api"
"github.com/cosmos/cosmos-sdk/server/config"
grpcproxy "github.com/cosmos/cosmos-sdk/server/grpc"
)

// Tendermint full-node start flags
Expand Down Expand Up @@ -193,27 +199,52 @@ func startInProcess(ctx *Context, cdc codec.JSONMarshaler, appCreator AppCreator
}

config := config.GetConfig()

genDoc, err := genDocProvider()
if err != nil {
return err
}

// TODO: Since this is running in process, do we need to provide a verifier
// and set TrustNode=false? If so, we need to add additional logic that
// waits for a block to be committed first before starting the API server.
clientCtx := client.Context{}.
WithHomeDir(home).
WithChainID(genDoc.ChainID).
WithJSONMarshaler(cdc).
WithClient(local.New(tmNode)).
WithTrustNode(true)

var apiSrv *api.Server
if config.API.Enable {
genDoc, err := genDocProvider()
if err != nil {
apiSrv = api.New(clientCtx)
app.RegisterAPIRoutes(apiSrv)

if err := apiSrv.Start(config); err != nil {
return err
}
}

// TODO: Since this is running in process, do we need to provide a verifier
// and set TrustNode=false? If so, we need to add additional logic that
// waits for a block to be committed first before starting the API server.
ctx := client.Context{}.
WithHomeDir(home).
WithChainID(genDoc.ChainID).
WithJSONMarshaler(cdc).
WithClient(local.New(tmNode)).
WithTrustNode(true)

apiSrv = api.New(ctx)
app.RegisterAPIRoutes(apiSrv)
var grpcSrv *grpc.Server
if config.GRPC.Enable {
grpcSrv = grpc.NewServer()

if err := apiSrv.Start(config); err != nil {
app.RegisterGRPC(grpcSrv)

// proxy queries to the ABCI query endpoint
proxyInterceptor := grpcproxy.ABCIQueryProxyInterceptor(clientCtx)
proxySrv := grpcproxy.NewProxyServer(grpcSrv, proxyInterceptor)
app.RegisterGRPCProxy(proxySrv)

reflection.Register(grpcSrv)

listener, err := net.Listen("tcp", config.GRPC.Address)
if err != nil {
return err
}

err = grpcSrv.Serve(listener)
if err != nil {
return err
}
}
Expand Down Expand Up @@ -251,6 +282,10 @@ func startInProcess(ctx *Context, cdc codec.JSONMarshaler, appCreator AppCreator
_ = apiSrv.Close()
}

if grpcSrv != nil {
_ = grpcSrv.Stop
}

ctx.Logger.Info("exiting...")
})

Expand Down
18 changes: 18 additions & 0 deletions tests/cli/grpc_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"context"
"testing"

"google.golang.org/grpc"

"github.com/stretchr/testify/require"

"github.com/cosmos/cosmos-sdk/client"
Expand All @@ -26,3 +28,19 @@ func TestCliQueryConn(t *testing.T) {
require.NoError(t, err)
require.Equal(t, "hello", res.Message)
}

func TestGRPCProxy(t *testing.T) {
t.Parallel()
f := NewFixtures(t)

// start simd server
proc := f.SDStart()
t.Cleanup(func() { proc.Stop(false) })

conn, err := grpc.Dial("tcp://0.0.0.0:9090")
require.NoError(t, err)
testClient := testdata.NewTestServiceClient(conn)
res, err := testClient.Echo(context.Background(), &testdata.EchoRequest{Message: "hello"})
require.NoError(t, err)
require.Equal(t, "hello", res.Message)
}