Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add gRPC server & reflection #6463

Merged
merged 48 commits into from
Jul 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
8dd91e3
Add gRPC proxy
aaronc Jun 17, 2020
1e4d01f
Merge branch 'master' into aaronc/5921-grpc-proxy
aaronc Jun 17, 2020
ba6ded6
Merge branch 'master' of https://github.com/cosmos/cosmos-sdk into aa…
aaronc Jun 18, 2020
20fd64c
Make GRPC disabled by default
aaronc Jun 18, 2020
2ab5f5d
WIP on integration tests
aaronc Jun 18, 2020
70e5042
WIP on integration tests
aaronc Jun 18, 2020
4771a12
Merge branch 'master' of https://github.com/cosmos/cosmos-sdk into aa…
aaronc Jul 14, 2020
021591f
Start setting up in process tests
aaronc Jul 16, 2020
3176fb4
Merge branch 'master' of https://github.com/cosmos/cosmos-sdk into aa…
aaronc Jul 16, 2020
f2671c7
Start setting up in process tests
aaronc Jul 16, 2020
7254946
Make it compile
amaury1093 Jul 17, 2020
5834ca6
Merge branch 'master' into aaronc/5921-grpc-proxy
amaury1093 Jul 20, 2020
64554c4
Add start server to network util
amaury1093 Jul 20, 2020
928edef
Merge branch 'master' into aaronc/5921-grpc-proxy
amaury1093 Jul 20, 2020
b7dc4a8
Merge branch 'master' into aaronc/5921-grpc-proxy
amaury1093 Jul 20, 2020
973ff20
Merge branch 'master' into aaronc/5921-grpc-proxy
amaury1093 Jul 20, 2020
d0c5291
Add Println
amaury1093 Jul 20, 2020
978a26f
Use go routine
amaury1093 Jul 21, 2020
c93b5e3
Fix scopelint
amaury1093 Jul 21, 2020
71c8139
Move to proxy_test
amaury1093 Jul 21, 2020
42afb10
Merge branch 'master' into aaronc/5921-grpc-proxy
amaury1093 Jul 21, 2020
aaa1ac7
Add response type cache
aaronc Jul 23, 2020
b4421f1
Remove proxy
amaury1093 Jul 23, 2020
9072a42
Tweaks
amaury1093 Jul 23, 2020
59d2fa7
Merge branch 'master' into aaronc/5921-grpc-proxy
amaury1093 Jul 23, 2020
f15c036
Use channel to handle error
amaury1093 Jul 23, 2020
ff837e8
Use error chan
amaury1093 Jul 23, 2020
4080396
Update server/start.go
amaury1093 Jul 23, 2020
670c44e
Use %w
amaury1093 Jul 23, 2020
c5f850f
Merge branch 'aaronc/5921-grpc-proxy' of ssh://github.com/cosmos/cosm…
amaury1093 Jul 23, 2020
323e1c2
Add sdk.Context
amaury1093 Jul 24, 2020
8825e99
Merge branch 'master' into aaronc/5921-grpc-proxy
amaury1093 Jul 24, 2020
9058b6d
Add comments
amaury1093 Jul 24, 2020
0411977
Fix lint
amaury1093 Jul 24, 2020
0c90239
Add header and tests
amaury1093 Jul 24, 2020
8f15a05
Address comments
amaury1093 Jul 24, 2020
f533116
Factorize some code
amaury1093 Jul 24, 2020
72db921
Fix lint
amaury1093 Jul 24, 2020
d93803b
Add height and prove in req metadata
amaury1093 Jul 27, 2020
dbe951d
Merge branch 'master' into aaronc/5921-grpc-proxy
amaury1093 Jul 27, 2020
a37c83c
Add reflection test
amaury1093 Jul 27, 2020
8513b9d
Fix lint
amaury1093 Jul 27, 2020
4f659ce
Put grpc test in server/grpc
amaury1093 Jul 27, 2020
ae797c0
Update baseapp/grpcserver.go
alexanderbez Jul 27, 2020
180da4a
Update baseapp/grpcserver.go
alexanderbez Jul 27, 2020
b16cbd4
Remove proof header
amaury1093 Jul 27, 2020
692b032
Merge branch 'master' into aaronc/5921-grpc-proxy
alexanderbez Jul 27, 2020
d6afde0
Merge branch 'master' into aaronc/5921-grpc-proxy
sahith-narahari Jul 27, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ func (app *BaseApp) Query(req abci.RequestQuery) abci.ResponseQuery {
}

