From 8dd91e3b4e0c3e03c52cab6816c398328f424c39 Mon Sep 17 00:00:00 2001 From: Aaron Craelius Date: Wed, 17 Jun 2020 17:26:55 -0400 Subject: [PATCH 01/33] Add gRPC proxy --- baseapp/baseapp.go | 9 ++++ baseapp/grpcrouter.go | 19 +++++++- server/config/config.go | 17 +++++++ server/constructors.go | 9 ++++ server/grpc/proxy.go | 86 ++++++++++++++++++++++++++++++++++++ server/start.go | 65 ++++++++++++++++++++------- tests/cli/grpc_query_test.go | 18 ++++++++ 7 files changed, 207 insertions(+), 16 deletions(-) create mode 100644 server/grpc/proxy.go diff --git a/baseapp/baseapp.go b/baseapp/baseapp.go index 492dbec6cc82..f4570da7d20e 100644 --- a/baseapp/baseapp.go +++ b/baseapp/baseapp.go @@ -294,6 +294,15 @@ func (app *BaseApp) QueryRouter() sdk.QueryRouter { return app.queryRouter } // GRPCQueryRouter returns the GRPCQueryRouter of a BaseApp. func (app *BaseApp) GRPCQueryRouter() grpc.Server { return app.grpcQueryRouter } +// RegisterGRPC registers gRPC services directly with the gRPC server +func (app *BaseApp) RegisterGRPC(grpc.Server) {} + +// RegisterGRPCProxy registers gRPC services against a proxy that is expected +// to proxy requests to the ABCI query endpoint +func (app *BaseApp) RegisterGRPCProxy(server grpc.Server) { + app.grpcQueryRouter.RegisterProxyServer(server) +} + // Seal seals a BaseApp. It prohibits any further modifications to a BaseApp. func (app *BaseApp) Seal() { app.sealed = true } diff --git a/baseapp/grpcrouter.go b/baseapp/grpcrouter.go index 4c809655d272..36cac5a472d5 100644 --- a/baseapp/grpcrouter.go +++ b/baseapp/grpcrouter.go @@ -16,7 +16,13 @@ var protoCodec = encoding.GetCodec(proto.Name) // GRPCQueryRouter routes ABCI Query requests to GRPC handlers type GRPCQueryRouter struct { - routes map[string]GRPCQueryHandler + routes map[string]GRPCQueryHandler + serviceData []serviceData +} + +type serviceData struct { + serviceDesc *grpc.ServiceDesc + handler interface{} } var _ gogogrpc.Server @@ -73,4 +79,15 @@ func (qrt *GRPCQueryRouter) RegisterService(sd *grpc.ServiceDesc, handler interf }, nil } } + + qrt.serviceData = append(qrt.serviceData, serviceData{ + serviceDesc: sd, + handler: handler, + }) +} + +func (qrt GRPCQueryRouter) RegisterProxyServer(server gogogrpc.Server) { + for _, data := range qrt.serviceData { + server.RegisterService(data.serviceDesc, data.handler) + } } diff --git a/server/config/config.go b/server/config/config.go index 6ae3737dee6d..73a5695e79ce 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -74,6 +74,14 @@ type APIConfig struct { // Ref: https://github.com/cosmos/cosmos-sdk/issues/6420 } +type GRPCConfig struct { + // Enable defines if the gRPC server should be enabled. + Enable bool `mapstructure:"enable"` + + // Address defines the API server to listen on + Address string `mapstructure:"address"` +} + // Config defines the server's top level configuration type Config struct { BaseConfig `mapstructure:",squash"` @@ -81,6 +89,7 @@ type Config struct { // Telemetry defines the application telemetry configuration Telemetry telemetry.Config `mapstructure:"telemetry"` API APIConfig `mapstructure:"api"` + GRPC GRPCConfig `mapstructure:"grpc"` } // SetMinGasPrices sets the validator's minimum gas prices. @@ -129,6 +138,10 @@ func DefaultConfig() *Config { RPCReadTimeout: 10, RPCMaxBodyBytes: 1000000, }, + GRPC: GRPCConfig{ + Enable: true, + Address: "tcp://0.0.0.0:9090", + }, } } @@ -161,5 +174,9 @@ func GetConfig() Config { RPCMaxBodyBytes: viper.GetUint("api.rpc-max-body-bytes"), EnableUnsafeCORS: viper.GetBool("api.enabled-unsafe-cors"), }, + GRPC: GRPCConfig{ + Enable: viper.GetBool("grpc.enable"), + Address: viper.GetString("grpc.address"), + }, } } diff --git a/server/constructors.go b/server/constructors.go index 1b628194332d..f00332517227 100644 --- a/server/constructors.go +++ b/server/constructors.go @@ -6,6 +6,8 @@ import ( "os" "path/filepath" + "github.com/gogo/protobuf/grpc" + abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/libs/log" tmtypes "github.com/tendermint/tendermint/types" @@ -23,6 +25,13 @@ type ( abci.Application RegisterAPIRoutes(*api.Server) + + // RegisterGRPC registers gRPC services directly with the gRPC server + RegisterGRPC(grpc.Server) + + // RegisterGRPCProxy registers gRPC services against a proxy that is expected + // to proxy requests to the ABCI query endpoint + RegisterGRPCProxy(grpc.Server) } // AppCreator is a function that allows us to lazily initialize an diff --git a/server/grpc/proxy.go b/server/grpc/proxy.go new file mode 100644 index 000000000000..d4bc98c63250 --- /dev/null +++ b/server/grpc/proxy.go @@ -0,0 +1,86 @@ +package grpc + +import ( + "context" + + "github.com/cosmos/cosmos-sdk/client" + gogogrpc "github.com/gogo/protobuf/grpc" + "github.com/gogo/protobuf/proto" + "github.com/tendermint/tendermint/abci/types" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type proxyServer struct { + server *grpc.Server + interceptor grpc.UnaryServerInterceptor +} + +func NewProxyServer(server *grpc.Server, interceptor grpc.UnaryServerInterceptor) gogogrpc.Server { + return &proxyServer{server: server, interceptor: interceptor} +} + +func (proxy *proxyServer) RegisterService(desc *grpc.ServiceDesc, handler interface{}) { + newMethods := make([]grpc.MethodDesc, len(desc.Methods)) + + for i, method := range desc.Methods { + newMethods[i] = grpc.MethodDesc{ + MethodName: method.MethodName, + Handler: func(srv interface{}, ctx context.Context, dec func(interface{}) error, _ grpc.UnaryServerInterceptor) (interface{}, error) { + return method.Handler(srv, ctx, dec, proxy.interceptor) + }, + } + } + + newDesc := &grpc.ServiceDesc{ + ServiceName: desc.ServiceName, + HandlerType: desc.HandlerType, + Methods: newMethods, + Streams: desc.Streams, + Metadata: desc.Metadata, + } + + proxy.server.RegisterService(newDesc, handler) +} + +func ABCIQueryProxyInterceptor(clientCtx client.Context) grpc.UnaryServerInterceptor { + return func(_ context.Context, req interface{}, info *grpc.UnaryServerInfo, _ grpc.UnaryHandler) (resp interface{}, err error) { + msg, ok := req.(proto.Message) + if !ok { + return nil, status.Errorf(codes.Internal, "unable to proto marshal") + } + + msgBz, err := proto.Marshal(msg) + if err != nil { + return nil, err + } + + abciReq := types.RequestQuery{ + Data: msgBz, + Path: info.FullMethod, + } + + abciRes, err := clientCtx.QueryABCI(abciReq) + + if err != nil { + return nil, err + } + + if abciRes.Code != 0 { + return nil, status.Errorf(codes.Internal, abciRes.Log) + } + + respMsg, ok := resp.(proto.Message) + if !ok { + return nil, status.Errorf(codes.Internal, "unable to proto marshal") + } + + err = proto.Unmarshal(abciRes.Value, respMsg) + if err != nil { + return nil, err + } + + return resp, nil + } +} diff --git a/server/start.go b/server/start.go index d4eb9f76ded3..89af0a2b1658 100644 --- a/server/start.go +++ b/server/start.go @@ -4,9 +4,14 @@ package server import ( "fmt" + "net" "os" "runtime/pprof" + "google.golang.org/grpc/reflection" + + "google.golang.org/grpc" + "github.com/spf13/cobra" "github.com/spf13/viper" "github.com/tendermint/tendermint/abci/server" @@ -22,6 +27,7 @@ import ( "github.com/cosmos/cosmos-sdk/codec" "github.com/cosmos/cosmos-sdk/server/api" "github.com/cosmos/cosmos-sdk/server/config" + grpcproxy "github.com/cosmos/cosmos-sdk/server/grpc" ) // Tendermint full-node start flags @@ -193,27 +199,52 @@ func startInProcess(ctx *Context, cdc codec.JSONMarshaler, appCreator AppCreator } config := config.GetConfig() + + genDoc, err := genDocProvider() + if err != nil { + return err + } + + // TODO: Since this is running in process, do we need to provide a verifier + // and set TrustNode=false? If so, we need to add additional logic that + // waits for a block to be committed first before starting the API server. + clientCtx := client.Context{}. + WithHomeDir(home). + WithChainID(genDoc.ChainID). + WithJSONMarshaler(cdc). + WithClient(local.New(tmNode)). + WithTrustNode(true) + var apiSrv *api.Server if config.API.Enable { - genDoc, err := genDocProvider() - if err != nil { + apiSrv = api.New(clientCtx) + app.RegisterAPIRoutes(apiSrv) + + if err := apiSrv.Start(config); err != nil { return err } + } - // TODO: Since this is running in process, do we need to provide a verifier - // and set TrustNode=false? If so, we need to add additional logic that - // waits for a block to be committed first before starting the API server. - ctx := client.Context{}. - WithHomeDir(home). - WithChainID(genDoc.ChainID). - WithJSONMarshaler(cdc). - WithClient(local.New(tmNode)). - WithTrustNode(true) - - apiSrv = api.New(ctx) - app.RegisterAPIRoutes(apiSrv) + var grpcSrv *grpc.Server + if config.GRPC.Enable { + grpcSrv = grpc.NewServer() - if err := apiSrv.Start(config); err != nil { + app.RegisterGRPC(grpcSrv) + + // proxy queries to the ABCI query endpoint + proxyInterceptor := grpcproxy.ABCIQueryProxyInterceptor(clientCtx) + proxySrv := grpcproxy.NewProxyServer(grpcSrv, proxyInterceptor) + app.RegisterGRPCProxy(proxySrv) + + reflection.Register(grpcSrv) + + listener, err := net.Listen("tcp", config.GRPC.Address) + if err != nil { + return err + } + + err = grpcSrv.Serve(listener) + if err != nil { return err } } @@ -251,6 +282,10 @@ func startInProcess(ctx *Context, cdc codec.JSONMarshaler, appCreator AppCreator _ = apiSrv.Close() } + if grpcSrv != nil { + _ = grpcSrv.Stop + } + ctx.Logger.Info("exiting...") }) diff --git a/tests/cli/grpc_query_test.go b/tests/cli/grpc_query_test.go index a6286d6781fd..35492df016cd 100644 --- a/tests/cli/grpc_query_test.go +++ b/tests/cli/grpc_query_test.go @@ -6,6 +6,8 @@ import ( "context" "testing" + "google.golang.org/grpc" + "github.com/stretchr/testify/require" "github.com/cosmos/cosmos-sdk/client" @@ -26,3 +28,19 @@ func TestCliQueryConn(t *testing.T) { require.NoError(t, err) require.Equal(t, "hello", res.Message) } + +func TestGRPCProxy(t *testing.T) { + t.Parallel() + f := NewFixtures(t) + + // start simd server + proc := f.SDStart() + t.Cleanup(func() { proc.Stop(false) }) + + conn, err := grpc.Dial("tcp://0.0.0.0:9090") + require.NoError(t, err) + testClient := testdata.NewTestServiceClient(conn) + res, err := testClient.Echo(context.Background(), &testdata.EchoRequest{Message: "hello"}) + require.NoError(t, err) + require.Equal(t, "hello", res.Message) +} From 20fd64c27ed03eee894e2a71f03e319b5e5581e8 Mon Sep 17 00:00:00 2001 From: Aaron Craelius Date: Thu, 18 Jun 2020 10:02:59 -0400 Subject: [PATCH 02/33] Make GRPC disabled by default --- server/config/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/config/config.go b/server/config/config.go index 73a5695e79ce..9baa270e28dd 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -139,7 +139,7 @@ func DefaultConfig() *Config { RPCMaxBodyBytes: 1000000, }, GRPC: GRPCConfig{ - Enable: true, + Enable: false, Address: "tcp://0.0.0.0:9090", }, } From 2ab5f5d2a70cb27bd1a536ad14f13f2bc94c9b70 Mon Sep 17 00:00:00 2001 From: Aaron Craelius Date: Thu, 18 Jun 2020 11:00:27 -0400 Subject: [PATCH 03/33] WIP on integration tests --- tests/cli/grpc_query_test.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tests/cli/grpc_query_test.go b/tests/cli/grpc_query_test.go index 35492df016cd..3cec9ae4f977 100644 --- a/tests/cli/grpc_query_test.go +++ b/tests/cli/grpc_query_test.go @@ -1,11 +1,13 @@ // +build cli_test -package cli +package cli_test import ( "context" "testing" + "github.com/cosmos/cosmos-sdk/tests/cli" + "google.golang.org/grpc" "github.com/stretchr/testify/require" @@ -16,7 +18,7 @@ import ( func TestCliQueryConn(t *testing.T) { t.Parallel() - f := NewFixtures(t) + f := cli.NewFixtures(t) // start simd server proc := f.SDStart() @@ -31,7 +33,7 @@ func TestCliQueryConn(t *testing.T) { func TestGRPCProxy(t *testing.T) { t.Parallel() - f := NewFixtures(t) + f := cli.NewFixtures(t) // start simd server proc := f.SDStart() From 70e504227660f21d633f588d6d7a9808db524248 Mon Sep 17 00:00:00 2001 From: Aaron Craelius Date: Thu, 18 Jun 2020 12:09:45 -0400 Subject: [PATCH 04/33] WIP on integration tests --- tests/cli/fixtures.go | 5 +++++ tests/cli/helpers.go | 3 ++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/tests/cli/fixtures.go b/tests/cli/fixtures.go index 564514ddc114..66cb56a673ff 100644 --- a/tests/cli/fixtures.go +++ b/tests/cli/fixtures.go @@ -32,6 +32,7 @@ type Fixtures struct { SimdHome string SimcliHome string P2PAddr string + GRPCAddr string Cdc *codec.Codec T *testing.T } @@ -47,6 +48,9 @@ func NewFixtures(t *testing.T) *Fixtures { p2pAddr, _, err := server.FreeTCPAddr() require.NoError(t, err) + grpcAddr, _, err := server.FreeTCPAddr() + require.NoError(t, err) + buildDir := os.Getenv("BUILDDIR") if buildDir == "" { t.Skip("builddir is empty, skipping") @@ -62,6 +66,7 @@ func NewFixtures(t *testing.T) *Fixtures { SimcliHome: filepath.Join(tmpDir, ".simcli"), RPCAddr: servAddr, P2PAddr: p2pAddr, + GRPCAddr: grpcAddr, Cdc: cdc, Port: port, } diff --git a/tests/cli/helpers.go b/tests/cli/helpers.go index 72e665b0b818..0592c3626e1f 100644 --- a/tests/cli/helpers.go +++ b/tests/cli/helpers.go @@ -86,7 +86,8 @@ func (f *Fixtures) CollectGenTxs(flags ...string) { // SDStart runs simd start with the appropriate flags and returns a process func (f *Fixtures) SDStart(flags ...string) *tests.Process { - cmd := fmt.Sprintf("%s start --home=%s --rpc.laddr=%v --p2p.laddr=%v", f.SimdBinary, f.SimdHome, f.RPCAddr, f.P2PAddr) + cmd := fmt.Sprintf("%s start --home=%s --rpc.laddr=%v --p2p.laddr=%v --grpc.enable --grpc.address=%s", + f.SimdBinary, f.SimdHome, f.RPCAddr, f.P2PAddr, f.GRPCAddr) proc := tests.GoExecuteTWithStdout(f.T, AddFlags(cmd, flags)) tests.WaitForTMStart(f.Port) tests.WaitForNextNBlocksTM(1, f.Port) From 021591fad456a56036361031a3bdacbcffb0cd9b Mon Sep 17 00:00:00 2001 From: Aaron Craelius Date: Thu, 16 Jul 2020 11:58:17 -0400 Subject: [PATCH 05/33] Start setting up in process tests --- tests/cli/fixtures.go | 5 ----- tests/cli/grpc_query_test.go | 40 +++++++++++++++++++++++++++--------- testutil/network/network.go | 6 ++++++ 3 files changed, 36 insertions(+), 15 deletions(-) diff --git a/tests/cli/fixtures.go b/tests/cli/fixtures.go index 63ce54792f66..0d2b04938537 100644 --- a/tests/cli/fixtures.go +++ b/tests/cli/fixtures.go @@ -27,7 +27,6 @@ type Fixtures struct { SimdHome string SimcliHome string P2PAddr string - GRPCAddr string Cdc *codec.Codec EncodingConfig params.EncodingConfig T *testing.T @@ -44,9 +43,6 @@ func NewFixtures(t *testing.T) *Fixtures { p2pAddr, _, err := server.FreeTCPAddr() require.NoError(t, err) - grpcAddr, _, err := server.FreeTCPAddr() - require.NoError(t, err) - buildDir := os.Getenv("BUILDDIR") if buildDir == "" { t.Skip("builddir is empty, skipping") @@ -63,7 +59,6 @@ func NewFixtures(t *testing.T) *Fixtures { SimcliHome: filepath.Join(tmpDir, ".simcli"), RPCAddr: servAddr, P2PAddr: p2pAddr, - GRPCAddr: grpcAddr, Cdc: encodingConfig.Amino, EncodingConfig: encodingConfig, Port: port, diff --git a/tests/cli/grpc_query_test.go b/tests/cli/grpc_query_test.go index 3cec9ae4f977..924738fe2125 100644 --- a/tests/cli/grpc_query_test.go +++ b/tests/cli/grpc_query_test.go @@ -6,6 +6,10 @@ import ( "context" "testing" + "github.com/stretchr/testify/suite" + + "github.com/cosmos/cosmos-sdk/testutil/network" + "github.com/cosmos/cosmos-sdk/tests/cli" "google.golang.org/grpc" @@ -17,6 +21,7 @@ import ( ) func TestCliQueryConn(t *testing.T) { + // TODO use in process tests for this t.Parallel() f := cli.NewFixtures(t) @@ -31,18 +36,33 @@ func TestCliQueryConn(t *testing.T) { require.Equal(t, "hello", res.Message) } -func TestGRPCProxy(t *testing.T) { - t.Parallel() - f := cli.NewFixtures(t) +type IntegrationTestSuite struct { + suite.Suite - // start simd server - proc := f.SDStart() - t.Cleanup(func() { proc.Stop(false) }) + cfg network.Config + network *network.Network +} - conn, err := grpc.Dial("tcp://0.0.0.0:9090") - require.NoError(t, err) +func (s *IntegrationTestSuite) SetupSuite() { + s.T().Log("setting up integration test suite") + + s.network = network.New(s.T(), network.DefaultConfig()) + s.Require().NotNil(s.network) + + _, err := s.network.WaitForHeight(1) + s.Require().NoError(err) +} + +func (s *IntegrationTestSuite) TestGRPC() { + val := s.network.Validators[0] + conn, err := grpc.Dial(val.AppConfig.GRPC.Address) + require.NoError(s.T(), err) testClient := testdata.NewTestServiceClient(conn) res, err := testClient.Echo(context.Background(), &testdata.EchoRequest{Message: "hello"}) - require.NoError(t, err) - require.Equal(t, "hello", res.Message) + require.NoError(s.T(), err) + require.Equal(s.T(), "hello", res.Message) +} + +func TestIntegrationTestSuite(t *testing.T) { + suite.Run(t, &IntegrationTestSuite{}) } diff --git a/testutil/network/network.go b/testutil/network/network.go index d682e9c742b9..ef693b28ddad 100644 --- a/testutil/network/network.go +++ b/testutil/network/network.go @@ -75,6 +75,7 @@ type Config struct { PruningStrategy string // the pruning strategy each validator will have EnableLogging bool // enable Tendermint logging to STDOUT CleanupDir bool // remove base temporary directory during cleanup + GRPCEndpoint string } // DefaultConfig returns a sane default configuration suitable for nearly all @@ -203,6 +204,11 @@ func New(t *testing.T, cfg Config) *Network { rpcAddr, _, err := server.FreeTCPAddr() require.NoError(t, err) tmCfg.RPC.ListenAddress = rpcAddr + + grpcAddr, _, err := server.FreeTCPAddr() + require.NoError(t, err) + appCfg.GRPC.Address = grpcAddr + appCfg.GRPC.Enable = true } logger := log.NewNopLogger() From f2671c7b2e0364dcdfdd1c2d3d9470a208d10376 Mon Sep 17 00:00:00 2001 From: Aaron Craelius Date: Thu, 16 Jul 2020 12:00:09 -0400 Subject: [PATCH 06/33] Start setting up in process tests --- tests/cli/helpers.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/cli/helpers.go b/tests/cli/helpers.go index 378a2a6266de..21ede81c673b 100644 --- a/tests/cli/helpers.go +++ b/tests/cli/helpers.go @@ -86,8 +86,8 @@ func (f *Fixtures) CollectGenTxs(flags ...string) { // SDStart runs simd start with the appropriate flags and returns a process func (f *Fixtures) SDStart(flags ...string) *tests.Process { - cmd := fmt.Sprintf("%s start --home=%s --rpc.laddr=%v --p2p.laddr=%v --grpc.enable --grpc.address=%s", - f.SimdBinary, f.SimdHome, f.RPCAddr, f.P2PAddr, f.GRPCAddr) + cmd := fmt.Sprintf("%s start --home=%s --rpc.laddr=%v --p2p.laddr=%v --grpc.enable", + f.SimdBinary, f.SimdHome, f.RPCAddr, f.P2PAddr) proc := tests.GoExecuteTWithStdout(f.T, AddFlags(cmd, flags)) tests.WaitForTMStart(f.Port) tests.WaitForNextNBlocksTM(1, f.Port) From 7254946182a927517e9af4149eda29c8c549d00f Mon Sep 17 00:00:00 2001 From: Amaury Martiny Date: Fri, 17 Jul 2020 17:22:33 +0200 Subject: [PATCH 07/33] Make it compile --- baseapp/baseapp.go | 18 +++++++++--------- baseapp/grpcrouter.go | 1 + server/config/config.go | 1 + server/constructors.go | 4 ++-- server/start.go | 16 +++++++++++++--- tests/cli/grpc_query_test.go | 16 +++++++++------- testutil/network/network.go | 5 ++--- 7 files changed, 37 insertions(+), 24 deletions(-) diff --git a/baseapp/baseapp.go b/baseapp/baseapp.go index 36130106d652..8b837f16f96f 100644 --- a/baseapp/baseapp.go +++ b/baseapp/baseapp.go @@ -5,8 +5,8 @@ import ( "reflect" "strings" + gogogrpc "github.com/gogo/protobuf/grpc" "github.com/gogo/protobuf/proto" - "google.golang.org/grpc" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/crypto/tmhash" @@ -173,8 +173,8 @@ func (app *BaseApp) MountStores(keys ...sdk.StoreKey) { } } -// MountStores mounts all IAVL or DB stores to the provided keys in the BaseApp -// multistore. +// MountKVStores mounts all IAVL or DB stores to the provided keys in the +// BaseApp multistore. func (app *BaseApp) MountKVStores(keys map[string]*sdk.KVStoreKey) { for _, key := range keys { if !app.fauxMerkleMode { @@ -187,8 +187,8 @@ func (app *BaseApp) MountKVStores(keys map[string]*sdk.KVStoreKey) { } } -// MountStores mounts all IAVL or DB stores to the provided keys in the BaseApp -// multistore. +// MountTransientStores mounts all IAVL or DB stores to the provided keys in +// the BaseApp multistore. func (app *BaseApp) MountTransientStores(keys map[string]*sdk.TransientStoreKey) { for _, key := range keys { app.MountStore(key, sdk.StoreTypeTransient) @@ -301,12 +301,12 @@ func (app *BaseApp) QueryRouter() sdk.QueryRouter { return app.queryRouter } // GRPCQueryRouter returns the GRPCQueryRouter of a BaseApp. func (app *BaseApp) GRPCQueryRouter() *GRPCQueryRouter { return app.grpcQueryRouter } -// RegisterGRPC registers gRPC services directly with the gRPC server -func (app *BaseApp) RegisterGRPC(grpc.Server) {} +// RegisterGRPC registers gRPC services directly with the gRPC server. +func (app *BaseApp) RegisterGRPC(gogogrpc.Server) {} // RegisterGRPCProxy registers gRPC services against a proxy that is expected -// to proxy requests to the ABCI query endpoint -func (app *BaseApp) RegisterGRPCProxy(server grpc.Server) { +// to proxy requests to the ABCI query endpoint. +func (app *BaseApp) RegisterGRPCProxy(server gogogrpc.Server) { app.grpcQueryRouter.RegisterProxyServer(server) } diff --git a/baseapp/grpcrouter.go b/baseapp/grpcrouter.go index 91820907fb53..53ec90f66882 100644 --- a/baseapp/grpcrouter.go +++ b/baseapp/grpcrouter.go @@ -96,6 +96,7 @@ func (qrt *GRPCQueryRouter) RegisterService(sd *grpc.ServiceDesc, handler interf }) } +// RegisterProxyServer registers a gRPC proxy server. func (qrt GRPCQueryRouter) RegisterProxyServer(server gogogrpc.Server) { for _, data := range qrt.serviceData { server.RegisterService(data.serviceDesc, data.handler) diff --git a/server/config/config.go b/server/config/config.go index 419648e50587..77769b2c7a99 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -75,6 +75,7 @@ type APIConfig struct { // Ref: https://github.com/cosmos/cosmos-sdk/issues/6420 } +// GRPCConfig defines configuration for the gRPC server. type GRPCConfig struct { // Enable defines if the gRPC server should be enabled. Enable bool `mapstructure:"enable"` diff --git a/server/constructors.go b/server/constructors.go index 104b865bd39e..72e362bc2248 100644 --- a/server/constructors.go +++ b/server/constructors.go @@ -37,11 +37,11 @@ type ( RegisterAPIRoutes(*api.Server) - // RegisterGRPC registers gRPC services directly with the gRPC server + // RegisterGRPC registers gRPC services directly with the gRPC server. RegisterGRPC(grpc.Server) // RegisterGRPCProxy registers gRPC services against a proxy that is expected - // to proxy requests to the ABCI query endpoint + // to proxy requests to the ABCI query endpoint. RegisterGRPCProxy(grpc.Server) } diff --git a/server/start.go b/server/start.go index e77f29bfa8d0..da0345be450a 100644 --- a/server/start.go +++ b/server/start.go @@ -9,9 +9,8 @@ import ( "runtime/pprof" "time" - "google.golang.org/grpc/reflection" - "google.golang.org/grpc" + "google.golang.org/grpc/reflection" "github.com/spf13/cobra" "github.com/tendermint/tendermint/abci/server" @@ -245,8 +244,19 @@ func startInProcess(ctx *Context, cdc codec.JSONMarshaler, appCreator AppCreator var grpcSrv *grpc.Server if config.GRPC.Enable { - grpcSrv = grpc.NewServer() + genDoc, err := genDocProvider() + if err != nil { + return err + } + clientCtx := client.Context{}. + WithHomeDir(home). + WithChainID(genDoc.ChainID). + WithJSONMarshaler(cdc). + WithClient(local.New(tmNode)). + WithTrustNode(true) + + grpcSrv = grpc.NewServer() app.RegisterGRPC(grpcSrv) // proxy queries to the ABCI query endpoint diff --git a/tests/cli/grpc_query_test.go b/tests/cli/grpc_query_test.go index 924738fe2125..e29b83d858f1 100644 --- a/tests/cli/grpc_query_test.go +++ b/tests/cli/grpc_query_test.go @@ -1,5 +1,3 @@ -// +build cli_test - package cli_test import ( @@ -55,14 +53,18 @@ func (s *IntegrationTestSuite) SetupSuite() { func (s *IntegrationTestSuite) TestGRPC() { val := s.network.Validators[0] - conn, err := grpc.Dial(val.AppConfig.GRPC.Address) - require.NoError(s.T(), err) + conn, err := grpc.Dial( + val.AppConfig.GRPC.Address, + grpc.WithInsecure(), // Or else we get "no transport security set" + ) + s.Require().NoError(err) testClient := testdata.NewTestServiceClient(conn) res, err := testClient.Echo(context.Background(), &testdata.EchoRequest{Message: "hello"}) - require.NoError(s.T(), err) - require.Equal(s.T(), "hello", res.Message) + s.Require().NoError(err) + s.Require().Equal("a", "b") + s.Require().Equal("hello", res.Message) } -func TestIntegrationTestSuite(t *testing.T) { +func TestCLIIntegrationTestSuite(t *testing.T) { suite.Run(t, &IntegrationTestSuite{}) } diff --git a/testutil/network/network.go b/testutil/network/network.go index ef693b28ddad..0c37bb0e9ef3 100644 --- a/testutil/network/network.go +++ b/testutil/network/network.go @@ -75,7 +75,6 @@ type Config struct { PruningStrategy string // the pruning strategy each validator will have EnableLogging bool // enable Tendermint logging to STDOUT CleanupDir bool // remove base temporary directory during cleanup - GRPCEndpoint string } // DefaultConfig returns a sane default configuration suitable for nearly all @@ -205,9 +204,9 @@ func New(t *testing.T, cfg Config) *Network { require.NoError(t, err) tmCfg.RPC.ListenAddress = rpcAddr - grpcAddr, _, err := server.FreeTCPAddr() + _, grpcPort, err := server.FreeTCPAddr() require.NoError(t, err) - appCfg.GRPC.Address = grpcAddr + appCfg.GRPC.Address = fmt.Sprintf(":%s", grpcPort) appCfg.GRPC.Enable = true } From 64554c485b477b74e7fa0beab4bee1e505caddf8 Mon Sep 17 00:00:00 2001 From: Amaury Martiny Date: Mon, 20 Jul 2020 13:50:35 +0200 Subject: [PATCH 08/33] Add start server to network util --- server/config/config.go | 2 +- server/start.go | 2 +- tests/cli/grpc_query_test.go | 12 +++++------- testutil/network/network.go | 2 +- testutil/network/util.go | 28 ++++++++++++++++++++++++++++ 5 files changed, 36 insertions(+), 10 deletions(-) diff --git a/server/config/config.go b/server/config/config.go index 77769b2c7a99..fb67b989700e 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -146,7 +146,7 @@ func DefaultConfig() *Config { }, GRPC: GRPCConfig{ Enable: false, - Address: "tcp://0.0.0.0:9090", + Address: "0.0.0.0:9090", }, } } diff --git a/server/start.go b/server/start.go index da0345be450a..cb94c3a047f3 100644 --- a/server/start.go +++ b/server/start.go @@ -311,7 +311,7 @@ func startInProcess(ctx *Context, cdc codec.JSONMarshaler, appCreator AppCreator } if grpcSrv != nil { - _ = grpcSrv.Stop + grpcSrv.Stop() } ctx.Logger.Info("exiting...") diff --git a/tests/cli/grpc_query_test.go b/tests/cli/grpc_query_test.go index e29b83d858f1..33e4011f7138 100644 --- a/tests/cli/grpc_query_test.go +++ b/tests/cli/grpc_query_test.go @@ -1,21 +1,19 @@ +// +build cli_test + package cli_test import ( "context" "testing" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - - "github.com/cosmos/cosmos-sdk/testutil/network" - - "github.com/cosmos/cosmos-sdk/tests/cli" - "google.golang.org/grpc" - "github.com/stretchr/testify/require" - "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/codec/testdata" + "github.com/cosmos/cosmos-sdk/tests/cli" + "github.com/cosmos/cosmos-sdk/testutil/network" ) func TestCliQueryConn(t *testing.T) { diff --git a/testutil/network/network.go b/testutil/network/network.go index 0c37bb0e9ef3..5cb8d3141f7c 100644 --- a/testutil/network/network.go +++ b/testutil/network/network.go @@ -206,7 +206,7 @@ func New(t *testing.T, cfg Config) *Network { _, grpcPort, err := server.FreeTCPAddr() require.NoError(t, err) - appCfg.GRPC.Address = fmt.Sprintf(":%s", grpcPort) + appCfg.GRPC.Address = fmt.Sprintf("0.0.0.0:%s", grpcPort) appCfg.GRPC.Enable = true } diff --git a/testutil/network/util.go b/testutil/network/util.go index 9a03a94fde4e..42195dc749f9 100644 --- a/testutil/network/util.go +++ b/testutil/network/util.go @@ -1,6 +1,7 @@ package network import ( + "net" "path/filepath" "time" @@ -12,9 +13,12 @@ import ( "github.com/tendermint/tendermint/rpc/client/local" "github.com/tendermint/tendermint/types" tmtime "github.com/tendermint/tendermint/types/time" + "google.golang.org/grpc" + "google.golang.org/grpc/reflection" "github.com/cosmos/cosmos-sdk/codec" "github.com/cosmos/cosmos-sdk/server/api" + grpcproxy "github.com/cosmos/cosmos-sdk/server/grpc" authtypes "github.com/cosmos/cosmos-sdk/x/auth/types" banktypes "github.com/cosmos/cosmos-sdk/x/bank/types" "github.com/cosmos/cosmos-sdk/x/genutil" @@ -83,6 +87,30 @@ func startInProcess(cfg Config, val *Validator) error { val.api = apiSrv } + // TODO Factor code with server/start.go + if val.AppConfig.GRPC.Enable { + grpcSrv := grpc.NewServer() + app.RegisterGRPC(grpcSrv) + + // proxy queries to the ABCI query endpoint + proxyInterceptor := grpcproxy.ABCIQueryProxyInterceptor(val.ClientCtx) + proxySrv := grpcproxy.NewProxyServer(grpcSrv, proxyInterceptor) + app.RegisterGRPCProxy(proxySrv) + + reflection.Register(grpcSrv) + + listener, err := net.Listen("tcp", val.AppConfig.GRPC.Address) + + if err != nil { + return err + } + + err = grpcSrv.Serve(listener) + if err != nil { + return err + } + } + return nil } From d0c52918687ece9cb4b4e94a19fcf45da19ea44f Mon Sep 17 00:00:00 2001 From: Amaury Martiny Date: Mon, 20 Jul 2020 17:31:54 +0200 Subject: [PATCH 09/33] Add Println --- tests/cli/grpc_query_test.go | 3 +-- testutil/network/util.go | 3 +++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/cli/grpc_query_test.go b/tests/cli/grpc_query_test.go index 33e4011f7138..749dbc5b2bdd 100644 --- a/tests/cli/grpc_query_test.go +++ b/tests/cli/grpc_query_test.go @@ -11,9 +11,9 @@ import ( "google.golang.org/grpc" "github.com/cosmos/cosmos-sdk/client" - "github.com/cosmos/cosmos-sdk/codec/testdata" "github.com/cosmos/cosmos-sdk/tests/cli" "github.com/cosmos/cosmos-sdk/testutil/network" + "github.com/cosmos/cosmos-sdk/testutil/testdata" ) func TestCliQueryConn(t *testing.T) { @@ -59,7 +59,6 @@ func (s *IntegrationTestSuite) TestGRPC() { testClient := testdata.NewTestServiceClient(conn) res, err := testClient.Echo(context.Background(), &testdata.EchoRequest{Message: "hello"}) s.Require().NoError(err) - s.Require().Equal("a", "b") s.Require().Equal("hello", res.Message) } diff --git a/testutil/network/util.go b/testutil/network/util.go index 42195dc749f9..9b8c09b87b8c 100644 --- a/testutil/network/util.go +++ b/testutil/network/util.go @@ -1,6 +1,7 @@ package network import ( + "fmt" "net" "path/filepath" "time" @@ -105,7 +106,9 @@ func startInProcess(cfg Config, val *Validator) error { return err } + fmt.Println("GRPC server hangs here...") err = grpcSrv.Serve(listener) + fmt.Println("never called") if err != nil { return err } From 978a26f17774c221811ec4978327c34b229ed594 Mon Sep 17 00:00:00 2001 From: Amaury Martiny Date: Tue, 21 Jul 2020 16:02:44 +0200 Subject: [PATCH 10/33] Use go routine --- testutil/network/util.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/testutil/network/util.go b/testutil/network/util.go index 9b8c09b87b8c..7d30b5600bd5 100644 --- a/testutil/network/util.go +++ b/testutil/network/util.go @@ -106,12 +106,11 @@ func startInProcess(cfg Config, val *Validator) error { return err } - fmt.Println("GRPC server hangs here...") - err = grpcSrv.Serve(listener) - fmt.Println("never called") - if err != nil { - return err - } + go func() { + if err := grpcSrv.Serve(listener); err != nil { + fmt.Printf("failed to serve: %v\n", err) + } + }() } return nil From c93b5e37b2ab139deecf31cfced4424541200175 Mon Sep 17 00:00:00 2001 From: Amaury Martiny Date: Tue, 21 Jul 2020 16:26:27 +0200 Subject: [PATCH 11/33] Fix scopelint --- server/grpc/proxy.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server/grpc/proxy.go b/server/grpc/proxy.go index d4bc98c63250..fd7d65a07a21 100644 --- a/server/grpc/proxy.go +++ b/server/grpc/proxy.go @@ -17,6 +17,7 @@ type proxyServer struct { interceptor grpc.UnaryServerInterceptor } +// NewProxyServer creates a new proxy server with an interceptor. func NewProxyServer(server *grpc.Server, interceptor grpc.UnaryServerInterceptor) gogogrpc.Server { return &proxyServer{server: server, interceptor: interceptor} } @@ -25,6 +26,7 @@ func (proxy *proxyServer) RegisterService(desc *grpc.ServiceDesc, handler interf newMethods := make([]grpc.MethodDesc, len(desc.Methods)) for i, method := range desc.Methods { + method := method // Fix scopelint: Using the variable on range scope `method` in function literal newMethods[i] = grpc.MethodDesc{ MethodName: method.MethodName, Handler: func(srv interface{}, ctx context.Context, dec func(interface{}) error, _ grpc.UnaryServerInterceptor) (interface{}, error) { @@ -44,6 +46,8 @@ func (proxy *proxyServer) RegisterService(desc *grpc.ServiceDesc, handler interf proxy.server.RegisterService(newDesc, handler) } +// ABCIQueryProxyInterceptor implements a query interceptor that intercepts +// queries and sends them to the ABCI. func ABCIQueryProxyInterceptor(clientCtx client.Context) grpc.UnaryServerInterceptor { return func(_ context.Context, req interface{}, info *grpc.UnaryServerInfo, _ grpc.UnaryHandler) (resp interface{}, err error) { msg, ok := req.(proto.Message) @@ -62,7 +66,6 @@ func ABCIQueryProxyInterceptor(clientCtx client.Context) grpc.UnaryServerInterce } abciRes, err := clientCtx.QueryABCI(abciReq) - if err != nil { return nil, err } From 71c813964409109a8d04892ecc7add30122eb1f6 Mon Sep 17 00:00:00 2001 From: Amaury Martiny Date: Tue, 21 Jul 2020 16:30:29 +0200 Subject: [PATCH 12/33] Move to proxy_test --- server/grpc/proxy_test.go | 46 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) create mode 100644 server/grpc/proxy_test.go diff --git a/server/grpc/proxy_test.go b/server/grpc/proxy_test.go new file mode 100644 index 000000000000..56612c49b0c9 --- /dev/null +++ b/server/grpc/proxy_test.go @@ -0,0 +1,46 @@ +package grpc_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/suite" + "google.golang.org/grpc" + + "github.com/cosmos/cosmos-sdk/testutil/network" + "github.com/cosmos/cosmos-sdk/testutil/testdata" +) + +type IntegrationTestSuite struct { + suite.Suite + + cfg network.Config + network *network.Network +} + +func (s *IntegrationTestSuite) SetupSuite() { + s.T().Log("setting up integration test suite") + + s.network = network.New(s.T(), network.DefaultConfig()) + s.Require().NotNil(s.network) + + _, err := s.network.WaitForHeight(1) + s.Require().NoError(err) +} + +func (s *IntegrationTestSuite) TestGRPC() { + val := s.network.Validators[0] + conn, err := grpc.Dial( + val.AppConfig.GRPC.Address, + grpc.WithInsecure(), // Or else we get "no transport security set" + ) + s.Require().NoError(err) + testClient := testdata.NewTestServiceClient(conn) + res, err := testClient.Echo(context.Background(), &testdata.EchoRequest{Message: "hello"}) + s.Require().NoError(err) + s.Require().Equal("hello", res.Message) +} + +func TestCLIIntegrationTestSuite(t *testing.T) { + suite.Run(t, &IntegrationTestSuite{}) +} From aaa1ac7baa81219339e3cf4ad9e90c8d45fbdf8f Mon Sep 17 00:00:00 2001 From: Aaron Craelius Date: Thu, 23 Jul 2020 12:02:02 -0400 Subject: [PATCH 13/33] Add response type cache --- server/grpc/proxy.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/server/grpc/proxy.go b/server/grpc/proxy.go index fd7d65a07a21..10d1dba36d97 100644 --- a/server/grpc/proxy.go +++ b/server/grpc/proxy.go @@ -2,14 +2,16 @@ package grpc import ( "context" + "reflect" - "github.com/cosmos/cosmos-sdk/client" gogogrpc "github.com/gogo/protobuf/grpc" "github.com/gogo/protobuf/proto" "github.com/tendermint/tendermint/abci/types" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + + "github.com/cosmos/cosmos-sdk/client" ) type proxyServer struct { @@ -49,6 +51,8 @@ func (proxy *proxyServer) RegisterService(desc *grpc.ServiceDesc, handler interf // ABCIQueryProxyInterceptor implements a query interceptor that intercepts // queries and sends them to the ABCI. func ABCIQueryProxyInterceptor(clientCtx client.Context) grpc.UnaryServerInterceptor { + resTypeCache := map[string]reflect.Type{} + return func(_ context.Context, req interface{}, info *grpc.UnaryServerInfo, _ grpc.UnaryHandler) (resp interface{}, err error) { msg, ok := req.(proto.Message) if !ok { @@ -65,6 +69,12 @@ func ABCIQueryProxyInterceptor(clientCtx client.Context) grpc.UnaryServerInterce Path: info.FullMethod, } + resType, ok := resTypeCache[info.FullMethod] + if !ok { + // TODO figure out response type and cache + // NOTE: this is a bit hacky but... + } + abciRes, err := clientCtx.QueryABCI(abciReq) if err != nil { return nil, err @@ -74,7 +84,7 @@ func ABCIQueryProxyInterceptor(clientCtx client.Context) grpc.UnaryServerInterce return nil, status.Errorf(codes.Internal, abciRes.Log) } - respMsg, ok := resp.(proto.Message) + respMsg, ok := reflect.New(resType).Interface().(proto.Message) if !ok { return nil, status.Errorf(codes.Internal, "unable to proto marshal") } From b4421f1b972204d4727f96cc5c55bc70f0db5b55 Mon Sep 17 00:00:00 2001 From: Amaury Martiny Date: Thu, 23 Jul 2020 19:26:30 +0200 Subject: [PATCH 14/33] Remove proxy --- baseapp/baseapp.go | 10 ++-- baseapp/grpcrouter.go | 1 + server/constructors.go | 4 -- server/grpc/proxy.go | 99 -------------------------------- server/grpc/proxy_test.go | 46 --------------- server/start.go | 20 +------ testutil/network/network_test.go | 16 ++++++ testutil/network/util.go | 9 --- 8 files changed, 23 insertions(+), 182 deletions(-) delete mode 100644 server/grpc/proxy.go delete mode 100644 server/grpc/proxy_test.go diff --git a/baseapp/baseapp.go b/baseapp/baseapp.go index b368dc6cc31f..3723fc47f644 100644 --- a/baseapp/baseapp.go +++ b/baseapp/baseapp.go @@ -302,12 +302,10 @@ func (app *BaseApp) QueryRouter() sdk.QueryRouter { return app.queryRouter } func (app *BaseApp) GRPCQueryRouter() *GRPCQueryRouter { return app.grpcQueryRouter } // RegisterGRPC registers gRPC services directly with the gRPC server. -func (app *BaseApp) RegisterGRPC(gogogrpc.Server) {} - -// RegisterGRPCProxy registers gRPC services against a proxy that is expected -// to proxy requests to the ABCI query endpoint. -func (app *BaseApp) RegisterGRPCProxy(server gogogrpc.Server) { - app.grpcQueryRouter.RegisterProxyServer(server) +func (app *BaseApp) RegisterGRPC(server gogogrpc.Server) { + for _, data := range app.GRPCQueryRouter().serviceData { + server.RegisterService(data.serviceDesc, data.handler) + } } // Seal seals a BaseApp. It prohibits any further modifications to a BaseApp. diff --git a/baseapp/grpcrouter.go b/baseapp/grpcrouter.go index 53ec90f66882..7ff1b8413433 100644 --- a/baseapp/grpcrouter.go +++ b/baseapp/grpcrouter.go @@ -23,6 +23,7 @@ type GRPCQueryRouter struct { serviceData []serviceData } +// serviceData represents a gRPC service, along with its handler. type serviceData struct { serviceDesc *grpc.ServiceDesc handler interface{} diff --git a/server/constructors.go b/server/constructors.go index 72e362bc2248..61392392b87e 100644 --- a/server/constructors.go +++ b/server/constructors.go @@ -39,10 +39,6 @@ type ( // RegisterGRPC registers gRPC services directly with the gRPC server. RegisterGRPC(grpc.Server) - - // RegisterGRPCProxy registers gRPC services against a proxy that is expected - // to proxy requests to the ABCI query endpoint. - RegisterGRPCProxy(grpc.Server) } // AppCreator is a function that allows us to lazily initialize an diff --git a/server/grpc/proxy.go b/server/grpc/proxy.go deleted file mode 100644 index 10d1dba36d97..000000000000 --- a/server/grpc/proxy.go +++ /dev/null @@ -1,99 +0,0 @@ -package grpc - -import ( - "context" - "reflect" - - gogogrpc "github.com/gogo/protobuf/grpc" - "github.com/gogo/protobuf/proto" - "github.com/tendermint/tendermint/abci/types" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - - "github.com/cosmos/cosmos-sdk/client" -) - -type proxyServer struct { - server *grpc.Server - interceptor grpc.UnaryServerInterceptor -} - -// NewProxyServer creates a new proxy server with an interceptor. -func NewProxyServer(server *grpc.Server, interceptor grpc.UnaryServerInterceptor) gogogrpc.Server { - return &proxyServer{server: server, interceptor: interceptor} -} - -func (proxy *proxyServer) RegisterService(desc *grpc.ServiceDesc, handler interface{}) { - newMethods := make([]grpc.MethodDesc, len(desc.Methods)) - - for i, method := range desc.Methods { - method := method // Fix scopelint: Using the variable on range scope `method` in function literal - newMethods[i] = grpc.MethodDesc{ - MethodName: method.MethodName, - Handler: func(srv interface{}, ctx context.Context, dec func(interface{}) error, _ grpc.UnaryServerInterceptor) (interface{}, error) { - return method.Handler(srv, ctx, dec, proxy.interceptor) - }, - } - } - - newDesc := &grpc.ServiceDesc{ - ServiceName: desc.ServiceName, - HandlerType: desc.HandlerType, - Methods: newMethods, - Streams: desc.Streams, - Metadata: desc.Metadata, - } - - proxy.server.RegisterService(newDesc, handler) -} - -// ABCIQueryProxyInterceptor implements a query interceptor that intercepts -// queries and sends them to the ABCI. -func ABCIQueryProxyInterceptor(clientCtx client.Context) grpc.UnaryServerInterceptor { - resTypeCache := map[string]reflect.Type{} - - return func(_ context.Context, req interface{}, info *grpc.UnaryServerInfo, _ grpc.UnaryHandler) (resp interface{}, err error) { - msg, ok := req.(proto.Message) - if !ok { - return nil, status.Errorf(codes.Internal, "unable to proto marshal") - } - - msgBz, err := proto.Marshal(msg) - if err != nil { - return nil, err - } - - abciReq := types.RequestQuery{ - Data: msgBz, - Path: info.FullMethod, - } - - resType, ok := resTypeCache[info.FullMethod] - if !ok { - // TODO figure out response type and cache - // NOTE: this is a bit hacky but... - } - - abciRes, err := clientCtx.QueryABCI(abciReq) - if err != nil { - return nil, err - } - - if abciRes.Code != 0 { - return nil, status.Errorf(codes.Internal, abciRes.Log) - } - - respMsg, ok := reflect.New(resType).Interface().(proto.Message) - if !ok { - return nil, status.Errorf(codes.Internal, "unable to proto marshal") - } - - err = proto.Unmarshal(abciRes.Value, respMsg) - if err != nil { - return nil, err - } - - return resp, nil - } -} diff --git a/server/grpc/proxy_test.go b/server/grpc/proxy_test.go deleted file mode 100644 index 56612c49b0c9..000000000000 --- a/server/grpc/proxy_test.go +++ /dev/null @@ -1,46 +0,0 @@ -package grpc_test - -import ( - "context" - "testing" - - "github.com/stretchr/testify/suite" - "google.golang.org/grpc" - - "github.com/cosmos/cosmos-sdk/testutil/network" - "github.com/cosmos/cosmos-sdk/testutil/testdata" -) - -type IntegrationTestSuite struct { - suite.Suite - - cfg network.Config - network *network.Network -} - -func (s *IntegrationTestSuite) SetupSuite() { - s.T().Log("setting up integration test suite") - - s.network = network.New(s.T(), network.DefaultConfig()) - s.Require().NotNil(s.network) - - _, err := s.network.WaitForHeight(1) - s.Require().NoError(err) -} - -func (s *IntegrationTestSuite) TestGRPC() { - val := s.network.Validators[0] - conn, err := grpc.Dial( - val.AppConfig.GRPC.Address, - grpc.WithInsecure(), // Or else we get "no transport security set" - ) - s.Require().NoError(err) - testClient := testdata.NewTestServiceClient(conn) - res, err := testClient.Echo(context.Background(), &testdata.EchoRequest{Message: "hello"}) - s.Require().NoError(err) - s.Require().Equal("hello", res.Message) -} - -func TestCLIIntegrationTestSuite(t *testing.T) { - suite.Run(t, &IntegrationTestSuite{}) -} diff --git a/server/start.go b/server/start.go index e9b73399cb1e..10e3e1fa0839 100644 --- a/server/start.go +++ b/server/start.go @@ -27,7 +27,6 @@ import ( "github.com/cosmos/cosmos-sdk/codec" "github.com/cosmos/cosmos-sdk/server/api" "github.com/cosmos/cosmos-sdk/server/config" - grpcproxy "github.com/cosmos/cosmos-sdk/server/grpc" storetypes "github.com/cosmos/cosmos-sdk/store/types" ) @@ -244,26 +243,11 @@ func startInProcess(ctx *Context, cdc codec.JSONMarshaler, appCreator AppCreator var grpcSrv *grpc.Server if config.GRPC.Enable { - genDoc, err := genDocProvider() - if err != nil { - return err - } - - clientCtx := client.Context{}. - WithHomeDir(home). - WithChainID(genDoc.ChainID). - WithJSONMarshaler(cdc). - WithClient(local.New(tmNode)). - WithTrustNode(true) - grpcSrv = grpc.NewServer() app.RegisterGRPC(grpcSrv) - // proxy queries to the ABCI query endpoint - proxyInterceptor := grpcproxy.ABCIQueryProxyInterceptor(clientCtx) - proxySrv := grpcproxy.NewProxyServer(grpcSrv, proxyInterceptor) - app.RegisterGRPCProxy(proxySrv) - + // Reflection allows external clients to see what services and methods + // the gRPC server exposes. reflection.Register(grpcSrv) listener, err := net.Listen("tcp", config.GRPC.Address) diff --git a/testutil/network/network_test.go b/testutil/network/network_test.go index e7b248e31a37..32a218f8570b 100644 --- a/testutil/network/network_test.go +++ b/testutil/network/network_test.go @@ -1,12 +1,15 @@ package network_test import ( + "context" "testing" "time" "github.com/stretchr/testify/suite" + "google.golang.org/grpc" "github.com/cosmos/cosmos-sdk/testutil/network" + "github.com/cosmos/cosmos-sdk/testutil/testdata" ) type IntegrationTestSuite struct { @@ -35,6 +38,19 @@ func (s *IntegrationTestSuite) TestNetwork_Liveness() { s.Require().NoError(err, "expected to reach 10 blocks; got %d", h) } +func (s *IntegrationTestSuite) TestGRPC() { + val := s.network.Validators[0] + conn, err := grpc.Dial( + val.AppConfig.GRPC.Address, + grpc.WithInsecure(), // Or else we get "no transport security set" + ) + s.Require().NoError(err) + testClient := testdata.NewTestServiceClient(conn) + res, err := testClient.Echo(context.Background(), &testdata.EchoRequest{Message: "hello"}) + s.Require().NoError(err) + s.Require().Equal("hello", res.Message) +} + func TestIntegrationTestSuite(t *testing.T) { suite.Run(t, new(IntegrationTestSuite)) } diff --git a/testutil/network/util.go b/testutil/network/util.go index 7d30b5600bd5..9692b3f7b553 100644 --- a/testutil/network/util.go +++ b/testutil/network/util.go @@ -15,11 +15,9 @@ import ( "github.com/tendermint/tendermint/types" tmtime "github.com/tendermint/tendermint/types/time" "google.golang.org/grpc" - "google.golang.org/grpc/reflection" "github.com/cosmos/cosmos-sdk/codec" "github.com/cosmos/cosmos-sdk/server/api" - grpcproxy "github.com/cosmos/cosmos-sdk/server/grpc" authtypes "github.com/cosmos/cosmos-sdk/x/auth/types" banktypes "github.com/cosmos/cosmos-sdk/x/bank/types" "github.com/cosmos/cosmos-sdk/x/genutil" @@ -93,13 +91,6 @@ func startInProcess(cfg Config, val *Validator) error { grpcSrv := grpc.NewServer() app.RegisterGRPC(grpcSrv) - // proxy queries to the ABCI query endpoint - proxyInterceptor := grpcproxy.ABCIQueryProxyInterceptor(val.ClientCtx) - proxySrv := grpcproxy.NewProxyServer(grpcSrv, proxyInterceptor) - app.RegisterGRPCProxy(proxySrv) - - reflection.Register(grpcSrv) - listener, err := net.Listen("tcp", val.AppConfig.GRPC.Address) if err != nil { From 9072a4262a96bfe95e8f1bba488e4742b5ba27ec Mon Sep 17 00:00:00 2001 From: Amaury Martiny Date: Thu, 23 Jul 2020 19:31:43 +0200 Subject: [PATCH 15/33] Tweaks --- baseapp/grpcrouter.go | 7 ------- server/constructors.go | 1 - server/start.go | 15 ++++++++------- testutil/network/util.go | 1 - 4 files changed, 8 insertions(+), 16 deletions(-) diff --git a/baseapp/grpcrouter.go b/baseapp/grpcrouter.go index 7ff1b8413433..9ed6f7cd44de 100644 --- a/baseapp/grpcrouter.go +++ b/baseapp/grpcrouter.go @@ -97,13 +97,6 @@ func (qrt *GRPCQueryRouter) RegisterService(sd *grpc.ServiceDesc, handler interf }) } -// RegisterProxyServer registers a gRPC proxy server. -func (qrt GRPCQueryRouter) RegisterProxyServer(server gogogrpc.Server) { - for _, data := range qrt.serviceData { - server.RegisterService(data.serviceDesc, data.handler) - } -} - // AnyUnpacker returns the AnyUnpacker for the router func (qrt *GRPCQueryRouter) AnyUnpacker() types.AnyUnpacker { return qrt.anyUnpacker diff --git a/server/constructors.go b/server/constructors.go index 61392392b87e..491369c2d158 100644 --- a/server/constructors.go +++ b/server/constructors.go @@ -7,7 +7,6 @@ import ( "path/filepath" "github.com/gogo/protobuf/grpc" - abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/libs/log" tmtypes "github.com/tendermint/tendermint/types" diff --git a/server/start.go b/server/start.go index 10e3e1fa0839..9108b934043f 100644 --- a/server/start.go +++ b/server/start.go @@ -9,9 +9,6 @@ import ( "runtime/pprof" "time" - "google.golang.org/grpc" - "google.golang.org/grpc/reflection" - "github.com/spf13/cobra" "github.com/tendermint/tendermint/abci/server" tcmd "github.com/tendermint/tendermint/cmd/tendermint/commands" @@ -21,6 +18,8 @@ import ( pvm "github.com/tendermint/tendermint/privval" "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/rpc/client/local" + "google.golang.org/grpc" + "google.golang.org/grpc/reflection" "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/client/flags" @@ -255,10 +254,12 @@ func startInProcess(ctx *Context, cdc codec.JSONMarshaler, appCreator AppCreator return err } - err = grpcSrv.Serve(listener) - if err != nil { - return err - } + go func() { + err = grpcSrv.Serve(listener) + if err != nil { + fmt.Printf("failed to serve: %v\n", err) + } + }() } var cpuProfileCleanup func() diff --git a/testutil/network/util.go b/testutil/network/util.go index 9692b3f7b553..d27f5a1e5238 100644 --- a/testutil/network/util.go +++ b/testutil/network/util.go @@ -86,7 +86,6 @@ func startInProcess(cfg Config, val *Validator) error { val.api = apiSrv } - // TODO Factor code with server/start.go if val.AppConfig.GRPC.Enable { grpcSrv := grpc.NewServer() app.RegisterGRPC(grpcSrv) From f15c036d74bd4fdbb1d6e20b4e667514bdf6b3e3 Mon Sep 17 00:00:00 2001 From: Amaury Martiny Date: Thu, 23 Jul 2020 20:09:55 +0200 Subject: [PATCH 16/33] Use channel to handle error --- server/start.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/server/start.go b/server/start.go index 1c6329fb6e8d..0a90ae4a000f 100644 --- a/server/start.go +++ b/server/start.go @@ -256,12 +256,19 @@ func startInProcess(ctx *Context, cdc codec.JSONMarshaler, appCreator AppCreator return err } + errCh := make(chan error) go func() { err = grpcSrv.Serve(listener) if err != nil { - fmt.Printf("failed to serve: %v\n", err) + errCh <- fmt.Errorf("failed to serve: %v", err) } }() + + select { + case err := <-errCh: + return err + case <-time.After(5 * time.Second): // assume server started successfully + } } var cpuProfileCleanup func() From ff837e8c16f16fd41334779b03b9233aa0c6c4bc Mon Sep 17 00:00:00 2001 From: Amaury Martiny Date: Thu, 23 Jul 2020 23:11:42 +0200 Subject: [PATCH 17/33] Use error chan --- testutil/network/util.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/testutil/network/util.go b/testutil/network/util.go index 89da27832dd0..5461a1b25a0e 100644 --- a/testutil/network/util.go +++ b/testutil/network/util.go @@ -95,11 +95,19 @@ func startInProcess(cfg Config, val *Validator) error { return err } + errCh := make(chan error) go func() { - if err := grpcSrv.Serve(listener); err != nil { - fmt.Printf("failed to serve: %v\n", err) + err = grpcSrv.Serve(listener) + if err != nil { + errCh <- fmt.Errorf("failed to serve: %v", err) } }() + + select { + case err := <-errCh: + return err + case <-time.After(5 * time.Second): // assume server started successfully + } } return nil From 4080396c279908e5899e731a852306af911c1d5c Mon Sep 17 00:00:00 2001 From: Amaury Martiny Date: Thu, 23 Jul 2020 23:13:26 +0200 Subject: [PATCH 18/33] Update server/start.go Co-authored-by: Federico Kunze <31522760+fedekunze@users.noreply.github.com> --- server/start.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/start.go b/server/start.go index 0a90ae4a000f..b791533ae052 100644 --- a/server/start.go +++ b/server/start.go @@ -260,7 +260,7 @@ func startInProcess(ctx *Context, cdc codec.JSONMarshaler, appCreator AppCreator go func() { err = grpcSrv.Serve(listener) if err != nil { - errCh <- fmt.Errorf("failed to serve: %v", err) + errCh <- fmt.Errorf("failed to serve: %w", err) } }() From 670c44ef58ae07905568e02569b462d991450e54 Mon Sep 17 00:00:00 2001 From: Amaury Martiny Date: Thu, 23 Jul 2020 23:13:35 +0200 Subject: [PATCH 19/33] Use %w --- testutil/network/util.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testutil/network/util.go b/testutil/network/util.go index 5461a1b25a0e..d7392498d1ab 100644 --- a/testutil/network/util.go +++ b/testutil/network/util.go @@ -99,7 +99,7 @@ func startInProcess(cfg Config, val *Validator) error { go func() { err = grpcSrv.Serve(listener) if err != nil { - errCh <- fmt.Errorf("failed to serve: %v", err) + errCh <- fmt.Errorf("failed to serve: %w", err) } }() From 323e1c2509c3d8a1dc56918cfd4ae9f4fda0fdeb Mon Sep 17 00:00:00 2001 From: Amaury Martiny Date: Fri, 24 Jul 2020 12:30:25 +0200 Subject: [PATCH 20/33] Add sdk.Context --- baseapp/baseapp.go | 60 +++++++++++++++++++++++++++++++++++++++- testutil/network/util.go | 1 - 2 files changed, 59 insertions(+), 2 deletions(-) diff --git a/baseapp/baseapp.go b/baseapp/baseapp.go index 3723fc47f644..d912d13a32ce 100644 --- a/baseapp/baseapp.go +++ b/baseapp/baseapp.go @@ -1,13 +1,18 @@ package baseapp import ( + "context" "fmt" "reflect" "strings" gogogrpc "github.com/gogo/protobuf/grpc" "github.com/gogo/protobuf/proto" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "github.com/tendermint/tendermint/abci/types" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/crypto/tmhash" "github.com/tendermint/tendermint/libs/log" @@ -303,8 +308,61 @@ func (app *BaseApp) GRPCQueryRouter() *GRPCQueryRouter { return app.grpcQueryRou // RegisterGRPC registers gRPC services directly with the gRPC server. func (app *BaseApp) RegisterGRPC(server gogogrpc.Server) { + // Define an interceptor for all gRPC queries: this interceptor will create + // a new sdk.Context, and pass it into the query handler. + interceptor := func(_ context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + // To create the correct sdk.Context, we first need to transform the + // req into an abci.RequestQuery + msg, ok := req.(proto.Message) + if !ok { + return nil, status.Errorf(codes.Internal, "unable to proto marshal") + } + + msgBz, err := proto.Marshal(msg) + if err != nil { + return nil, err + } + + abciReq := types.RequestQuery{ + Data: msgBz, + Path: info.FullMethod, + } + + // Create the sdk.Context + sdkCtx, err := app.createQueryContext(abciReq) + if err != nil { + return nil, err + } + + ctx := sdk.WrapSDKContext(sdkCtx) + + // Run the handler, with our wrapped sdk.Context + return handler(ctx, req) + } + for _, data := range app.GRPCQueryRouter().serviceData { - server.RegisterService(data.serviceDesc, data.handler) + desc := data.serviceDesc + newMethods := make([]grpc.MethodDesc, len(desc.Methods)) + + for i, method := range desc.Methods { + methodHandler := method.Handler // Fix scopelint: Using the variable on range scope `method` in function literal + newMethods[i] = grpc.MethodDesc{ + MethodName: method.MethodName, + Handler: func(srv interface{}, ctx context.Context, dec func(interface{}) error, _ grpc.UnaryServerInterceptor) (interface{}, error) { + return methodHandler(srv, ctx, dec, interceptor) + }, + } + } + + newDesc := &grpc.ServiceDesc{ + ServiceName: desc.ServiceName, + HandlerType: desc.HandlerType, + Methods: newMethods, + Streams: desc.Streams, + Metadata: desc.Metadata, + } + + server.RegisterService(newDesc, data.handler) } } diff --git a/testutil/network/util.go b/testutil/network/util.go index d7392498d1ab..e02a71dd7474 100644 --- a/testutil/network/util.go +++ b/testutil/network/util.go @@ -90,7 +90,6 @@ func startInProcess(cfg Config, val *Validator) error { app.RegisterGRPC(grpcSrv) listener, err := net.Listen("tcp", val.AppConfig.GRPC.Address) - if err != nil { return err } From 9058b6dc431bf40247019096eb9866e852a46d4e Mon Sep 17 00:00:00 2001 From: Amaury Martiny Date: Fri, 24 Jul 2020 12:32:28 +0200 Subject: [PATCH 21/33] Add comments --- baseapp/baseapp.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/baseapp/baseapp.go b/baseapp/baseapp.go index d912d13a32ce..7b7cedb9899e 100644 --- a/baseapp/baseapp.go +++ b/baseapp/baseapp.go @@ -340,6 +340,8 @@ func (app *BaseApp) RegisterGRPC(server gogogrpc.Server) { return handler(ctx, req) } + // Loop through all services and methods, add the interceptor, and register + // the service for _, data := range app.GRPCQueryRouter().serviceData { desc := data.serviceDesc newMethods := make([]grpc.MethodDesc, len(desc.Methods)) From 0411977264b436dedd8d3db18b3975e1f25ccfaf Mon Sep 17 00:00:00 2001 From: Amaury Martiny Date: Fri, 24 Jul 2020 12:37:39 +0200 Subject: [PATCH 22/33] Fix lint --- baseapp/baseapp.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/baseapp/baseapp.go b/baseapp/baseapp.go index 7b7cedb9899e..247287a018c2 100644 --- a/baseapp/baseapp.go +++ b/baseapp/baseapp.go @@ -12,7 +12,6 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "github.com/tendermint/tendermint/abci/types" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/crypto/tmhash" "github.com/tendermint/tendermint/libs/log" @@ -323,7 +322,7 @@ func (app *BaseApp) RegisterGRPC(server gogogrpc.Server) { return nil, err } - abciReq := types.RequestQuery{ + abciReq := abci.RequestQuery{ Data: msgBz, Path: info.FullMethod, } From 0c902392d2204e6aebf2b0252e7827acd4eb5253 Mon Sep 17 00:00:00 2001 From: Amaury Martiny Date: Fri, 24 Jul 2020 15:53:23 +0200 Subject: [PATCH 23/33] Add header and tests --- baseapp/baseapp.go | 25 ++++++++++++++++++----- testutil/network/network.go | 6 +++--- testutil/network/network_test.go | 35 ++++++++++++++++++++++++++++---- 3 files changed, 54 insertions(+), 12 deletions(-) diff --git a/baseapp/baseapp.go b/baseapp/baseapp.go index 247287a018c2..0e3017041d79 100644 --- a/baseapp/baseapp.go +++ b/baseapp/baseapp.go @@ -4,12 +4,14 @@ import ( "context" "fmt" "reflect" + "strconv" "strings" gogogrpc "github.com/gogo/protobuf/grpc" "github.com/gogo/protobuf/proto" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" abci "github.com/tendermint/tendermint/abci/types" @@ -27,6 +29,9 @@ const ( runTxModeReCheck // Recheck a (pending) transaction after a commit runTxModeSimulate // Simulate a transaction runTxModeDeliver // Deliver a transaction + + // GRPCBlockHeightHeader is the gRPC header for block height + GRPCBlockHeightHeader = "sdk-block-height" ) var ( @@ -309,7 +314,7 @@ func (app *BaseApp) GRPCQueryRouter() *GRPCQueryRouter { return app.grpcQueryRou func (app *BaseApp) RegisterGRPC(server gogogrpc.Server) { // Define an interceptor for all gRPC queries: this interceptor will create // a new sdk.Context, and pass it into the query handler. - interceptor := func(_ context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + interceptor := func(grpcCtx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { // To create the correct sdk.Context, we first need to transform the // req into an abci.RequestQuery msg, ok := req.(proto.Message) @@ -333,10 +338,20 @@ func (app *BaseApp) RegisterGRPC(server gogogrpc.Server) { return nil, err } - ctx := sdk.WrapSDKContext(sdkCtx) + // Wrap the sdk.Context into a new context.Context + newCtx := sdk.WrapSDKContext(sdkCtx) + + // Attach the original stream from the gRPC context into the new + // context.Context + stream := grpc.ServerTransportStreamFromContext(grpcCtx) + newCtx = grpc.NewContextWithServerTransportStream(newCtx, stream) + + // Add gRPC headers + md := metadata.Pairs(GRPCBlockHeightHeader, strconv.FormatInt(sdkCtx.BlockHeight(), 10)) + grpc.SetHeader(newCtx, md) - // Run the handler, with our wrapped sdk.Context - return handler(ctx, req) + // Run the handler, with our new context.Context + return handler(newCtx, req) } // Loop through all services and methods, add the interceptor, and register @@ -346,7 +361,7 @@ func (app *BaseApp) RegisterGRPC(server gogogrpc.Server) { newMethods := make([]grpc.MethodDesc, len(desc.Methods)) for i, method := range desc.Methods { - methodHandler := method.Handler // Fix scopelint: Using the variable on range scope `method` in function literal + methodHandler := method.Handler newMethods[i] = grpc.MethodDesc{ MethodName: method.MethodName, Handler: func(srv interface{}, ctx context.Context, dec func(interface{}) error, _ grpc.UnaryServerInterceptor) (interface{}, error) { diff --git a/testutil/network/network.go b/testutil/network/network.go index 073af242c752..577df1b22b7d 100644 --- a/testutil/network/network.go +++ b/testutil/network/network.go @@ -118,7 +118,7 @@ type ( BaseDir string Validators []*Validator - config Config + Config Config } // Validator defines an in-process Tendermint validator node. Through this object, @@ -157,7 +157,7 @@ func New(t *testing.T, cfg Config) *Network { T: t, BaseDir: baseDir, Validators: make([]*Validator, cfg.NumValidators), - config: cfg, + Config: cfg, } t.Log("preparing test network...") @@ -429,7 +429,7 @@ func (n *Network) Cleanup() { } } - if n.config.CleanupDir { + if n.Config.CleanupDir { _ = os.RemoveAll(n.BaseDir) } diff --git a/testutil/network/network_test.go b/testutil/network/network_test.go index 32a218f8570b..1c8b04e9136b 100644 --- a/testutil/network/network_test.go +++ b/testutil/network/network_test.go @@ -2,14 +2,19 @@ package network_test import ( "context" + "fmt" "testing" "time" "github.com/stretchr/testify/suite" "google.golang.org/grpc" + "google.golang.org/grpc/metadata" + "github.com/cosmos/cosmos-sdk/baseapp" "github.com/cosmos/cosmos-sdk/testutil/network" "github.com/cosmos/cosmos-sdk/testutil/testdata" + sdk "github.com/cosmos/cosmos-sdk/types" + banktypes "github.com/cosmos/cosmos-sdk/x/bank/types" ) type IntegrationTestSuite struct { @@ -39,16 +44,38 @@ func (s *IntegrationTestSuite) TestNetwork_Liveness() { } func (s *IntegrationTestSuite) TestGRPC() { - val := s.network.Validators[0] + _, err := s.network.WaitForHeight(1) + s.Require().NoError(err) + + val0 := s.network.Validators[0] conn, err := grpc.Dial( - val.AppConfig.GRPC.Address, + val0.AppConfig.GRPC.Address, grpc.WithInsecure(), // Or else we get "no transport security set" ) s.Require().NoError(err) + + // gRPC query with test service should work testClient := testdata.NewTestServiceClient(conn) - res, err := testClient.Echo(context.Background(), &testdata.EchoRequest{Message: "hello"}) + testRes, err := testClient.Echo(context.Background(), &testdata.EchoRequest{Message: "hello"}) s.Require().NoError(err) - s.Require().Equal("hello", res.Message) + s.Require().Equal("hello", testRes.Message) + + // gRPC query with bank service should work + denom := fmt.Sprintf("%stoken", val0.Moniker) + bankClient := banktypes.NewQueryClient(conn) + var header metadata.MD + bankRes, err := bankClient.Balance( + context.Background(), + &banktypes.QueryBalanceRequest{Address: val0.Address, Denom: denom}, + grpc.Header(&header), // Also fetch grpc header + ) + s.Require().NoError(err) + s.Require().Equal( + sdk.NewCoin(denom, s.network.Config.AccountTokens), + *bankRes.GetBalance(), + ) + blockHeight := header.Get(baseapp.GRPCBlockHeightHeader) + s.Require().Equal([]string{"1"}, blockHeight) } func TestIntegrationTestSuite(t *testing.T) { From 8f15a0563a9fac5af87d6c3725e3b7ac41118873 Mon Sep 17 00:00:00 2001 From: Amaury Martiny Date: Fri, 24 Jul 2020 17:19:01 +0200 Subject: [PATCH 24/33] Address comments --- baseapp/baseapp.go | 17 ++++++----------- server/constructors.go | 5 +++-- server/start.go | 2 +- testutil/network/network_test.go | 4 ++-- testutil/network/util.go | 2 +- types/context.go | 7 ++++--- 6 files changed, 17 insertions(+), 20 deletions(-) diff --git a/baseapp/baseapp.go b/baseapp/baseapp.go index 0e3017041d79..3dba421c6756 100644 --- a/baseapp/baseapp.go +++ b/baseapp/baseapp.go @@ -310,8 +310,8 @@ func (app *BaseApp) QueryRouter() sdk.QueryRouter { return app.queryRouter } // GRPCQueryRouter returns the GRPCQueryRouter of a BaseApp. func (app *BaseApp) GRPCQueryRouter() *GRPCQueryRouter { return app.grpcQueryRouter } -// RegisterGRPC registers gRPC services directly with the gRPC server. -func (app *BaseApp) RegisterGRPC(server gogogrpc.Server) { +// RegisterGRPCServer registers gRPC services directly with the gRPC server. +func (app *BaseApp) RegisterGRPCServer(server gogogrpc.Server) { // Define an interceptor for all gRPC queries: this interceptor will create // a new sdk.Context, and pass it into the query handler. interceptor := func(grpcCtx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { @@ -338,20 +338,15 @@ func (app *BaseApp) RegisterGRPC(server gogogrpc.Server) { return nil, err } - // Wrap the sdk.Context into a new context.Context - newCtx := sdk.WrapSDKContext(sdkCtx) - - // Attach the original stream from the gRPC context into the new - // context.Context - stream := grpc.ServerTransportStreamFromContext(grpcCtx) - newCtx = grpc.NewContextWithServerTransportStream(newCtx, stream) + // Attach the sdk.Context into the gRPC's context.Context + grpcCtx = context.WithValue(grpcCtx, sdk.SdkContextKey, sdkCtx) // Add gRPC headers md := metadata.Pairs(GRPCBlockHeightHeader, strconv.FormatInt(sdkCtx.BlockHeight(), 10)) - grpc.SetHeader(newCtx, md) + grpc.SetHeader(grpcCtx, md) // Run the handler, with our new context.Context - return handler(newCtx, req) + return handler(grpcCtx, req) } // Loop through all services and methods, add the interceptor, and register diff --git a/server/constructors.go b/server/constructors.go index 491369c2d158..4c777a2b93d8 100644 --- a/server/constructors.go +++ b/server/constructors.go @@ -36,8 +36,9 @@ type ( RegisterAPIRoutes(*api.Server) - // RegisterGRPC registers gRPC services directly with the gRPC server. - RegisterGRPC(grpc.Server) + // RegisterGRPCServer registers gRPC services directly with the gRPC + // server. + RegisterGRPCServer(grpc.Server) } // AppCreator is a function that allows us to lazily initialize an diff --git a/server/start.go b/server/start.go index b791533ae052..755213873b5c 100644 --- a/server/start.go +++ b/server/start.go @@ -245,7 +245,7 @@ func startInProcess(ctx *Context, cdc codec.JSONMarshaler, appCreator AppCreator var grpcSrv *grpc.Server if config.GRPC.Enable { grpcSrv = grpc.NewServer() - app.RegisterGRPC(grpcSrv) + app.RegisterGRPCServer(grpcSrv) // Reflection allows external clients to see what services and methods // the gRPC server exposes. diff --git a/testutil/network/network_test.go b/testutil/network/network_test.go index 1c8b04e9136b..038e205b4b60 100644 --- a/testutil/network/network_test.go +++ b/testutil/network/network_test.go @@ -54,13 +54,13 @@ func (s *IntegrationTestSuite) TestGRPC() { ) s.Require().NoError(err) - // gRPC query with test service should work + // gRPC query to test service should work testClient := testdata.NewTestServiceClient(conn) testRes, err := testClient.Echo(context.Background(), &testdata.EchoRequest{Message: "hello"}) s.Require().NoError(err) s.Require().Equal("hello", testRes.Message) - // gRPC query with bank service should work + // gRPC query to bank service should work denom := fmt.Sprintf("%stoken", val0.Moniker) bankClient := banktypes.NewQueryClient(conn) var header metadata.MD diff --git a/testutil/network/util.go b/testutil/network/util.go index e02a71dd7474..52d6b30f025d 100644 --- a/testutil/network/util.go +++ b/testutil/network/util.go @@ -87,7 +87,7 @@ func startInProcess(cfg Config, val *Validator) error { if val.AppConfig.GRPC.Enable { grpcSrv := grpc.NewServer() - app.RegisterGRPC(grpcSrv) + app.RegisterGRPCServer(grpcSrv) listener, err := net.Listen("tcp", val.AppConfig.GRPC.Address) if err != nil { diff --git a/types/context.go b/types/context.go index 0cc01091c1f2..251ce6828b69 100644 --- a/types/context.go +++ b/types/context.go @@ -245,19 +245,20 @@ func (c Context) CacheContext() (cc Context, writeCache func()) { // ContextKey defines a type alias for a stdlib Context key. type ContextKey string -const sdkContextKey ContextKey = "sdk-context" +// SdkContextKey is the key in the context.Context which holds the sdk.Context. +const SdkContextKey ContextKey = "sdk-context" // WrapSDKContext returns a stdlib context.Context with the provided sdk.Context's internal // context as a value. It is useful for passing an sdk.Context through methods that take a // stdlib context.Context parameter such as generated gRPC methods. To get the original // sdk.Context back, call UnwrapSDKContext. func WrapSDKContext(ctx Context) context.Context { - return context.WithValue(ctx.ctx, sdkContextKey, ctx) + return context.WithValue(ctx.ctx, SdkContextKey, ctx) } // UnwrapSDKContext retrieves a Context from a context.Context instance // attached with WrapSDKContext. It panics if a Context was not properly // attached func UnwrapSDKContext(ctx context.Context) Context { - return ctx.Value(sdkContextKey).(Context) + return ctx.Value(SdkContextKey).(Context) } From f533116241a1d30c4b37c7aba03c990449d9ec64 Mon Sep 17 00:00:00 2001 From: Amaury Martiny Date: Fri, 24 Jul 2020 17:58:25 +0200 Subject: [PATCH 25/33] Factorize some code --- baseapp/baseapp.go | 7 ++--- server/constructors.go | 40 -------------------------- server/export.go | 3 +- server/grpc/server.go | 47 +++++++++++++++++++++++++++++++ server/pruning.go | 13 +++++---- server/start.go | 33 ++++------------------ server/types/app.go | 48 ++++++++++++++++++++++++++++++++ server/util.go | 3 +- testutil/network/network.go | 7 +++-- testutil/network/network_test.go | 4 +-- testutil/network/util.go | 23 ++------------- 11 files changed, 124 insertions(+), 104 deletions(-) create mode 100644 server/grpc/server.go create mode 100644 server/types/app.go diff --git a/baseapp/baseapp.go b/baseapp/baseapp.go index 3dba421c6756..292bab0d13ab 100644 --- a/baseapp/baseapp.go +++ b/baseapp/baseapp.go @@ -19,6 +19,7 @@ import ( "github.com/tendermint/tendermint/libs/log" dbm "github.com/tendermint/tm-db" + servergrpc "github.com/cosmos/cosmos-sdk/server/grpc" "github.com/cosmos/cosmos-sdk/store" sdk "github.com/cosmos/cosmos-sdk/types" sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" @@ -29,9 +30,6 @@ const ( runTxModeReCheck // Recheck a (pending) transaction after a commit runTxModeSimulate // Simulate a transaction runTxModeDeliver // Deliver a transaction - - // GRPCBlockHeightHeader is the gRPC header for block height - GRPCBlockHeightHeader = "sdk-block-height" ) var ( @@ -342,10 +340,9 @@ func (app *BaseApp) RegisterGRPCServer(server gogogrpc.Server) { grpcCtx = context.WithValue(grpcCtx, sdk.SdkContextKey, sdkCtx) // Add gRPC headers - md := metadata.Pairs(GRPCBlockHeightHeader, strconv.FormatInt(sdkCtx.BlockHeight(), 10)) + md := metadata.Pairs(servergrpc.GRPCBlockHeightHeader, strconv.FormatInt(sdkCtx.BlockHeight(), 10)) grpc.SetHeader(grpcCtx, md) - // Run the handler, with our new context.Context return handler(grpcCtx, req) } diff --git a/server/constructors.go b/server/constructors.go index 4c777a2b93d8..7f6f38b55ce3 100644 --- a/server/constructors.go +++ b/server/constructors.go @@ -1,55 +1,15 @@ package server import ( - "encoding/json" "io" "os" "path/filepath" - "github.com/gogo/protobuf/grpc" - abci "github.com/tendermint/tendermint/abci/types" - "github.com/tendermint/tendermint/libs/log" - tmtypes "github.com/tendermint/tendermint/types" dbm "github.com/tendermint/tm-db" - "github.com/cosmos/cosmos-sdk/server/api" sdk "github.com/cosmos/cosmos-sdk/types" ) -type ( - // AppOptions defines an interface that is passed into an application - // constructor, typically used to set BaseApp options that are either supplied - // via config file or through CLI arguments/flags. The underlying implementation - // is defined by the server package and is typically implemented via a Viper - // literal defined on the server Context. Note, casting Get calls may not yield - // the expected types and could result in type assertion errors. It is recommend - // to either use the cast package or perform manual conversion for safety. - AppOptions interface { - Get(string) interface{} - } - - // Application defines an application interface that wraps abci.Application. - // The interface defines the necessary contracts to be implemented in order - // to fully bootstrap and start an application. - Application interface { - abci.Application - - RegisterAPIRoutes(*api.Server) - - // RegisterGRPCServer registers gRPC services directly with the gRPC - // server. - RegisterGRPCServer(grpc.Server) - } - - // AppCreator is a function that allows us to lazily initialize an - // application using various configurations. - AppCreator func(log.Logger, dbm.DB, io.Writer, AppOptions) Application - - // AppExporter is a function that dumps all app state to - // JSON-serializable structure and returns the current validator set. - AppExporter func(log.Logger, dbm.DB, io.Writer, int64, bool, []string) (json.RawMessage, []tmtypes.GenesisValidator, *abci.ConsensusParams, error) -) - func openDB(rootDir string) (dbm.DB, error) { dataDir := filepath.Join(rootDir, "data") db, err := sdk.NewLevelDB("application", dataDir) diff --git a/server/export.go b/server/export.go index 0fc94a06a5bd..5be74127d789 100644 --- a/server/export.go +++ b/server/export.go @@ -13,6 +13,7 @@ import ( "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/client/flags" "github.com/cosmos/cosmos-sdk/codec" + "github.com/cosmos/cosmos-sdk/server/types" sdk "github.com/cosmos/cosmos-sdk/types" ) @@ -23,7 +24,7 @@ const ( ) // ExportCmd dumps app state to JSON. -func ExportCmd(appExporter AppExporter, defaultNodeHome string) *cobra.Command { +func ExportCmd(appExporter types.AppExporter, defaultNodeHome string) *cobra.Command { cmd := &cobra.Command{ Use: "export", Short: "Export state to JSON", diff --git a/server/grpc/server.go b/server/grpc/server.go new file mode 100644 index 000000000000..5ab4a3e23edb --- /dev/null +++ b/server/grpc/server.go @@ -0,0 +1,47 @@ +package grpc + +import ( + "fmt" + "net" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/reflection" + + "github.com/cosmos/cosmos-sdk/server/types" +) + +const ( + // GRPCBlockHeightHeader is the gRPC header for block height + GRPCBlockHeightHeader = "sdk-block-height" +) + +// StartGRPCServer starts a gRPC server on the given address. +func StartGRPCServer(app types.Application, address string) (*grpc.Server, error) { + grpcSrv := grpc.NewServer() + app.RegisterGRPCServer(grpcSrv) + + // Reflection allows external clients to see what services and methods + // the gRPC server exposes. + reflection.Register(grpcSrv) + + listener, err := net.Listen("tcp", address) + if err != nil { + return nil, err + } + + errCh := make(chan error) + go func() { + err = grpcSrv.Serve(listener) + if err != nil { + errCh <- fmt.Errorf("failed to serve: %w", err) + } + }() + + select { + case err := <-errCh: + return nil, err + case <-time.After(5 * time.Second): // assume server started successfully + return grpcSrv, nil + } +} diff --git a/server/pruning.go b/server/pruning.go index ca4f9d6e57ad..44aa4ba2d1e6 100644 --- a/server/pruning.go +++ b/server/pruning.go @@ -6,22 +6,23 @@ import ( "github.com/spf13/cast" + "github.com/cosmos/cosmos-sdk/server/types" "github.com/cosmos/cosmos-sdk/store" - "github.com/cosmos/cosmos-sdk/store/types" + storetypes "github.com/cosmos/cosmos-sdk/store/types" ) // GetPruningOptionsFromFlags parses command flags and returns the correct // PruningOptions. If a pruning strategy is provided, that will be parsed and // returned, otherwise, it is assumed custom pruning options are provided. -func GetPruningOptionsFromFlags(appOpts AppOptions) (types.PruningOptions, error) { +func GetPruningOptionsFromFlags(appOpts types.AppOptions) (storetypes.PruningOptions, error) { strategy := strings.ToLower(cast.ToString(appOpts.Get(FlagPruning))) switch strategy { - case types.PruningOptionDefault, types.PruningOptionNothing, types.PruningOptionEverything: - return types.NewPruningOptionsFromString(strategy), nil + case storetypes.PruningOptionDefault, storetypes.PruningOptionNothing, storetypes.PruningOptionEverything: + return storetypes.NewPruningOptionsFromString(strategy), nil - case types.PruningOptionCustom: - opts := types.NewPruningOptions( + case storetypes.PruningOptionCustom: + opts := storetypes.NewPruningOptions( cast.ToUint64(appOpts.Get(FlagPruningKeepRecent)), cast.ToUint64(appOpts.Get(FlagPruningKeepEvery)), cast.ToUint64(appOpts.Get(FlagPruningInterval)), diff --git a/server/start.go b/server/start.go index 755213873b5c..810080f6f7f2 100644 --- a/server/start.go +++ b/server/start.go @@ -4,7 +4,6 @@ package server import ( "fmt" - "net" "os" "runtime/pprof" "time" @@ -19,13 +18,14 @@ import ( "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/rpc/client/local" "google.golang.org/grpc" - "google.golang.org/grpc/reflection" "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/client/flags" "github.com/cosmos/cosmos-sdk/codec" "github.com/cosmos/cosmos-sdk/server/api" "github.com/cosmos/cosmos-sdk/server/config" + servergrpc "github.com/cosmos/cosmos-sdk/server/grpc" + "github.com/cosmos/cosmos-sdk/server/types" storetypes "github.com/cosmos/cosmos-sdk/store/types" ) @@ -52,7 +52,7 @@ const ( // StartCmd runs the service passed in, either stand-alone or in-process with // Tendermint. -func StartCmd(appCreator AppCreator, defaultNodeHome string) *cobra.Command { +func StartCmd(appCreator types.AppCreator, defaultNodeHome string) *cobra.Command { cmd := &cobra.Command{ Use: "start", Short: "Run the full node", @@ -128,7 +128,7 @@ which accepts a path for the resulting pprof file. return cmd } -func startStandAlone(ctx *Context, appCreator AppCreator) error { +func startStandAlone(ctx *Context, appCreator types.AppCreator) error { addr := ctx.Viper.GetString(flagAddress) transport := ctx.Viper.GetString(flagTransport) home := ctx.Viper.GetString(flags.FlagHome) @@ -168,7 +168,7 @@ func startStandAlone(ctx *Context, appCreator AppCreator) error { select {} } -func startInProcess(ctx *Context, cdc codec.JSONMarshaler, appCreator AppCreator) error { +func startInProcess(ctx *Context, cdc codec.JSONMarshaler, appCreator types.AppCreator) error { cfg := ctx.Config home := cfg.RootDir @@ -244,31 +244,10 @@ func startInProcess(ctx *Context, cdc codec.JSONMarshaler, appCreator AppCreator var grpcSrv *grpc.Server if config.GRPC.Enable { - grpcSrv = grpc.NewServer() - app.RegisterGRPCServer(grpcSrv) - - // Reflection allows external clients to see what services and methods - // the gRPC server exposes. - reflection.Register(grpcSrv) - - listener, err := net.Listen("tcp", config.GRPC.Address) + grpcSrv, err = servergrpc.StartGRPCServer(app, config.GRPC.Address) if err != nil { return err } - - errCh := make(chan error) - go func() { - err = grpcSrv.Serve(listener) - if err != nil { - errCh <- fmt.Errorf("failed to serve: %w", err) - } - }() - - select { - case err := <-errCh: - return err - case <-time.After(5 * time.Second): // assume server started successfully - } } var cpuProfileCleanup func() diff --git a/server/types/app.go b/server/types/app.go new file mode 100644 index 000000000000..f68a93207630 --- /dev/null +++ b/server/types/app.go @@ -0,0 +1,48 @@ +package types + +import ( + "encoding/json" + "io" + + "github.com/gogo/protobuf/grpc" + abci "github.com/tendermint/tendermint/abci/types" + "github.com/tendermint/tendermint/libs/log" + tmtypes "github.com/tendermint/tendermint/types" + dbm "github.com/tendermint/tm-db" + + "github.com/cosmos/cosmos-sdk/server/api" +) + +type ( + // AppOptions defines an interface that is passed into an application + // constructor, typically used to set BaseApp options that are either supplied + // via config file or through CLI arguments/flags. The underlying implementation + // is defined by the server package and is typically implemented via a Viper + // literal defined on the server Context. Note, casting Get calls may not yield + // the expected types and could result in type assertion errors. It is recommend + // to either use the cast package or perform manual conversion for safety. + AppOptions interface { + Get(string) interface{} + } + + // Application defines an application interface that wraps abci.Application. + // The interface defines the necessary contracts to be implemented in order + // to fully bootstrap and start an application. + Application interface { + abci.Application + + RegisterAPIRoutes(*api.Server) + + // RegisterGRPCServer registers gRPC services directly with the gRPC + // server. + RegisterGRPCServer(grpc.Server) + } + + // AppCreator is a function that allows us to lazily initialize an + // application using various configurations. + AppCreator func(log.Logger, dbm.DB, io.Writer, AppOptions) Application + + // AppExporter is a function that dumps all app state to + // JSON-serializable structure and returns the current validator set. + AppExporter func(log.Logger, dbm.DB, io.Writer, int64, bool, []string) (json.RawMessage, []tmtypes.GenesisValidator, *abci.ConsensusParams, error) +) diff --git a/server/util.go b/server/util.go index 7b329e49c09c..842bfbc837d8 100644 --- a/server/util.go +++ b/server/util.go @@ -23,6 +23,7 @@ import ( "github.com/cosmos/cosmos-sdk/client/flags" "github.com/cosmos/cosmos-sdk/codec" "github.com/cosmos/cosmos-sdk/server/config" + "github.com/cosmos/cosmos-sdk/server/types" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/cosmos-sdk/version" ) @@ -167,7 +168,7 @@ func interceptConfigs(ctx *Context, rootViper *viper.Viper) (*tmcfg.Config, erro } // add server commands -func AddCommands(rootCmd *cobra.Command, appCreator AppCreator, appExport AppExporter) { +func AddCommands(rootCmd *cobra.Command, appCreator types.AppCreator, appExport types.AppExporter) { tendermintCmd := &cobra.Command{ Use: "tendermint", Short: "Tendermint subcommands", diff --git a/testutil/network/network.go b/testutil/network/network.go index 577df1b22b7d..5ac0097d4138 100644 --- a/testutil/network/network.go +++ b/testutil/network/network.go @@ -22,6 +22,7 @@ import ( "github.com/tendermint/tendermint/node" tmclient "github.com/tendermint/tendermint/rpc/client" dbm "github.com/tendermint/tm-db" + "google.golang.org/grpc" "github.com/cosmos/cosmos-sdk/baseapp" "github.com/cosmos/cosmos-sdk/client" @@ -31,6 +32,7 @@ import ( "github.com/cosmos/cosmos-sdk/server" "github.com/cosmos/cosmos-sdk/server/api" srvconfig "github.com/cosmos/cosmos-sdk/server/config" + servertypes "github.com/cosmos/cosmos-sdk/server/types" "github.com/cosmos/cosmos-sdk/simapp" storetypes "github.com/cosmos/cosmos-sdk/store/types" sdk "github.com/cosmos/cosmos-sdk/types" @@ -45,9 +47,9 @@ var lock = new(sync.Mutex) // AppConstructor defines a function which accepts a network configuration and // creates an ABCI Application to provide to Tendermint. -type AppConstructor = func(val Validator) server.Application +type AppConstructor = func(val Validator) servertypes.Application -func NewSimApp(val Validator) server.Application { +func NewSimApp(val Validator) servertypes.Application { return simapp.NewSimApp( val.Ctx.Logger, dbm.NewMemDB(), nil, true, make(map[int64]bool), val.Ctx.Config.RootDir, 0, baseapp.SetPruning(storetypes.NewPruningOptionsFromString(val.AppConfig.Pruning)), @@ -141,6 +143,7 @@ type ( tmNode *node.Node api *api.Server + grpc *grpc.Server } ) diff --git a/testutil/network/network_test.go b/testutil/network/network_test.go index 038e205b4b60..a130429e0a89 100644 --- a/testutil/network/network_test.go +++ b/testutil/network/network_test.go @@ -10,7 +10,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/metadata" - "github.com/cosmos/cosmos-sdk/baseapp" + servergrpc "github.com/cosmos/cosmos-sdk/server/grpc" "github.com/cosmos/cosmos-sdk/testutil/network" "github.com/cosmos/cosmos-sdk/testutil/testdata" sdk "github.com/cosmos/cosmos-sdk/types" @@ -74,7 +74,7 @@ func (s *IntegrationTestSuite) TestGRPC() { sdk.NewCoin(denom, s.network.Config.AccountTokens), *bankRes.GetBalance(), ) - blockHeight := header.Get(baseapp.GRPCBlockHeightHeader) + blockHeight := header.Get(servergrpc.GRPCBlockHeightHeader) s.Require().Equal([]string{"1"}, blockHeight) } diff --git a/testutil/network/util.go b/testutil/network/util.go index 52d6b30f025d..be450b8872f0 100644 --- a/testutil/network/util.go +++ b/testutil/network/util.go @@ -1,8 +1,6 @@ package network import ( - "fmt" - "net" "path/filepath" "time" @@ -14,10 +12,10 @@ import ( "github.com/tendermint/tendermint/rpc/client/local" "github.com/tendermint/tendermint/types" tmtime "github.com/tendermint/tendermint/types/time" - "google.golang.org/grpc" "github.com/cosmos/cosmos-sdk/codec" "github.com/cosmos/cosmos-sdk/server/api" + servergrpc "github.com/cosmos/cosmos-sdk/server/grpc" authtypes "github.com/cosmos/cosmos-sdk/x/auth/types" banktypes "github.com/cosmos/cosmos-sdk/x/bank/types" "github.com/cosmos/cosmos-sdk/x/genutil" @@ -86,27 +84,12 @@ func startInProcess(cfg Config, val *Validator) error { } if val.AppConfig.GRPC.Enable { - grpcSrv := grpc.NewServer() - app.RegisterGRPCServer(grpcSrv) - - listener, err := net.Listen("tcp", val.AppConfig.GRPC.Address) + grpcSrv, err := servergrpc.StartGRPCServer(app, val.AppConfig.GRPC.Address) if err != nil { return err } - errCh := make(chan error) - go func() { - err = grpcSrv.Serve(listener) - if err != nil { - errCh <- fmt.Errorf("failed to serve: %w", err) - } - }() - - select { - case err := <-errCh: - return err - case <-time.After(5 * time.Second): // assume server started successfully - } + val.grpc = grpcSrv } return nil From 72db9210252712bb0d8ad01854a03ff0b9892bc6 Mon Sep 17 00:00:00 2001 From: Amaury Martiny Date: Fri, 24 Jul 2020 18:03:07 +0200 Subject: [PATCH 26/33] Fix lint --- simapp/simd/cmd/root.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/simapp/simd/cmd/root.go b/simapp/simd/cmd/root.go index c3260b1ff06d..91df95602f54 100644 --- a/simapp/simd/cmd/root.go +++ b/simapp/simd/cmd/root.go @@ -21,6 +21,7 @@ import ( "github.com/cosmos/cosmos-sdk/client/keys" "github.com/cosmos/cosmos-sdk/client/rpc" "github.com/cosmos/cosmos-sdk/server" + servertypes "github.com/cosmos/cosmos-sdk/server/types" "github.com/cosmos/cosmos-sdk/simapp" "github.com/cosmos/cosmos-sdk/store" sdk "github.com/cosmos/cosmos-sdk/types" @@ -148,7 +149,7 @@ func txCommand() *cobra.Command { return cmd } -func newApp(logger log.Logger, db dbm.DB, traceStore io.Writer, appOpts server.AppOptions) server.Application { +func newApp(logger log.Logger, db dbm.DB, traceStore io.Writer, appOpts servertypes.AppOptions) servertypes.Application { var cache sdk.MultiStorePersistentCache if cast.ToBool(appOpts.Get(server.FlagInterBlockCache)) { From d93803b7b76e4ba8ebf0435927eb6a1c0b8eac31 Mon Sep 17 00:00:00 2001 From: Amaury Martiny Date: Mon, 27 Jul 2020 11:19:27 +0200 Subject: [PATCH 27/33] Add height and prove in req metadata --- baseapp/abci.go | 18 +++--- baseapp/baseapp.go | 77 ------------------------- baseapp/grpcserver.go | 96 ++++++++++++++++++++++++++++++++ server/config/config.go | 4 +- server/grpc/server.go | 5 +- testutil/network/network_test.go | 11 +++- 6 files changed, 122 insertions(+), 89 deletions(-) create mode 100644 baseapp/grpcserver.go diff --git a/baseapp/abci.go b/baseapp/abci.go index e5784479e7be..be37b9a4bc4b 100644 --- a/baseapp/abci.go +++ b/baseapp/abci.go @@ -349,7 +349,7 @@ func (app *BaseApp) Query(req abci.RequestQuery) abci.ResponseQuery { } func (app *BaseApp) handleQueryGRPC(handler GRPCQueryHandler, req abci.RequestQuery) abci.ResponseQuery { - ctx, err := app.createQueryContext(req) + ctx, err := app.createQueryContext(req.Height, req.Prove) if err != nil { return sdkerrors.QueryResult(err) } @@ -364,13 +364,15 @@ func (app *BaseApp) handleQueryGRPC(handler GRPCQueryHandler, req abci.RequestQu return res } -func (app *BaseApp) createQueryContext(req abci.RequestQuery) (sdk.Context, error) { +// createQueryContext creates a new sdk.Context for a query, taking as args +// the block height and whether the query needs a proof or not. +func (app *BaseApp) createQueryContext(height int64, prove bool) (sdk.Context, error) { // when a client did not provide a query height, manually inject the latest - if req.Height == 0 { - req.Height = app.LastBlockHeight() + if height == 0 { + height = app.LastBlockHeight() } - if req.Height <= 1 && req.Prove { + if height <= 1 && prove { return sdk.Context{}, sdkerrors.Wrap( sdkerrors.ErrInvalidRequest, @@ -378,12 +380,12 @@ func (app *BaseApp) createQueryContext(req abci.RequestQuery) (sdk.Context, erro ) } - cacheMS, err := app.cms.CacheMultiStoreWithVersion(req.Height) + cacheMS, err := app.cms.CacheMultiStoreWithVersion(height) if err != nil { return sdk.Context{}, sdkerrors.Wrapf( sdkerrors.ErrInvalidRequest, - "failed to load state at height %d; %s (latest height: %d)", req.Height, err, app.LastBlockHeight(), + "failed to load state at height %d; %s (latest height: %d)", height, err, app.LastBlockHeight(), ) } @@ -517,7 +519,7 @@ func handleQueryCustom(app *BaseApp, path []string, req abci.RequestQuery) abci. return sdkerrors.QueryResult(sdkerrors.Wrapf(sdkerrors.ErrUnknownRequest, "no custom querier found for route %s", path[1])) } - ctx, err := app.createQueryContext(req) + ctx, err := app.createQueryContext(req.Height, req.Prove) if err != nil { return sdkerrors.QueryResult(err) } diff --git a/baseapp/baseapp.go b/baseapp/baseapp.go index 292bab0d13ab..0cedd43f6c17 100644 --- a/baseapp/baseapp.go +++ b/baseapp/baseapp.go @@ -1,25 +1,17 @@ package baseapp import ( - "context" "fmt" "reflect" - "strconv" "strings" - gogogrpc "github.com/gogo/protobuf/grpc" "github.com/gogo/protobuf/proto" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/metadata" - "google.golang.org/grpc/status" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/crypto/tmhash" "github.com/tendermint/tendermint/libs/log" dbm "github.com/tendermint/tm-db" - servergrpc "github.com/cosmos/cosmos-sdk/server/grpc" "github.com/cosmos/cosmos-sdk/store" sdk "github.com/cosmos/cosmos-sdk/types" sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" @@ -305,75 +297,6 @@ func (app *BaseApp) Router() sdk.Router { // QueryRouter returns the QueryRouter of a BaseApp. func (app *BaseApp) QueryRouter() sdk.QueryRouter { return app.queryRouter } -// GRPCQueryRouter returns the GRPCQueryRouter of a BaseApp. -func (app *BaseApp) GRPCQueryRouter() *GRPCQueryRouter { return app.grpcQueryRouter } - -// RegisterGRPCServer registers gRPC services directly with the gRPC server. -func (app *BaseApp) RegisterGRPCServer(server gogogrpc.Server) { - // Define an interceptor for all gRPC queries: this interceptor will create - // a new sdk.Context, and pass it into the query handler. - interceptor := func(grpcCtx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { - // To create the correct sdk.Context, we first need to transform the - // req into an abci.RequestQuery - msg, ok := req.(proto.Message) - if !ok { - return nil, status.Errorf(codes.Internal, "unable to proto marshal") - } - - msgBz, err := proto.Marshal(msg) - if err != nil { - return nil, err - } - - abciReq := abci.RequestQuery{ - Data: msgBz, - Path: info.FullMethod, - } - - // Create the sdk.Context - sdkCtx, err := app.createQueryContext(abciReq) - if err != nil { - return nil, err - } - - // Attach the sdk.Context into the gRPC's context.Context - grpcCtx = context.WithValue(grpcCtx, sdk.SdkContextKey, sdkCtx) - - // Add gRPC headers - md := metadata.Pairs(servergrpc.GRPCBlockHeightHeader, strconv.FormatInt(sdkCtx.BlockHeight(), 10)) - grpc.SetHeader(grpcCtx, md) - - return handler(grpcCtx, req) - } - - // Loop through all services and methods, add the interceptor, and register - // the service - for _, data := range app.GRPCQueryRouter().serviceData { - desc := data.serviceDesc - newMethods := make([]grpc.MethodDesc, len(desc.Methods)) - - for i, method := range desc.Methods { - methodHandler := method.Handler - newMethods[i] = grpc.MethodDesc{ - MethodName: method.MethodName, - Handler: func(srv interface{}, ctx context.Context, dec func(interface{}) error, _ grpc.UnaryServerInterceptor) (interface{}, error) { - return methodHandler(srv, ctx, dec, interceptor) - }, - } - } - - newDesc := &grpc.ServiceDesc{ - ServiceName: desc.ServiceName, - HandlerType: desc.HandlerType, - Methods: newMethods, - Streams: desc.Streams, - Metadata: desc.Metadata, - } - - server.RegisterService(newDesc, data.handler) - } -} - // Seal seals a BaseApp. It prohibits any further modifications to a BaseApp. func (app *BaseApp) Seal() { app.sealed = true } diff --git a/baseapp/grpcserver.go b/baseapp/grpcserver.go new file mode 100644 index 000000000000..feb18c74ed84 --- /dev/null +++ b/baseapp/grpcserver.go @@ -0,0 +1,96 @@ +package baseapp + +import ( + "context" + "strconv" + + gogogrpc "github.com/gogo/protobuf/grpc" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + + servergrpc "github.com/cosmos/cosmos-sdk/server/grpc" + sdk "github.com/cosmos/cosmos-sdk/types" +) + +// GRPCQueryRouter returns the GRPCQueryRouter of a BaseApp. +func (app *BaseApp) GRPCQueryRouter() *GRPCQueryRouter { return app.grpcQueryRouter } + +// RegisterGRPCServer registers gRPC services directly with the gRPC server. +func (app *BaseApp) RegisterGRPCServer(server gogogrpc.Server) { + // Define an interceptor for all gRPC queries: this interceptor will create + // a new sdk.Context, and pass it into the query handler. + interceptor := func(grpcCtx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + // If there's some metadata in the context, retrieve it + md, ok := metadata.FromIncomingContext(grpcCtx) + if !ok { + return nil, status.Error(codes.Internal, "unable to retrieve metadata") + } + + // Get height and prove headers from the request context + var ( + height int64 + prove bool + ) + + if heightHeaders := md.Get(servergrpc.GRPCBlockHeightHeader); len(heightHeaders) > 0 { + height, err = strconv.ParseInt(heightHeaders[0], 10, 64) + if err != nil { + return nil, err + } + } + + if proveHeaders := md.Get(servergrpc.GRPCWithProofHeader); len(proveHeaders) > 0 { + prove, err = strconv.ParseBool(proveHeaders[0]) + if err != nil { + return nil, err + } + } + + // Create the sdk.Context + sdkCtx, err := app.createQueryContext(height, prove) + if err != nil { + return nil, err + } + + // Attach the sdk.Context into the gRPC's context.Context + grpcCtx = context.WithValue(grpcCtx, sdk.SdkContextKey, sdkCtx) + + // Add relevant gRPC headers + if height == 0 { + height = sdkCtx.BlockHeight() // If height was not set in the request, set it to the latest + } + md = metadata.Pairs(servergrpc.GRPCBlockHeightHeader, strconv.FormatInt(height, 10)) + grpc.SetHeader(grpcCtx, md) + + return handler(grpcCtx, req) + } + + // Loop through all services and methods, add the interceptor, and register + // the service + for _, data := range app.GRPCQueryRouter().serviceData { + desc := data.serviceDesc + newMethods := make([]grpc.MethodDesc, len(desc.Methods)) + + for i, method := range desc.Methods { + methodHandler := method.Handler + newMethods[i] = grpc.MethodDesc{ + MethodName: method.MethodName, + Handler: func(srv interface{}, ctx context.Context, dec func(interface{}) error, _ grpc.UnaryServerInterceptor) (interface{}, error) { + return methodHandler(srv, ctx, dec, interceptor) + }, + } + } + + newDesc := &grpc.ServiceDesc{ + ServiceName: desc.ServiceName, + HandlerType: desc.HandlerType, + Methods: newMethods, + Streams: desc.Streams, + Metadata: desc.Metadata, + } + + server.RegisterService(newDesc, data.handler) + } +} diff --git a/server/config/config.go b/server/config/config.go index fb67b989700e..801aaa06d0ab 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -192,8 +192,8 @@ func GetConfig(v *viper.Viper) Config { EnableUnsafeCORS: v.GetBool("api.enabled-unsafe-cors"), }, GRPC: GRPCConfig{ - Enable: viper.GetBool("grpc.enable"), - Address: viper.GetString("grpc.address"), + Enable: v.GetBool("grpc.enable"), + Address: v.GetString("grpc.address"), }, } } diff --git a/server/grpc/server.go b/server/grpc/server.go index 5ab4a3e23edb..beeca9e81368 100644 --- a/server/grpc/server.go +++ b/server/grpc/server.go @@ -12,8 +12,11 @@ import ( ) const ( - // GRPCBlockHeightHeader is the gRPC header for block height + // GRPCBlockHeightHeader is the gRPC header for block height. GRPCBlockHeightHeader = "sdk-block-height" + // GRPCWithProofHeader is a gRPC header which is true when the request + // requires a proof. + GRPCWithProofHeader = "sdk-require-proof" ) // StartGRPCServer starts a gRPC server on the given address. diff --git a/testutil/network/network_test.go b/testutil/network/network_test.go index a130429e0a89..72e2f22ffa7e 100644 --- a/testutil/network/network_test.go +++ b/testutil/network/network_test.go @@ -44,7 +44,7 @@ func (s *IntegrationTestSuite) TestNetwork_Liveness() { } func (s *IntegrationTestSuite) TestGRPC() { - _, err := s.network.WaitForHeight(1) + _, err := s.network.WaitForHeight(2) s.Require().NoError(err) val0 := s.network.Validators[0] @@ -75,6 +75,15 @@ func (s *IntegrationTestSuite) TestGRPC() { *bankRes.GetBalance(), ) blockHeight := header.Get(servergrpc.GRPCBlockHeightHeader) + s.Require().Equal([]string{"2"}, blockHeight) + + // Request metadata should work + bankRes, err = bankClient.Balance( + metadata.AppendToOutgoingContext(context.Background(), servergrpc.GRPCBlockHeightHeader, "1"), // Add metadata to request + &banktypes.QueryBalanceRequest{Address: val0.Address, Denom: denom}, + grpc.Header(&header), + ) + blockHeight = header.Get(servergrpc.GRPCBlockHeightHeader) s.Require().Equal([]string{"1"}, blockHeight) } From a37c83cee82483c251490df10d868d466283f56b Mon Sep 17 00:00:00 2001 From: Amaury Martiny Date: Mon, 27 Jul 2020 12:00:24 +0200 Subject: [PATCH 28/33] Add reflection test --- testutil/network/network_test.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/testutil/network/network_test.go b/testutil/network/network_test.go index 72e2f22ffa7e..0dbb46b620d9 100644 --- a/testutil/network/network_test.go +++ b/testutil/network/network_test.go @@ -15,6 +15,7 @@ import ( "github.com/cosmos/cosmos-sdk/testutil/testdata" sdk "github.com/cosmos/cosmos-sdk/types" banktypes "github.com/cosmos/cosmos-sdk/x/bank/types" + rpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha" ) type IntegrationTestSuite struct { @@ -85,6 +86,23 @@ func (s *IntegrationTestSuite) TestGRPC() { ) blockHeight = header.Get(servergrpc.GRPCBlockHeightHeader) s.Require().Equal([]string{"1"}, blockHeight) + + // Test server reflection + reflectClient := rpb.NewServerReflectionClient(conn) + stream, err := reflectClient.ServerReflectionInfo(context.Background(), grpc.WaitForReady(true)) + s.Require().NoError(err) + s.Require().NoError(stream.Send(&rpb.ServerReflectionRequest{ + MessageRequest: &rpb.ServerReflectionRequest_ListServices{}, + })) + res, err := stream.Recv() + s.Require().NoError(err) + services := res.GetListServicesResponse().Service + servicesMap := make(map[string]bool) + for _, s := range services { + servicesMap[s.Name] = true + } + // Make sure the following services are present + s.Require().True(servicesMap["cosmos.bank.Query"]) } func TestIntegrationTestSuite(t *testing.T) { From 8513b9dec6159be033657f44435518bd6cb53021 Mon Sep 17 00:00:00 2001 From: Amaury Martiny Date: Mon, 27 Jul 2020 12:04:22 +0200 Subject: [PATCH 29/33] Fix lint --- baseapp/grpcserver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/baseapp/grpcserver.go b/baseapp/grpcserver.go index feb18c74ed84..7d4e74a65fbf 100644 --- a/baseapp/grpcserver.go +++ b/baseapp/grpcserver.go @@ -21,7 +21,7 @@ func (app *BaseApp) GRPCQueryRouter() *GRPCQueryRouter { return app.grpcQueryRou func (app *BaseApp) RegisterGRPCServer(server gogogrpc.Server) { // Define an interceptor for all gRPC queries: this interceptor will create // a new sdk.Context, and pass it into the query handler. - interceptor := func(grpcCtx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + interceptor := func(grpcCtx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { // If there's some metadata in the context, retrieve it md, ok := metadata.FromIncomingContext(grpcCtx) if !ok { From 4f659ce3bc8426aac4b643fbbc7e72e8c75d26ea Mon Sep 17 00:00:00 2001 From: Amaury Martiny Date: Mon, 27 Jul 2020 12:08:47 +0200 Subject: [PATCH 30/33] Put grpc test in server/grpc --- server/grpc/server_test.go | 101 +++++++++++++++++++++++++++++++ testutil/network/network_test.go | 70 --------------------- 2 files changed, 101 insertions(+), 70 deletions(-) create mode 100644 server/grpc/server_test.go diff --git a/server/grpc/server_test.go b/server/grpc/server_test.go new file mode 100644 index 000000000000..bd8c6979efc3 --- /dev/null +++ b/server/grpc/server_test.go @@ -0,0 +1,101 @@ +package grpc_test + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/suite" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" + + servergrpc "github.com/cosmos/cosmos-sdk/server/grpc" + "github.com/cosmos/cosmos-sdk/testutil/network" + "github.com/cosmos/cosmos-sdk/testutil/testdata" + sdk "github.com/cosmos/cosmos-sdk/types" + banktypes "github.com/cosmos/cosmos-sdk/x/bank/types" + rpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha" +) + +type IntegrationTestSuite struct { + suite.Suite + + network *network.Network +} + +func (s *IntegrationTestSuite) SetupSuite() { + s.T().Log("setting up integration test suite") + + s.network = network.New(s.T(), network.DefaultConfig()) + s.Require().NotNil(s.network) + + _, err := s.network.WaitForHeight(2) + s.Require().NoError(err) +} + +func (s *IntegrationTestSuite) TearDownSuite() { + s.T().Log("tearing down integration test suite") + s.network.Cleanup() +} + +func (s *IntegrationTestSuite) TestGRPC() { + val0 := s.network.Validators[0] + conn, err := grpc.Dial( + val0.AppConfig.GRPC.Address, + grpc.WithInsecure(), // Or else we get "no transport security set" + ) + s.Require().NoError(err) + + // gRPC query to test service should work + testClient := testdata.NewTestServiceClient(conn) + testRes, err := testClient.Echo(context.Background(), &testdata.EchoRequest{Message: "hello"}) + s.Require().NoError(err) + s.Require().Equal("hello", testRes.Message) + + // gRPC query to bank service should work + denom := fmt.Sprintf("%stoken", val0.Moniker) + bankClient := banktypes.NewQueryClient(conn) + var header metadata.MD + bankRes, err := bankClient.Balance( + context.Background(), + &banktypes.QueryBalanceRequest{Address: val0.Address, Denom: denom}, + grpc.Header(&header), // Also fetch grpc header + ) + s.Require().NoError(err) + s.Require().Equal( + sdk.NewCoin(denom, s.network.Config.AccountTokens), + *bankRes.GetBalance(), + ) + blockHeight := header.Get(servergrpc.GRPCBlockHeightHeader) + s.Require().Equal([]string{"2"}, blockHeight) + + // Request metadata should work + bankRes, err = bankClient.Balance( + metadata.AppendToOutgoingContext(context.Background(), servergrpc.GRPCBlockHeightHeader, "1"), // Add metadata to request + &banktypes.QueryBalanceRequest{Address: val0.Address, Denom: denom}, + grpc.Header(&header), + ) + blockHeight = header.Get(servergrpc.GRPCBlockHeightHeader) + s.Require().Equal([]string{"1"}, blockHeight) + + // Test server reflection + reflectClient := rpb.NewServerReflectionClient(conn) + stream, err := reflectClient.ServerReflectionInfo(context.Background(), grpc.WaitForReady(true)) + s.Require().NoError(err) + s.Require().NoError(stream.Send(&rpb.ServerReflectionRequest{ + MessageRequest: &rpb.ServerReflectionRequest_ListServices{}, + })) + res, err := stream.Recv() + s.Require().NoError(err) + services := res.GetListServicesResponse().Service + servicesMap := make(map[string]bool) + for _, s := range services { + servicesMap[s.Name] = true + } + // Make sure the following services are present + s.Require().True(servicesMap["cosmos.bank.Query"]) +} + +func TestIntegrationTestSuite(t *testing.T) { + suite.Run(t, new(IntegrationTestSuite)) +} diff --git a/testutil/network/network_test.go b/testutil/network/network_test.go index 0dbb46b620d9..e7b248e31a37 100644 --- a/testutil/network/network_test.go +++ b/testutil/network/network_test.go @@ -1,21 +1,12 @@ package network_test import ( - "context" - "fmt" "testing" "time" "github.com/stretchr/testify/suite" - "google.golang.org/grpc" - "google.golang.org/grpc/metadata" - servergrpc "github.com/cosmos/cosmos-sdk/server/grpc" "github.com/cosmos/cosmos-sdk/testutil/network" - "github.com/cosmos/cosmos-sdk/testutil/testdata" - sdk "github.com/cosmos/cosmos-sdk/types" - banktypes "github.com/cosmos/cosmos-sdk/x/bank/types" - rpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha" ) type IntegrationTestSuite struct { @@ -44,67 +35,6 @@ func (s *IntegrationTestSuite) TestNetwork_Liveness() { s.Require().NoError(err, "expected to reach 10 blocks; got %d", h) } -func (s *IntegrationTestSuite) TestGRPC() { - _, err := s.network.WaitForHeight(2) - s.Require().NoError(err) - - val0 := s.network.Validators[0] - conn, err := grpc.Dial( - val0.AppConfig.GRPC.Address, - grpc.WithInsecure(), // Or else we get "no transport security set" - ) - s.Require().NoError(err) - - // gRPC query to test service should work - testClient := testdata.NewTestServiceClient(conn) - testRes, err := testClient.Echo(context.Background(), &testdata.EchoRequest{Message: "hello"}) - s.Require().NoError(err) - s.Require().Equal("hello", testRes.Message) - - // gRPC query to bank service should work - denom := fmt.Sprintf("%stoken", val0.Moniker) - bankClient := banktypes.NewQueryClient(conn) - var header metadata.MD - bankRes, err := bankClient.Balance( - context.Background(), - &banktypes.QueryBalanceRequest{Address: val0.Address, Denom: denom}, - grpc.Header(&header), // Also fetch grpc header - ) - s.Require().NoError(err) - s.Require().Equal( - sdk.NewCoin(denom, s.network.Config.AccountTokens), - *bankRes.GetBalance(), - ) - blockHeight := header.Get(servergrpc.GRPCBlockHeightHeader) - s.Require().Equal([]string{"2"}, blockHeight) - - // Request metadata should work - bankRes, err = bankClient.Balance( - metadata.AppendToOutgoingContext(context.Background(), servergrpc.GRPCBlockHeightHeader, "1"), // Add metadata to request - &banktypes.QueryBalanceRequest{Address: val0.Address, Denom: denom}, - grpc.Header(&header), - ) - blockHeight = header.Get(servergrpc.GRPCBlockHeightHeader) - s.Require().Equal([]string{"1"}, blockHeight) - - // Test server reflection - reflectClient := rpb.NewServerReflectionClient(conn) - stream, err := reflectClient.ServerReflectionInfo(context.Background(), grpc.WaitForReady(true)) - s.Require().NoError(err) - s.Require().NoError(stream.Send(&rpb.ServerReflectionRequest{ - MessageRequest: &rpb.ServerReflectionRequest_ListServices{}, - })) - res, err := stream.Recv() - s.Require().NoError(err) - services := res.GetListServicesResponse().Service - servicesMap := make(map[string]bool) - for _, s := range services { - servicesMap[s.Name] = true - } - // Make sure the following services are present - s.Require().True(servicesMap["cosmos.bank.Query"]) -} - func TestIntegrationTestSuite(t *testing.T) { suite.Run(t, new(IntegrationTestSuite)) } From ae797c0201c059d792ba8a99ebe2e3c34cbf53a6 Mon Sep 17 00:00:00 2001 From: Alexander Bezobchuk Date: Mon, 27 Jul 2020 09:48:55 -0400 Subject: [PATCH 31/33] Update baseapp/grpcserver.go --- baseapp/grpcserver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/baseapp/grpcserver.go b/baseapp/grpcserver.go index 7d4e74a65fbf..cd03d4d5d567 100644 --- a/baseapp/grpcserver.go +++ b/baseapp/grpcserver.go @@ -68,7 +68,7 @@ func (app *BaseApp) RegisterGRPCServer(server gogogrpc.Server) { } // Loop through all services and methods, add the interceptor, and register - // the service + // the service. for _, data := range app.GRPCQueryRouter().serviceData { desc := data.serviceDesc newMethods := make([]grpc.MethodDesc, len(desc.Methods)) From 180da4a375001888ac844d643108013f843b8139 Mon Sep 17 00:00:00 2001 From: Alexander Bezobchuk Date: Mon, 27 Jul 2020 09:49:02 -0400 Subject: [PATCH 32/33] Update baseapp/grpcserver.go --- baseapp/grpcserver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/baseapp/grpcserver.go b/baseapp/grpcserver.go index cd03d4d5d567..f449101d2998 100644 --- a/baseapp/grpcserver.go +++ b/baseapp/grpcserver.go @@ -22,7 +22,7 @@ func (app *BaseApp) RegisterGRPCServer(server gogogrpc.Server) { // Define an interceptor for all gRPC queries: this interceptor will create // a new sdk.Context, and pass it into the query handler. interceptor := func(grpcCtx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { - // If there's some metadata in the context, retrieve it + // If there's some metadata in the context, retrieve it. md, ok := metadata.FromIncomingContext(grpcCtx) if !ok { return nil, status.Error(codes.Internal, "unable to retrieve metadata") From b16cbd4300f2d4cb55ede8f7e93994c193e2f1f5 Mon Sep 17 00:00:00 2001 From: Amaury Martiny Date: Mon, 27 Jul 2020 15:55:40 +0200 Subject: [PATCH 33/33] Remove proof header --- baseapp/grpcserver.go | 22 ++++++---------------- server/grpc/server.go | 5 +---- 2 files changed, 7 insertions(+), 20 deletions(-) diff --git a/baseapp/grpcserver.go b/baseapp/grpcserver.go index f449101d2998..0ff2b5407df8 100644 --- a/baseapp/grpcserver.go +++ b/baseapp/grpcserver.go @@ -28,12 +28,8 @@ func (app *BaseApp) RegisterGRPCServer(server gogogrpc.Server) { return nil, status.Error(codes.Internal, "unable to retrieve metadata") } - // Get height and prove headers from the request context - var ( - height int64 - prove bool - ) - + // Get height header from the request context, if present. + var height int64 if heightHeaders := md.Get(servergrpc.GRPCBlockHeightHeader); len(heightHeaders) > 0 { height, err = strconv.ParseInt(heightHeaders[0], 10, 64) if err != nil { @@ -41,20 +37,14 @@ func (app *BaseApp) RegisterGRPCServer(server gogogrpc.Server) { } } - if proveHeaders := md.Get(servergrpc.GRPCWithProofHeader); len(proveHeaders) > 0 { - prove, err = strconv.ParseBool(proveHeaders[0]) - if err != nil { - return nil, err - } - } - - // Create the sdk.Context - sdkCtx, err := app.createQueryContext(height, prove) + // Create the sdk.Context. Passing false as 2nd arg, as we can't + // actually support proofs with gRPC right now. + sdkCtx, err := app.createQueryContext(height, false) if err != nil { return nil, err } - // Attach the sdk.Context into the gRPC's context.Context + // Attach the sdk.Context into the gRPC's context.Context. grpcCtx = context.WithValue(grpcCtx, sdk.SdkContextKey, sdkCtx) // Add relevant gRPC headers diff --git a/server/grpc/server.go b/server/grpc/server.go index beeca9e81368..c081133dac22 100644 --- a/server/grpc/server.go +++ b/server/grpc/server.go @@ -13,10 +13,7 @@ import ( const ( // GRPCBlockHeightHeader is the gRPC header for block height. - GRPCBlockHeightHeader = "sdk-block-height" - // GRPCWithProofHeader is a gRPC header which is true when the request - // requires a proof. - GRPCWithProofHeader = "sdk-require-proof" + GRPCBlockHeightHeader = "x-cosmos-block-height" ) // StartGRPCServer starts a gRPC server on the given address.