func (app *BaseApp) handleQueryGRPC(handler GRPCQueryHandler, req abci.RequestQuery) abci.ResponseQuery {
ctx, err := app.createQueryContext(req)
ctx, err := app.createQueryContext(req.Height, req.Prove)
if err != nil {
return sdkerrors.QueryResult(err)
}
Expand All @@ -364,26 +364,28 @@ func (app *BaseApp) handleQueryGRPC(handler GRPCQueryHandler, req abci.RequestQu
return res
}

func (app *BaseApp) createQueryContext(req abci.RequestQuery) (sdk.Context, error) {
// createQueryContext creates a new sdk.Context for a query, taking as args
// the block height and whether the query needs a proof or not.
func (app *BaseApp) createQueryContext(height int64, prove bool) (sdk.Context, error) {
// when a client did not provide a query height, manually inject the latest
if req.Height == 0 {
req.Height = app.LastBlockHeight()
if height == 0 {
height = app.LastBlockHeight()
}

if req.Height <= 1 && req.Prove {
if height <= 1 && prove {
return sdk.Context{},
sdkerrors.Wrap(
sdkerrors.ErrInvalidRequest,
"cannot query with proof when height <= 1; please provide a valid height",
)
}

cacheMS, err := app.cms.CacheMultiStoreWithVersion(req.Height)
cacheMS, err := app.cms.CacheMultiStoreWithVersion(height)
if err != nil {
return sdk.Context{},
sdkerrors.Wrapf(
sdkerrors.ErrInvalidRequest,
"failed to load state at height %d; %s (latest height: %d)", req.Height, err, app.LastBlockHeight(),
"failed to load state at height %d; %s (latest height: %d)", height, err, app.LastBlockHeight(),
)
}

Expand Down Expand Up @@ -517,7 +519,7 @@ func handleQueryCustom(app *BaseApp, path []string, req abci.RequestQuery) abci.
return sdkerrors.QueryResult(sdkerrors.Wrapf(sdkerrors.ErrUnknownRequest, "no custom querier found for route %s", path[1]))
}

ctx, err := app.createQueryContext(req)
ctx, err := app.createQueryContext(req.Height, req.Prove)
if err != nil {
return sdkerrors.QueryResult(err)
}
Expand Down
11 changes: 4 additions & 7 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ func (app *BaseApp) MountStores(keys ...sdk.StoreKey) {
}
}

// MountStores mounts all IAVL or DB stores to the provided keys in the BaseApp
// multistore.
// MountKVStores mounts all IAVL or DB stores to the provided keys in the
// BaseApp multistore.
func (app *BaseApp) MountKVStores(keys map[string]*sdk.KVStoreKey) {
for _, key := range keys {
if !app.fauxMerkleMode {
Expand All @@ -186,8 +186,8 @@ func (app *BaseApp) MountKVStores(keys map[string]*sdk.KVStoreKey) {
}
}

// MountStores mounts all IAVL or DB stores to the provided keys in the BaseApp
// multistore.
// MountTransientStores mounts all IAVL or DB stores to the provided keys in
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// MountTransientStores mounts all IAVL or DB stores to the provided keys in
// MountTransientStores mounts all the transient stores from the MultiStore.

// the BaseApp multistore.
func (app *BaseApp) MountTransientStores(keys map[string]*sdk.TransientStoreKey) {
for _, key := range keys {
app.MountStore(key, sdk.StoreTypeTransient)
Expand Down Expand Up @@ -297,9 +297,6 @@ func (app *BaseApp) Router() sdk.Router {
// QueryRouter returns the QueryRouter of a BaseApp.
func (app *BaseApp) QueryRouter() sdk.QueryRouter { return app.queryRouter }

// GRPCQueryRouter returns the GRPCQueryRouter of a BaseApp.
func (app *BaseApp) GRPCQueryRouter() *GRPCQueryRouter { return app.grpcQueryRouter }

// Seal seals a BaseApp. It prohibits any further modifications to a BaseApp.
func (app *BaseApp) Seal() { app.sealed = true }

Expand Down
12 changes: 12 additions & 0 deletions baseapp/grpcrouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ var protoCodec = encoding.GetCodec(proto.Name)
type GRPCQueryRouter struct {
routes map[string]GRPCQueryHandler
anyUnpacker types.AnyUnpacker
serviceData []serviceData
}

// serviceData represents a gRPC service, along with its handler.
type serviceData struct {
serviceDesc *grpc.ServiceDesc
handler interface{}
}

var _ gogogrpc.Server
Expand Down Expand Up @@ -83,6 +90,11 @@ func (qrt *GRPCQueryRouter) RegisterService(sd *grpc.ServiceDesc, handler interf
}, nil
}
}

qrt.serviceData = append(qrt.serviceData, serviceData{
serviceDesc: sd,
handler: handler,
})
}

// AnyUnpacker returns the AnyUnpacker for the router
Expand Down
86 changes: 86 additions & 0 deletions baseapp/grpcserver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package baseapp

import (
"context"
"strconv"

gogogrpc "github.com/gogo/protobuf/grpc"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

servergrpc "github.com/cosmos/cosmos-sdk/server/grpc"
sdk "github.com/cosmos/cosmos-sdk/types"
)

// GRPCQueryRouter returns the GRPCQueryRouter of a BaseApp.
func (app *BaseApp) GRPCQueryRouter() *GRPCQueryRouter { return app.grpcQueryRouter }

// RegisterGRPCServer registers gRPC services directly with the gRPC server.
func (app *BaseApp) RegisterGRPCServer(server gogogrpc.Server) {
// Define an interceptor for all gRPC queries: this interceptor will create
// a new sdk.Context, and pass it into the query handler.
interceptor := func(grpcCtx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
// If there's some metadata in the context, retrieve it.
md, ok := metadata.FromIncomingContext(grpcCtx)
if !ok {
return nil, status.Error(codes.Internal, "unable to retrieve metadata")
}

// Get height header from the request context, if present.
var height int64
if heightHeaders := md.Get(servergrpc.GRPCBlockHeightHeader); len(heightHeaders) > 0 {
height, err = strconv.ParseInt(heightHeaders[0], 10, 64)
if err != nil {
return nil, err
}
}

// Create the sdk.Context. Passing false as 2nd arg, as we can't
// actually support proofs with gRPC right now.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something for TODO?

sdkCtx, err := app.createQueryContext(height, false)
if err != nil {
return nil, err
}

// Attach the sdk.Context into the gRPC's context.Context.
grpcCtx = context.WithValue(grpcCtx, sdk.SdkContextKey, sdkCtx)

// Add relevant gRPC headers
if height == 0 {
height = sdkCtx.BlockHeight() // If height was not set in the request, set it to the latest
}
md = metadata.Pairs(servergrpc.GRPCBlockHeightHeader, strconv.FormatInt(height, 10))
grpc.SetHeader(grpcCtx, md)

return handler(grpcCtx, req)
}

// Loop through all services and methods, add the interceptor, and register
// the service.
for _, data := range app.GRPCQueryRouter().serviceData {
desc := data.serviceDesc
newMethods := make([]grpc.MethodDesc, len(desc.Methods))

for i, method := range desc.Methods {
methodHandler := method.Handler
newMethods[i] = grpc.MethodDesc{
MethodName: method.MethodName,
Handler: func(srv interface{}, ctx context.Context, dec func(interface{}) error, _ grpc.UnaryServerInterceptor) (interface{}, error) {
return methodHandler(srv, ctx, dec, interceptor)
},
}
}

newDesc := &grpc.ServiceDesc{
ServiceName: desc.ServiceName,
HandlerType: desc.HandlerType,
Methods: newMethods,
Streams: desc.Streams,
Metadata: desc.Metadata,
}

server.RegisterService(newDesc, data.handler)
}
}
18 changes: 18 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,23 @@ type APIConfig struct {
// Ref: https://github.com/cosmos/cosmos-sdk/issues/6420
}

// GRPCConfig defines configuration for the gRPC server.
type GRPCConfig struct {
// Enable defines if the gRPC server should be enabled.
Enable bool `mapstructure:"enable"`

// Address defines the API server to listen on
Address string `mapstructure:"address"`
}

// Config defines the server's top level configuration
type Config struct {
BaseConfig `mapstructure:",squash"`

// Telemetry defines the application telemetry configuration
Telemetry telemetry.Config `mapstructure:"telemetry"`
API APIConfig `mapstructure:"api"`
GRPC GRPCConfig `mapstructure:"grpc"`
}

// SetMinGasPrices sets the validator's minimum gas prices.
Expand Down Expand Up @@ -134,6 +144,10 @@ func DefaultConfig() *Config {
RPCReadTimeout: 10,
RPCMaxBodyBytes: 1000000,
},
GRPC: GRPCConfig{
aaronc marked this conversation as resolved.
Show resolved Hide resolved
Enable: false,
Address: "0.0.0.0:9090",
},
}
}

Expand Down Expand Up @@ -177,5 +191,9 @@ func GetConfig(v *viper.Viper) Config {
RPCMaxBodyBytes: v.GetUint("api.rpc-max-body-bytes"),
EnableUnsafeCORS: v.GetBool("api.enabled-unsafe-cors"),
},
GRPC: GRPCConfig{
Enable: v.GetBool("grpc.enable"),
Address: v.GetString("grpc.address"),
},
}
}
35 changes: 0 additions & 35 deletions server/constructors.go
Original file line number Diff line number Diff line change
@@ -1,50 +1,15 @@
package server

import (
"encoding/json"
"io"
"os"
"path/filepath"

abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/log"
tmtypes "github.com/tendermint/tendermint/types"
dbm "github.com/tendermint/tm-db"

"github.com/cosmos/cosmos-sdk/server/api"
sdk "github.com/cosmos/cosmos-sdk/types"
)

type (
// AppOptions defines an interface that is passed into an application
// constructor, typically used to set BaseApp options that are either supplied
// via config file or through CLI arguments/flags. The underlying implementation
// is defined by the server package and is typically implemented via a Viper
// literal defined on the server Context. Note, casting Get calls may not yield
// the expected types and could result in type assertion errors. It is recommend
// to either use the cast package or perform manual conversion for safety.
AppOptions interface {
Get(string) interface{}
}

// Application defines an application interface that wraps abci.Application.
// The interface defines the necessary contracts to be implemented in order
// to fully bootstrap and start an application.
Application interface {
abci.Application

RegisterAPIRoutes(*api.Server)
}

// AppCreator is a function that allows us to lazily initialize an
// application using various configurations.
AppCreator func(log.Logger, dbm.DB, io.Writer, AppOptions) Application

// AppExporter is a function that dumps all app state to
// JSON-serializable structure and returns the current validator set.
AppExporter func(log.Logger, dbm.DB, io.Writer, int64, bool, []string) (json.RawMessage, []tmtypes.GenesisValidator, *abci.ConsensusParams, error)
)

func openDB(rootDir string) (dbm.DB, error) {
dataDir := filepath.Join(rootDir, "data")
db, err := sdk.NewLevelDB("application", dataDir)
Expand Down
3 changes: 2 additions & 1 deletion server/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/client/flags"
"github.com/cosmos/cosmos-sdk/codec"
"github.com/cosmos/cosmos-sdk/server/types"
sdk "github.com/cosmos/cosmos-sdk/types"
)

Expand All @@ -23,7 +24,7 @@ const (
)

// ExportCmd dumps app state to JSON.
func ExportCmd(appExporter AppExporter, defaultNodeHome string) *cobra.Command {
func ExportCmd(appExporter types.AppExporter, defaultNodeHome string) *cobra.Command {
cmd := &cobra.Command{
Use: "export",
Short: "Export state to JSON",
Expand Down
47 changes: 47 additions & 0 deletions server/grpc/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package grpc

import (
"fmt"
"net"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/reflection"

"github.com/cosmos/cosmos-sdk/server/types"
)

const (
// GRPCBlockHeightHeader is the gRPC header for block height.
GRPCBlockHeightHeader = "x-cosmos-block-height"
)

// StartGRPCServer starts a gRPC server on the given address.
func StartGRPCServer(app types.Application, address string) (*grpc.Server, error) {
grpcSrv := grpc.NewServer()
app.RegisterGRPCServer(grpcSrv)

// Reflection allows external clients to see what services and methods
// the gRPC server exposes.
reflection.Register(grpcSrv)
amaury1093 marked this conversation as resolved.
Show resolved Hide resolved

listener, err := net.Listen("tcp", address)
if err != nil {
return nil, err
}

errCh := make(chan error)
go func() {
err = grpcSrv.Serve(listener)
if err != nil {
errCh <- fmt.Errorf("failed to serve: %w", err)
}
}()

select {
case err := <-errCh:
return nil, err
case <-time.After(5 * time.Second): // assume server started successfully
return grpcSrv, nil
}
}
Loading